Algorithm Principles

A Motivating Problem

Picking up from the regression tree discussed in the previous post: suppose we want to predict the exam scores of the students in a class, and the mean is a decent first guess. Say the class has three students (never mind why the class is so small...) whose scores are 50, 60 and 70. Their average is 60, so we use 60 as the prediction for every student, i.e. [60, 60, 60]. Each student's residual is the actual score minus the prediction: [50, 60, 70] - [60, 60, 60] = [-10, 0, 10].
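
To make this concrete, here is a tiny sketch of the baseline prediction and its residuals in plain Python (the variable names are just illustrative):

scores = [50, 60, 70]
# Baseline model: predict the class mean for every student
baseline = sum(scores) / len(scores)               # 60.0
predictions = [baseline] * len(scores)             # [60.0, 60.0, 60.0]
# Residual = actual score - predicted score
residuals = [s - p for s, p in zip(scores, predictions)]
print(residuals)                                   # [-10.0, 0.0, 10.0]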

Taking It Further

To make the model more accurate, one idea is to shrink its residuals. How do we do that? We can fit a regression tree to the residuals and use it to predict them. Suppose that tree predicts residuals of [-9, 0, 9]. Adding this round's predictions to the previous round's, each student's predicted score becomes [60, 60, 60] + [-9, 0, 9] = [51, 60, 69], which is clearly closer to [50, 60, 70]. The residuals are now [-1, 0, 1], so the model's accuracy has improved. The whole process can be summarized as follows:
Round 1 prediction: [60, 60, 60]
Round 1 residuals: [50, 60, 70] - [60, 60, 60] = [-10, 0, 10]
Round 2 prediction: [60, 60, 60] + [-9, 0, 9] = [51, 60, 69]
Round 2 residuals: [-1, 0, 1]
Round 3 prediction: [60, 60, 60] + [-9, 0, 9] + [-0.8, 0, 0.7] = [50.2, 60, 69.7]
Round 3 residuals: [-0.2, 0, 0.3]
The model that drives the residuals smaller and smaller in this way is exactly GBDT.
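
The loop above fits in a few lines of plain Python. This is only a sketch of the additive logic, with the two residual trees' outputs hard-coded to the numbers from the example rather than actually fitted:

scores = [50, 60, 70]
pred = [60, 60, 60]  # round 1: predict the mean
# What the two residual trees predict in rounds 2 and 3 (hard-coded here)
tree_outputs = [[-9, 0, 9], [-0.8, 0, 0.7]]
for out in tree_outputs:
    # New prediction = previous prediction + this tree's predicted residuals
    pred = [p + o for p, o in zip(pred, out)]
    residuals = [round(s - p, 1) for s, p in zip(scores, pred)]
    print(pred, residuals)
# [51, 60, 69] [-1, 0, 1]
# [50.2, 60, 69.7] [-0.2, 0.0, 0.3]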

The Complete Algorithm

#coding=utf-8
from random import choices, random, seed
from time import time
from regression import RegressionTree

# Decorator that measures and reports a function's total run time.
# fn is the function to be wrapped.
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun

# Load the Boston housing dataset: whitespace-separated values,
# last column is the target.
def load_data():
    f = open("boston/housing.csv")
    X = []
    y = []
    for line in f:
        line = line.strip().split()
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Split the data into training and test sets
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return X_train, X_test, y_train, y_test

# Compute the goodness of fit (R^2) of a regression model
def get_r2(reg, X, y):
    y_hat = reg.predict(X)
    sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
    y_avg = sum(y) / len(y)
    sst = sum((yi - y_avg) ** 2 for yi in y)
    r2 = 1 - sse / sst
    print("Test r2 is %.3f!" % r2)
    return r2

class GradientBoostingBase(object):
    # Initialization: store the regression trees, learning rate,
    # initial prediction and the transform function.
    # (Note: regression needs no transform, so fn just returns its argument)
    def __init__(self):
        self.trees = None
        self.lr = None
        self.init_val = None
        self.fn = lambda x: x

    # The initial prediction is simply the mean of y.
    def get_init_val(self, y):
        return sum(y) / len(y)

    # Compute the residuals
    def get_residuals(self, y, y_hat):
        return [yi - self.fn(y_hat_i) for yi, y_hat_i in zip(y, y_hat)]

    # Find the leaf node that a sample falls into
    def match_node(self, row, tree):
        nd = tree.root
        while nd.left and nd.right:
            if row[nd.feature] < nd.split:
                nd = nd.left
            else:
                nd = nd.right
        return nd

    # Collect all leaf nodes of a regression tree
    def get_leaves(self, tree):
        nodes = []
        que = [tree.root]
        while que:
            node = que.pop(0)
            if node.left is None or node.right is None:
                nodes.append(node)
                continue
            left_node = node.left
            right_node = node.right
            que.append(left_node)
            que.append(right_node)
        return nodes

    # Map sample indices to the leaf nodes of the regression tree.
    # Returns a dict like {node1: [1, 3, 5], node2: [2, 4, 6], ...},
    # i.e. which samples each leaf node is responsible for.
    def divide_regions(self, tree, nodes, X):
        regions = {node: [] for node in nodes}
        for i, row in enumerate(X):
            node = self.match_node(row, tree)
            regions[node].append(i)
        return regions

    # # Compute the value (score) of a leaf node of the regression tree
    # def get_score(self, idxs, y_hat, residuals):
    #     return None
    #
    # # Update the scores of the regression tree's leaf nodes
    # def update_score(self, tree, X, y_hat, residuals):
    #     nodes = self.get_leaves(tree)
    #     regions = self.divide_regions(tree, nodes, X)
    #     for node, idxs in regions.items():
    #         node.score = self.get_score(idxs, y_hat, residuals)
    #     tree.get_rules()

    # Notes on training:
    # 1. Limit the maximum tree depth with max_depth;
    # 2. Require a minimum number of samples to split via min_samples_split;
    # 3. Scale every tree's output by a learning rate lr to curb overfitting;
    # 4. Subsampling of the training rows is done with replacement.
    def fit(self, X, y, n_estimators, lr, max_depth, min_samples_split, subsample=None):
        self.init_val = self.get_init_val(y)
        n = len(y)
        y_hat = [self.init_val] * n
        residuals = self.get_residuals(y, y_hat)

        self.trees = []
        self.lr = lr
        for _ in range(n_estimators):
            idx = range(n)
            if subsample is not None:
                k = int(subsample * n)
                idx = choices(population=idx, k=k)
            X_sub = [X[i] for i in idx]
            residuals_sub = [residuals[i] for i in idx]
            y_hat_sub = [y_hat[i] for i in idx]
            tree = RegressionTree()
            tree.fit(X_sub, residuals_sub, max_depth, min_samples_split)
            # Update scores of tree leaf nodes
            # self.update_score(tree, X_sub, y_hat_sub, residuals_sub)
            # Update y_hat
            y_hat = [y_hat_i + lr * res_hat_i for y_hat_i, res_hat_i in zip(y_hat, tree.predict(X))]
            # Update residuals
            residuals = self.get_residuals(y, y_hat)
            self.trees.append(tree)

    # Predict a single sample
    def _predict(self, Xi):
        ret = self.init_val + sum(self.lr * tree._predict(Xi) for tree in self.trees)
        return self.fn(ret)

    # Predict multiple samples
    def predict(self, X):
        return [self._predict(Xi) for Xi in X]

@run_time
def main():
    print('Testing the accuracy of GBDT RegressionTree...')
    X, y = load_data()
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=10)
    reg = GradientBoostingBase()
    reg.fit(X=X_train, y=y_train, n_estimators=4,
            lr=0.5, max_depth=3, min_samples_split=2)
    get_r2(reg, X_test, y_test)

if __name__ == "__main__":
    main()
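
The fit method also accepts the subsample argument from note 4 above. A hypothetical call with row subsampling, reusing the variables from main, might look like this (0.8 is an arbitrary illustrative value, not tuned):

# Illustrative only: each tree is trained on a with-replacement
# sample of 80% of the training rows.
reg_sub = GradientBoostingBase()
reg_sub.fit(X=X_train, y=y_train, n_estimators=4,
            lr=0.5, max_depth=3, min_samples_split=2,
            subsample=0.8)
get_r2(reg_sub, X_test, y_test)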

The results on the Boston house price dataset are as follows:
As you can see, compared with a single regression tree, the model's accuracy improves considerably.