这里只用了机器学习(朴素贝叶斯,SVM)做了文本分类,没有使用深度学习方法。

文本分类流程:分词->去停用词->获取词典(降维)->求TF-IDF特征->分类
图片说明

图片说明

许多常用单词对分类决策的帮助不大,比如“的”和标点符号等。也可能一些词在所有类别中均匀出现,为了消除这些词的影响,一方面可以使用停用词表,另一方面可以用卡方非参数检验来过滤掉与类别相关程度不高的词语。
卡方检验是一种检验两个变量独立性的方法,基本思想是通过观察实际值与理论值的偏差来确定理论的正确与否。具体做的时候常常先假设两个变量确实是独立的(原假设),然后观察实际值与理论值的偏差程度

import json
import codecs
import re
from math import log2
import time
def chi_square(N_10, N_11, N_00, N_01):
    """
    卡方计算
    :param N_10:
    :param N_11:
    :param N_00:
    :param N_01:
    :return: 词项t卡方值
    """
    fenzi = (N_11 + N_10 + N_01 + N_00)*(N_11*N_00-N_10*N_01)*(N_11*N_00-N_10*N_01)
    fenmu = (N_11+N_01)*(N_11+N_10)*(N_10+N_00)*(N_01+N_00)
    if fenmu == 0:
        return 0
    return fenzi*1.0/fenmu

def selectFeatures(documents, category_name, top_k, select_type="chi"):
    """
    特征抽取
    :param documents: 预处理后的文档集
    :param category_name: 类目名称
    :param top_k:  返回的最佳特征数量
    :param select_type: 特征选择的方法,可取值chi,mi,freq,默认为chi
    :return:  最佳特征词序列
    """
    L = []
    # 互信息和卡方特征抽取方法
    if select_type == "chi" or select_type == "mi":
        for t in vocabulary:
            print("hans:",t)
            start_time = time.time()
            N_11 = 0
            N_10 = 0
            N_01 = 0
            N_00 = 0
            N = 0
            for label, word_set in documents:
                if (t in word_set) and (category_name == label):
                    N_11 += 1
                elif (t in word_set) and (category_name != label):
                    N_10 += 1
                elif (t not in word_set) and (category_name == label):
                    N_01 += 1
                elif (t not in word_set) and (category_name != label):
                    N_00 += 1
                else:
                    print("N error")
                    exit(1)

            if N_00 == 0 or N_01 == 0 or N_10 == 0 or N_11 == 0:
                continue
            # 卡方计算
            A_tc = chi_square(N_10, N_11, N_00, N_01)
            L.append((t, A_tc))
            end_time = time.time()
            print("time:",end_time-start_time)
    return sorted(L, key=lambda x:x[1], reverse=True)[:top_k]

vocabulary = set()
def main():
    # 读取文档集(需要根据具体类目名称修改)
    category_name_li = ["entertainment", "military", "sports",
                        "education", "finance", "politics","stock",
                        "energy","home","social","tech"]
    # 获取文本(根目录需要根据具体类目名称修改)
    all_text = getDocuments(category_name_li)
    print("all_text len = ", len(all_text))
    # 读取词汇表
    vocabulary = getVocabulary(category_name_li)
    print("vocabulary len = ", len(vocabulary))
    # 获取特征词表
    print("="*20, '\n', "  卡方特征选择  \n", "="*20)
    feature_select_type = "chi"
    for category_name in category_name_li:
        # 特征抽取,最后一个参数可选值 "chi"卡方
        feature_li = selectFeatures(all_text, category_name, 1000, feature_select_type)
        print(category_name)
        f = open("count/"+category_name+".txt",'w')
        for t, i_uc in feature_li:
            print("%s\t%.3f" % (t, i_uc))
            f.write(t+'\t'+str(i_uc)+'\n')
        f.close()

if __name__ == "__main__":
    main()

这里我选取了每一类的top1000卡方值。但是实际的特征并不是每一类1000个词,其实1000还挺多的,还可以再缩小范围。
图片说明

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer, TfidfTransformer
tfidf_vec = TfidfVectorizer(vocabulary=self.vocabulary)
tfidf_matrix_train = tfidf_vec.fit_transform(train_list) #train_list=['名词1 名词2 ...','名词1 名词2 ...',...]
tfidf_matrix_test = tfidf_vec.fit_transform(test_list) ##test_list=['名词1 名词2 ...','名词1 名词2 ...',...]
print(tfidf_matrix_train.shape)
print(tfidf_matrix_test.shape)

# output:
# (400000, 864)
# (600000, 864)

tfidf_matrix_train,tfidf_matrix_test 分别是训练集和测试集的TF-IDF向量,shape为(400000, 864),(600000,864),维度为864,训练集和测试集文本数分别为400000和600000

我们希望把这个tfidf向量保存下来,因为不想每次训练测试的时候都计算一次tfidf,这个过程有点耗时,不仅TfidfVectorizer计算慢(数据量太大),加载源数据也非常慢,需要好几分钟。

TfidfVectorizer返回的是一个稀疏矩阵,即tfidf_matrix_train和tfidf_matrix_test是两个稀疏矩阵,是scipy里实现的csr_matrix。下面的代码实现了csr_matrix的保存和加载,整个过程非非常快只要两三秒,保存的npz文件也只有几十兆。

from scipy import sparse
sparse.save_npz("train.npz",tfidf_matrix_train)
sparse.save_npz("test.npz",tfidf_matrix_test)
train_data = sparse.load_npz("train.npz")
test_data = sparse.load_npz("test.npz")

在处理数据的过程中,多次用到crs_matrix,比如在做交叉验证的时候,需要调换训练集和测试集的TF-IDF向量,所以这里简单总结一下crs_matrix用法

from scipy.sparse import csr_matrix
row = [0,0,1,3]                                        # 行下标
col = [1,2,2,0]                                        # 列下标
data = [1.0,2.0,3.0,4.0]                               # value
tfidf = csr_matrix((data, (row, col)), shape=(4,4))    # 构造函数
print(tfidf)
print(tfidf.toarray())                                 # 稀疏矩阵转m*n矩阵

''' output:
  (0, 1)        1.0
  (0, 2)        2.0
  (1, 2)        3.0
  (3, 0)        4.0

[[0. 1. 2. 0.]
 [0. 0. 3. 0.]
 [0. 0. 0. 0.]
 [4. 0. 0. 0.]]
'''


d_coo = tfidf.getrow(0).tocoo()                       # 得到稀疏矩阵第0行并转换成COOrdinate格式
print(d_coo.col)
print(d_coo.data)

'''output:
[1 2]

[1. 2.]
'''

5.分类

朴素贝叶斯:

from sklearn.naive_bayes import MultinomialNB
import numpy as np
from sklearn.metrics import confusion_matrix
class Eval:
    def __init__(self,pred,gt,cfg):
        self.pred = np.array(pred)
        self.gt = np.array(gt)
        assert len(self.pred.shape)==1, len(self.gt.shape)==1

    '''
    准确率(accuracy) = 预测对的/所有 = (TP+TN)/(TP+FN+FP+TN)
    '''
    def get_acc(self):
        return len(np.where(self.pred==self.gt)[0])/len(self.gt)

    '''
    精确率是针对我们预测结果而言的,它表示的是预测为正的样本中有多少是真正的正样本。那么预测为正就有两种    可能了,一种就是把正类预测为正类(TP),另一种就是把负类预测为正类(FP)
    精确率(precision) = TP/(TP+FP)
    '''
    def get_precision(self):
        precision = []
        u, indices = np.unique(self.pred, return_inverse=True)
        for i in range(len(u)):
            TP_FP = np.where(indices==i)[0]
            TP = set(np.where(self.gt==u[i])[0]).intersection(set(TP_FP))
            precision.append(len(TP)/len(TP_FP))
        return precision

    '''
    召回率是针对我们原来的样本而言的,它表示的是样本中的正例有多少被预测正确了。那也有两种可能,一种是把    原来的正类预测成正类(TP),另一种就是把原来的正类预测为负类(FN)。
    召回率(recall) = TP/(TP+FN)
    '''
    def get_recall(self):
        recall = []
        u, indices = np.unique(self.gt, return_inverse=True)
        for i in range(len(u)):
            TP_FN = np.where(indices==i)[0]
            TP = set(np.where(self.pred==u[i])[0]).intersection(set(TP_FN))
            recall.append(len(TP)/len(TP_FN))
        return recall

    '''
    生成混淆矩阵
    '''
    def get_matrix(self):
        cm = confusion_matrix(self.gt,self.pred)
        return cm

clf = MultinomialNB()
print("training...")
clf.fit(X_train, y_train)              # X_train:TF-IDF;y_train: label_list
print("predict start!")
y_predict = clf.predict(X_test)
evalution = Eval(y_predict, y_test, args.cfg)
print("Acc:",evalution.get_acc())
print("Precision:",evalution.get_precision())
print("Recall:",evalution.get_recall())
print("Confusion matrix:")
print(evalution.get_matrix())

SVM:

clf=LinearSVC()
print("SVM training")
clf.fit(X_train,y_train)                      # X_train:TF-IDF;y_train: label_list
print("predict start!")
y_predict = clf.predict(X_test)
evalution = Eval(y_predict, y_test, args.cfg)
print("Acc:", evalution.get_acc())
print("Precision:", evalution.get_precision())
print("Recall:", evalution.get_recall())
print("Confusion matrix:", evalution.get_matrix())