算法原理

条件概率

条件概率表示在B=b成立的条件下,A=a的概率,记作P(A=a|B=b),或者说条件概率是指事件A=a在另外一个事件B=b已经发生的条件下的概率。联合概率、边缘概率、条件概率的关系如下:
P ( A B ) = P ( A , B ) P ( B ) P(A|B)=\frac{P(A,B)}{P(B)} P(AB)=P(B)P(A,B)
写成乘法的形式为:
P ( A , B ) = P ( B ) P ( A B ) = P ( A ) P ( B A ) P(A,B)=P(B)*P(A|B)=P(A)*P(B|A) P(A,B)=P(B)P(AB)=P(A)P(BA)

联合概率

假设有随机变量A和B,此时P(A=a,B=b)用于表示A=a且B=b同时发生的概率。这类包含多个条件且所有条件同时成立的概率叫做联合概率。与之对应,P(A=a)或P(B=b)这类仅与单个随机变量相关的概率称为边缘概率。其中联合概率和边缘概率具有如下关系:
P ( A = a ) = b P ( A = a , B = b ) P(A=a)=\sum_bP(A=a,B=b) P(A=a)=bP(A=a,B=b)
P ( A = b ) = a P ( A = a , B = b ) P(A=b)=\sum_aP(A=a,B=b) P(A=b)=aP(A=a,B=b)

全概率公式

如果事件 A 1 A_1 A1, A 2 A_2 A2, A 3 A_3 A3,…, A n A_n An构成一个完备事件组,即他们两两互不相容,其和为全集;并且 P ( A i ) P(A_i) P(Ai)大于0,则对任意事件 B B B有:
P ( B ) = P ( B A 1 ) P ( A 1 ) + P ( B A 2 ) + . . . + P ( B A n ) P ( A n ) = i = 1 n P ( B A i ) P ( A i ) P(B)=P(B|A_1)P(A_1)+P(B|A_2)+...+P(B|A_n)P(A_n)=\sum_{i=1}^nP(B|A_i)P(A_i) P(B)=P(BA1)P(A1)+P(BA2)+...+P(BAn)P(An)=i=1nP(BAi)P(Ai)
上面的公式称为全概率公式。全概率公式是对复杂事件A的概率求解问题转化为了在不同情况下发生的简单事件的概率求和问题。

贝叶斯公式

这里使用机器学习常用的X,y来表示:
P ( y i x ) = P ( x y i ) P ( y i ) / P ( x ) P(y_{i}|x) = P(x|y_{i})P(y_{i})/P(x) P(yix)=P(xyi)P(yi)/P(x)
根据全概率公式,展开分母P(x),得到:
P ( y i x ) = P ( x y i ) P ( y i ) / 1 m P ( x y j ) P ( y j ) P(y_{i}|x) = P(x|y_{i})P(y_{i})/\sum_{1}^{m}P(x|y_{j})P(y_{j}) P(yix)=P(xyi)P(yi)/1mP(xyj)P(yj)

高斯分布

如果x是连续变量,如何估计P(x|yi)呢?我们可以假设在yi的条件下,x服从高斯分布。根据高斯分布的概率密度函数可以算出p(x|yi),公式如下:
P ( x ) = <mstyle mathsize="1&#46;2em"> 1 σ 2 π <mstyle mathsize="1em"> e ( x μ ) 2 2 σ 2 </mstyle> </mstyle> P(x) = \large\frac{1}{\sigma\sqrt{2\pi}}\normalsize e^{-\frac{(x-\mu)^{2}}{2\sigma^2{}}} P(x)=σ2π 1e2σ2(xμ)2

高斯朴素贝叶斯

如果x是多维的数据,那么我们可以假设P(x1|yi),P(x2|yi)…P(xn|yi)对应的事件是彼此独立的,这些值连乘在一起得到P(x|yi),“彼此独立”也就是朴素贝叶斯的朴素之处。
最后代入贝叶斯公式,就可以算出固定x分布下,每个yi的概率,寻找概率最大的yi作为分类输出。

代码实现

#coding=utf-8

import numpy as np
from numpy import ndarray, exp, pi, sqrt
from random import randint, seed, random
from numpy.random import choice, seed
from collections import Counter

# 加载肺癌数据集
def load_data():
    f = open("boston/breast_cancer.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# 划分训练集和测试集
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return np.array(X_train), np.array(X_test), np.array(y_train), np.array(y_test)

# 准确率
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)

# 查准率
def get_precision(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_postive / predicted_positive

# 查全率
def get_recall(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_postive / actual_positive

# 计算真正率
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# 计算真负率
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative

# 画ROC曲线
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret
# 计算AUC(ROC曲线下方的面积)
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc

class GaussianNB(object):
    # 初始化、存储先验概率、训练集的均值、方差及label的类别数量
    def __init__(self):
        self.prior = None
        self.avgs = None
        self.vars = None
        self.n_class = None

    # 计算先验概率
    # 通过Python自带的Counter计算每个类别的占比,再将结果存储到numpy数组中
    def get_prior(self, label):
        cnt = Counter(label)
        prior = np.array([cnt[i] / len(label) for i in range(len(cnt))])
        return prior

    # 计算训练集均值
    # 每个label类别分别计算均值
    def get_avgs(self, data, label):
        return np.array([data[label == i].mean(axis=0) for i in range(self.n_class)])

    # 计算训练集方差
    def get_vasrs(self, data, label):
        return np.array([data[label == i].var(axis=0) for i in range(self.n_class)])

    # 计算似然度
    # 通过高斯分布的概率密度函数计算出似然再连乘得到似然度
    # .prod代表连乘操作
    def get_likehood(self, row):
        return (1 / sqrt(2 * pi * self.vars) * exp(
            -(row - self.avgs) ** 2 / (2 * self.vars))).prod(axis=1)

    # 训练模型
    def fit(self, data, label):
        self.prior = self.get_prior(label)
        self.n_class = len(self.prior)
        self.avgs = self.get_avgs(data, label)
        self.vars = self.get_vasrs(data, label)

    # 预测概率prob
    # 用先验概率乘以似然度再归一化得到每个label的prob
    def predict_prob(self, data):
        likehood = np.apply_along_axis(self.get_likehood, axis=1, arr=data)
        probs = self.prior * likehood
        probs_sum = probs.sum(axis=1)
        return probs / probs_sum[:, None]

    # 预测label
    # 对于单个样本,取prob最大值所对应的类别,就是label的预测值。
    def predict(self, data):
        return self.predict_prob(data).argmax(axis=1)


# 效果评估
def main():
    print("Tesing the performance of Gaussian NaiveBayes...")
    data, label = load_data()
    data_train, data_test, label_train, label_test = train_test_split(data, label, random_state=100)
    clf = GaussianNB()
    clf.fit(data_train, label_train)
    y_hat = clf.predict(data_test)
    acc = get_acc(label_test, y_hat)
    print("Accuracy is %.3f" % acc)


main()

在肺癌数据上测试结果


拟合效果还不错。