Algorithm Principles
Problem Setup
Picking up from the regression tree in the previous post: to predict the grades of the students in a class, the class average is a decent first guess. Suppose that, besides the student himself, the class has 3 other classmates (never mind why the class is so small...) with grades 50, 60 and 70. Their average grade is 60, so we use 60 as the prediction for each of them, i.e. [60, 60, 60]. Each student's residual is the true grade minus the prediction: [50, 60, 70] - [60, 60, 60] = [-10, 0, 10].
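As a quick sanity check, this baseline prediction and its residuals can be computed directly (a minimal sketch; the grades [50, 60, 70] are just the toy numbers above):

grades = [50, 60, 70]                       # true grades of the three classmates
baseline = sum(grades) / len(grades)        # 60.0, the mean used as the first prediction
residuals = [g - baseline for g in grades]  # [-10.0, 0.0, 10.0]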
Going Further
To make the model more accurate, one idea is to shrink these residuals. How? We can fit a regression tree to the residuals and use it to predict them. Suppose that tree predicts residuals of [-9, 0, 9]. Adding this round's predictions to the previous round's gives each student a predicted grade of [60, 60, 60] + [-9, 0, 9] = [51, 60, 69], which is clearly closer to [50, 60, 70]; the residuals are now [-1, 0, 1], so the model's accuracy has improved. The process so far can be summarized as follows:
Round 1 prediction: [60, 60, 60]
Round 1 residuals: [-10, 0, 10]
Round 2 prediction: [60, 60, 60] + [-9, 0, 9] = [51, 60, 69]
Round 2 residuals: [-1, 0, 1]
Round 3 prediction: [60, 60, 60] + [-9, 0, 9] + [-0.8, 0, 0.7] = [50.2, 60, 69.7]
Round 3 residuals: [-0.2, 0, 0.3]
A model that makes the residuals smaller and smaller through this process is exactly GBDT.
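The same idea can be written as a tiny loop. This is only an illustrative sketch: instead of actually fitting trees, the hard-coded tree_outputs below stand in for the residual predictions [-9, 0, 9] and [-0.8, 0, 0.7] used in the example above.

grades = [50, 60, 70]
prediction = [sum(grades) / len(grades)] * len(grades)  # round 1: [60.0, 60.0, 60.0]

# Hand-picked values standing in for regression trees fitted to the residuals.
tree_outputs = [[-9, 0, 9], [-0.8, 0, 0.7]]

for round_no, tree_pred in enumerate(tree_outputs, start=2):
    # Add the tree's predicted residuals to the running prediction.
    prediction = [p + t for p, t in zip(prediction, tree_pred)]
    residuals = [g - p for g, p in zip(grades, prediction)]
    print("Round %d prediction:" % round_no, prediction, "residuals:", residuals)

# Prints roughly:
# Round 2 prediction: [51.0, 60.0, 69.0] residuals: [-1.0, 0.0, 1.0]
# Round 3 prediction: [50.2, 60.0, 69.7] residuals: [-0.2, 0.0, 0.3]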
The Complete Algorithm
# coding=utf-8
from random import random, seed, choices
from time import time

from regression import RegressionTree

# Decorator that measures how long a function takes to run.
# fn is the function to be timed.
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun

# Load the Boston house price dataset from a whitespace-separated file.
def load_data():
    f = open("boston/housing.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split()
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Split the data into a training set and a test set.
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return X_train, X_test, y_train, y_test

# Compute the goodness of fit (R squared) of a regression model.
def get_r2(reg, X, y):
    y_hat = reg.predict(X)
    sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
    y_avg = sum(y) / len(y)
    sst = sum((yi - y_avg) ** 2 for yi in y)
    r2 = 1 - sse / sst
    print("Test r2 is %.3f!" % r2)
    return r2

class GradientBoostingBase(object):
    # Initialization: store the regression trees, the learning rate,
    # the initial prediction and the transform function.
    # (Note: regression needs no transform, so the function simply returns its argument.)
    def __init__(self):
        self.trees = None
        self.lr = None
        self.init_val = None
        self.fn = lambda x: x

    # Compute the initial prediction, which is simply the mean of y.
    def get_init_val(self, y):
        return sum(y) / len(y)

    # Compute the residuals between y and the current predictions.
    def get_residuals(self, y, y_hat):
        return [yi - self.fn(y_hat_i) for yi, y_hat_i in zip(y, y_hat)]

    # Find which leaf node a sample falls into.
    def match_node(self, row, tree):
        nd = tree.root
        while nd.left and nd.right:
            if row[nd.feature] < nd.split:
                nd = nd.left
            else:
                nd = nd.right
        return nd

    # Get all leaf nodes of a regression tree.
    def get_leaves(self, tree):
        nodes = []
        que = [tree.root]
        while que:
            node = que.pop(0)
            if node.left is None or node.right is None:
                nodes.append(node)
                continue
            que.append(node.left)
            que.append(node.right)
        return nodes

    # Assign sample indices to the corresponding leaf nodes of the regression tree.
    # Returns a dict like {node1: [1, 3, 5], node2: [2, 4, 6] ...},
    # indicating which samples each node is responsible for.
    def divide_regions(self, tree, nodes, X):
        regions = {node: [] for node in nodes}
        for i, row in enumerate(X):
            node = self.match_node(row, tree)
            regions[node].append(i)
        return regions

    # # Compute the value (score) of a leaf node.
    # def get_score(self, idxs, y_hat, residuals):
    #     return None
    #
    # # Update the scores of the regression tree's leaf nodes.
    # def update_score(self, tree, X, y_hat, residuals):
    #     nodes = self.get_leaves(tree)
    #     regions = self.divide_regions(tree, nodes, X)
    #     for node, idxs in regions.items():
    #         node.score = self.get_score(idxs, y_hat, residuals)
    #     tree.get_rules()

    # Things to keep in mind when training the model:
    # 1. Limit the maximum tree depth with max_depth;
    # 2. Limit the minimum number of samples required to split with min_samples_split;
    # 3. Scale each tree's contribution by a learning rate lr to reduce overfitting;
    # 4. When subsampling, sample with replacement.
    def fit(self, X, y, n_estimators, lr, max_depth, min_samples_split, subsample=None):
        # The initial prediction is the mean of y.
        self.init_val = self.get_init_val(y)
        n = len(y)
        y_hat = [self.init_val] * n
        residuals = self.get_residuals(y, y_hat)
        self.trees = []
        self.lr = lr
        for _ in range(n_estimators):
            idx = range(n)
            # Sample with replacement if subsample is given.
            if subsample is not None:
                k = int(subsample * n)
                idx = choices(population=idx, k=k)
            X_sub = [X[i] for i in idx]
            residuals_sub = [residuals[i] for i in idx]
            y_hat_sub = [y_hat[i] for i in idx]
            # Fit a regression tree to the current residuals.
            tree = RegressionTree()
            tree.fit(X_sub, residuals_sub, max_depth, min_samples_split)
            # Update scores of tree leaf nodes
            # self.update_score(tree, X_sub, y_hat_sub, residuals_sub)
            # Update y_hat
            y_hat = [y_hat_i + lr * res_hat_i
                     for y_hat_i, res_hat_i in zip(y_hat, tree.predict(X))]
            # Update residuals
            residuals = self.get_residuals(y, y_hat)
            self.trees.append(tree)

    # Predict a single sample.
    def _predict(self, Xi):
        ret = self.init_val + sum(self.lr * tree._predict(Xi) for tree in self.trees)
        return self.fn(ret)

    # Predict multiple samples.
    def predict(self, X):
        return [self._predict(Xi) for Xi in X]

@run_time
def main():
    print('Testing the accuracy of GBDT RegressionTree...')
    X, y = load_data()
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=10)
    reg = GradientBoostingBase()
    reg.fit(X=X_train, y=y_train, n_estimators=4,
            lr=0.5, max_depth=3, min_samples_split=2)
    get_r2(reg, X_test, y_test)


if __name__ == "__main__":
    main()
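For reference, the score printed by get_r2 above is the coefficient of determination,

R^2 = 1 - \frac{\sum_i (y_i - \hat{y}_i)^2}{\sum_i (y_i - \bar{y})^2},

which equals 1 for a perfect fit and 0 for a model that does no better than always predicting the mean of y.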
The results on the Boston house price dataset show that, compared with the plain regression tree algorithm, the model's accuracy improves considerably.