Preface

This algorithm builds on the previous two articles, on regression trees and GBDT regression. You should understand those two algorithms first; they are available at https://blog.csdn.net/just_sort/article/details/100574688 and https://blog.csdn.net/just_sort/article/details/100604262 .

Algorithm principle

The sigmoid function

If you have any familiarity with logistic regression or neural networks, you will recognize this function. Its expression is $f(x)=\frac{1}{1+e^{-x}}$, and its graph is shown below:

The range of the sigmoid is (0, 1), and its derivative is $f'(x)=f(x)\,(1-f(x))$.
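A quick numerical sanity check of the derivative identity (a standalone sketch, not part of the article's code):

```python
import math

def sigmoid(x):
    """The logistic function f(x) = 1 / (1 + e^(-x))."""
    return 1.0 / (1.0 + math.exp(-x))

def sigmoid_grad(x):
    """Derivative via the identity f'(x) = f(x) * (1 - f(x))."""
    fx = sigmoid(x)
    return fx * (1.0 - fx)

# The identity agrees with a central finite-difference estimate:
h = 1e-6
numeric = (sigmoid(2.0 + h) - sigmoid(2.0 - h)) / (2 * h)
print(abs(numeric - sigmoid_grad(2.0)) < 1e-8)  # True
```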

Adapting the GBDT regression algorithm

From the previous article we know that, if we run m rounds of boosting with prediction function $F_m$, initial constant or per-round regression tree $f_m$, and input $X$, then:
$F_m(X)=F_{m-1}(X)+f_m(X)$
GBDT regression predicts continuous, unbounded values, while classification targets are discrete and bounded, so this is where the sigmoid shines: we can take $p=\frac{1}{1+e^{-F_m(X)}}$ as the final classification result.

An example

Take the classic cat-vs-dog classification problem, and label an image 0 if it shows a cat and 1 if it shows a dog. In the regression-tree article we derived that the class average is a good constant for predicting students' grades; so what should the initial prediction be for this classification problem? Let's try to derive it:

(Note: formula 7 above is missing a right parenthesis.) From the derivation we can see that if we want to predict y with a single constant, $\log(\mathrm{sum}(y)/\mathrm{sum}(1-y))$ is a good choice.
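The derivation itself is not reproduced here; a sketch of why this constant is optimal under the log loss: minimizing the loss over a constant score $F$ and setting the derivative to zero gives

```latex
\begin{aligned}
L(F) &= -\sum_{i=1}^{n}\Big[y_i\log\sigma(F) + (1-y_i)\log\big(1-\sigma(F)\big)\Big],\\
\frac{\partial L}{\partial F} &= \sum_{i=1}^{n}\big(\sigma(F) - y_i\big) = 0
\;\Rightarrow\; \sigma(F) = \frac{\sum_i y_i}{n}
\;\Rightarrow\; F = \log\frac{\sum_i y_i}{\sum_i (1-y_i)},
\end{aligned}
```

which is exactly $\log(\mathrm{sum}(y)/\mathrm{sum}(1-y))$.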
Following the same steps as GBDT regression, from the labels [1, 0, 1] we compute the initial score for the dog class as $\log((1+0+1)/(0+1+0))=0.693$. Using this constant to predict the probability of "dog" gives Sigmoid([0.693, 0.693, 0.693]) = [0.667, 0.667, 0.667], so the residual of each sample for the dog class is [1, 0, 1] - [0.667, 0.667, 0.667] = [0.333, -0.667, 0.333].
As in GBDT regression, to make the model more accurate we want to shrink these residuals. How? Just as before, we fit a regression tree to the residuals and use it to predict them. Suppose this tree predicts the residuals as [1, -0.5, 1]. Adding this round's predictions to the previous round's scores and applying the sigmoid, each sample's probability of being a dog becomes Sigmoid([0.693, 0.693, 0.693] + [1, -0.5, 1]) = [0.845, 0.548, 0.845], clearly closer to the true labels [1, 0, 1]. The residuals shrink to [0.155, -0.548, 0.155], so the predictions have improved.
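The initial score and first-round residuals can be computed directly; a minimal standalone sketch:

```python
from math import exp, log

y = [1, 0, 1]  # labels: 1 = dog, 0 = cat

# Initial constant score: log(sum(y) / sum(1 - y))
init_val = log(sum(y) / sum(1 - yi for yi in y))
print(round(init_val, 3))  # 0.693

def sigmoid(x):
    return 1.0 / (1.0 + exp(-x))

# First-round probabilities and residuals
p = [sigmoid(init_val) for _ in y]
residuals = [yi - pi for yi, pi in zip(y, p)]
print([round(pi, 3) for pi in p])          # [0.667, 0.667, 0.667]
print([round(ri, 3) for ri in residuals])  # [0.333, -0.667, 0.333]
```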
The process above can be summarized as follows:

  • Round-1 prediction: Sigmoid([0.693, 0.693, 0.693]) = [0.667, 0.667, 0.667]
  • Round-1 residuals: [0.333, -0.667, 0.333]
  • Round-2 prediction: Sigmoid([0.693, 0.693, 0.693] + [1, -0.5, 1]) (tree 1) = Sigmoid([1.693, 0.193, 1.693]) = [0.845, 0.548, 0.845]
  • Round-2 residuals: [0.155, -0.548, 0.155]
  • Round-3 prediction: Sigmoid([0.693, 0.693, 0.693] + [1, -0.5, 1] + [2, -1, 2]) (trees 1 and 2) = Sigmoid([3.693, -0.807, 3.693]) = [0.976, 0.309, 0.976]
  • Round-3 residuals: [0.024, -0.309, 0.024]

The residuals keep shrinking; this prediction procedure is the GBDT classification algorithm.
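The three rounds above can be replayed in a few lines. The tree outputs [1, -0.5, 1] and [2, -1, 2] are the assumed predictions from the example, not actually fitted trees:

```python
from math import exp

def sigmoid(x):
    return 1.0 / (1.0 + exp(-x))

y = [1, 0, 1]
score = [0.693, 0.693, 0.693]            # round 1: just the initial constant
tree_preds = [[1, -0.5, 1], [2, -1, 2]]  # assumed outputs of trees 1 and 2

for round_no in range(1, 4):
    if round_no > 1:
        # Add this round's tree predictions to the accumulated scores
        score = [s + t for s, t in zip(score, tree_preds[round_no - 2])]
    prob = [sigmoid(s) for s in score]
    resid = [yi - pi for yi, pi in zip(y, prob)]
    # The residuals shrink each round
    print(round_no, [round(p, 3) for p in prob], [round(r, 3) for r in resid])
```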

Detailed formula derivation


Thus we use the predictions and residuals from round m-1 to obtain, and then optimize, the function $f_m$. A regression tree predicts with the mean of the best split region; unlike GBDT regression, here that mean is replaced by formula 7 from the example above. So $f_m$ can use a regression tree as its base model: summing the initial value and the predictions of the fitted regression trees, then applying the sigmoid, gives the prediction for y.
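The leaf value that replaces the region mean can be illustrated concretely. The snippet below mirrors the `get_score` logic in the classifier code further down (sum of residuals over sum of $p(1-p)$, the one-step Newton update for the log loss); the split into samples {0, 2} and {1} is a hypothetical tree, chosen for illustration:

```python
p = [0.667, 0.667, 0.667]           # previous-round probabilities
residuals = [0.333, -0.667, 0.333]  # y - p

def leaf_score(idxs):
    """Leaf value: sum of residuals / sum of p * (1 - p) over the leaf's samples."""
    num = sum(residuals[i] for i in idxs)
    den = sum(p[i] * (1 - p[i]) for i in idxs)
    return num / den

# Suppose the tree separates the dogs (samples 0 and 2) from the cat (sample 1):
print(round(leaf_score([0, 2]), 2))  # 1.5
print(round(leaf_score([1]), 2))     # -3.0
```

The positive leaf pushes the dog samples' scores up and the negative leaf pushes the cat sample's score down, which is exactly the behavior seen in the rounds of the example.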

Code implementation

#coding=utf-8
from copy import copy
from random import randint, seed, random
from time import time
from regression import RegressionTree
from gbdt_regressor import GradientBoostingBase
from random import choice
from math import exp, log

# Timing decorator
# fn is the function to be measured
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun()

def load_cancer():
    f = open("boston/breast_cancer.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Split the data into training and test sets
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return X_train, X_test, y_train, y_test

# Accuracy
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)

# Precision
def get_precision(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_postive / predicted_positive

# Recall
def get_recall(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_postive / actual_positive

# True positive rate
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True negative rate
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative

# Compute the points of the ROC curve
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret
# AUC (area under the ROC curve)
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc

def model_evaluation(clf, X, y):
    y_hat = clf.predict(X)
    y_hat_prob = [clf._predict(Xi) for Xi in X]
    ret = dict()
    ret["Accuracy"] = get_acc(y, y_hat)
    ret["Recall"] = get_recall(y, y_hat)
    ret['Precision'] = get_precision(y, y_hat)
    ret['AUC'] = get_auc(y, y_hat_prob)
    for k, v in ret.items():
        print("%s: %.3f" % (k, v))
    print()
    return ret

def sigmoid(x, x_min=-100):
    return 1 / (1 + exp(-x)) if x > x_min else 0

class GradientBoostingClassifier(GradientBoostingBase):
    def __init__(self):
        GradientBoostingBase.__init__(self)
        self.fn = sigmoid
    def get_init_val(self, y):
        n = len(y)
        y_sum = sum(y)
        return log((y_sum) / (n - y_sum))
    def get_score(self, idxs, y_hat, residuals):
        numerator = denominator = 0
        for idx in idxs:
            numerator += residuals[idx]
            denominator += y_hat[idx] * (1 - y_hat[idx])

        return numerator / denominator
    def predict(self, X, threshold=0.5):
        return [int(self._predict(Xi) >= threshold) for Xi in X]


@run_time
def main():
    print('Testing the accuracy of GBDT ClassifyTree...')
    X, y = load_cancer()
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=20)
    clf = GradientBoostingClassifier()
    clf.fit(X_train, y_train, n_estimators=2,
            lr=0.8, max_depth=3, min_samples_split=2)
    model_evaluation(clf, X_test, y_test)

The GBDT regression code it depends on is modified as follows:

#coding=utf-8
from copy import copy
from random import randint, seed, random
from time import time
from regression import RegressionTree
from random import choices

# Timing decorator
# fn is the function to be measured
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun()

def load_data():
    f = open("boston/housing.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Split the data into training and test sets
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return X_train, X_test, y_train, y_test


class GradientBoostingBase(object):
    # Initialization: store the regression trees, learning rate, initial prediction, and transform function.
    # (Note: regression needs no transform, so the function returns its argument unchanged.)
    def __init__(self):
        self.trees = None
        self.lr = None
        self.init_val = None
        self.fn = lambda x: x

    # Compute the initial prediction, i.e. the mean of y.
    def get_init_val(self, y):
        return sum(y) / len(y)

    # Compute the residuals
    def get_residuals(self, y, y_hat):
        return [yi - self.fn(y_hat_i) for yi, y_hat_i in zip(y, y_hat)]

    # Find the leaf node a sample falls into
    def match_node(self, row, tree):
        nd = tree.root
        while nd.left and nd.right:
            if row[nd.feature] < nd.split:
                nd = nd.left
            else:
                nd = nd.right
        return nd

    # Collect all leaf nodes of a regression tree
    def get_leaves(self, tree):
        nodes = []
        que = [tree.root]
        while que:
            node = que.pop(0)
            if node.left is None or node.right is None:
                nodes.append(node)
                continue
            left_node = node.left
            right_node = node.right
            que.append(left_node)
            que.append(right_node)
        return nodes

    # Partition the sample indices among the tree's leaf nodes.
    # Returns a dict like {node1: [1, 3, 5], node2: [2, 4, 6], ...}, mapping each leaf to the samples it decides on
    def divide_regions(self, tree, nodes, X):
        regions = {node: [] for node in nodes}
        for i, row in enumerate(X):
            node = self.match_node(row, tree)
            regions[node].append(i)
        return regions

    # Compute the value of a leaf node (left empty here; subclasses override it)
    def get_score(self, idxs, y_hat, residuals):

        return None


    # Update the values of the tree's leaf nodes
    def update_score(self, tree, X, y_hat, residuals):
        nodes = self.get_leaves(tree)
        regions = self.divide_regions(tree, nodes, X)
        for node, idxs in regions.items():
            node.score = self.get_score(idxs, y_hat, residuals)
        tree.get_rules()

    # Notes on training:
    # 1. Limit the maximum tree depth with max_depth;
    # 2. Require a minimum number of samples per split with min_samples_split;
    # 3. Scale each regression tree by a learning rate lr to reduce overfitting;
    # 4. Subsample the training data with replacement.
    def fit(self, X, y, n_estimators, lr, max_depth, min_samples_split, subsample=None):
        self.init_val = self.get_init_val(y)
        n = len(y)
        y_hat = [self.init_val] * n
        residuals = self.get_residuals(y, y_hat)

        self.trees = []
        self.lr = lr
        for _ in range(n_estimators):
            idx = range(n)
            if subsample is not None:
                k = int(subsample * n)
                idx = choices(population=idx, k=k)
            X_sub = [X[i] for i in idx]
            residuals_sub = [residuals[i] for i in idx]
            y_hat_sub = [y_hat[i] for i in idx]
            tree = RegressionTree()
            tree.fit(X_sub, residuals_sub, max_depth, min_samples_split)
            # Update scores of tree leaf nodes
            self.update_score(tree, X_sub, y_hat_sub, residuals_sub)
            # Update y_hat with the learning-rate-scaled predictions of this tree
            y_hat = [y_hat_i + lr * res_hat_i
                     for y_hat_i, res_hat_i in zip(y_hat, tree.predict(X))]
            # Update residuals
            residuals = self.get_residuals(y, y_hat)
            self.trees.append(tree)

    # Predict a single sample
    def _predict(self, Xi):
        ret = self.init_val + sum(self.lr * tree._predict(Xi) for tree in self.trees)
        return self.fn(ret)

    # Predict multiple samples (left empty here; subclasses override it)
    def predict(self, X):
        #return [self._predict(Xi) for Xi in X]
        return None

In effect, a few methods are left empty so that the GBDT classifier can simply override them when it inherits from this base class.

Testing on the breast cancer dataset

As the results show, all the metrics are reasonably good.