Preface

This article introduces the principle of the decision tree algorithm, using the ID3 algorithm for binary classification as an example. A decision tree is, as the name suggests, a tree made up of individual "decisions". Its nodes come in two kinds: non-leaf nodes hold the decision criteria, and leaf nodes hold the decision results. So what is a decision? Simply put, when a question has several possible answers, the process of choosing one of them is a decision; for example, judging whether a person's height is greater than 180 cm is a decision.

ID3 Algorithm Principle
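
In brief, stated in the terms the code below uses: at every node, ID3 evaluates for each feature the binary split "feature value < split point" and keeps the candidate with the largest information gain. For binary labels, write $p$ for the proportion of positive samples in a node $D$, and $D_1$, $D_2$ for the two subsets produced by a candidate split $A$:

$$H(D) = -p\log_2 p - (1-p)\log_2(1-p)$$

$$H(D \mid A) = \frac{|D_1|}{|D|}H(D_1) + \frac{|D_2|}{|D|}H(D_2)$$

$$g(D, A) = H(D) - H(D \mid A)$$

The node is split on the (feature, split point) pair that maximizes $g(D, A)$, and the recursion stops when a node is pure, has too few samples, or the maximum depth is reached. These quantities correspond to get_entropy, get_cond_info, and the gain computed in choose_split in the implementation below.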

Limitations of the ID3 Algorithm

(1) It does not support continuous features.
(2) It builds nodes by preferring the feature with the largest information gain, and under otherwise identical conditions a feature with many distinct values has a larger information gain than one with few values (see the brief illustration after this list).
(3) It does not handle missing values.
(4) It has no strategy for coping with overfitting.
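
To see why (2) holds, consider a hypothetical extreme case (an illustration, not from the original text): a feature that takes a distinct value on every one of the $n$ samples, such as a sample ID. Splitting on it produces $n$ perfectly pure subsets, so

$$H(D \mid A_{\mathrm{ID}}) = \sum_{v=1}^{n} \frac{1}{n}\cdot 0 = 0, \qquad g(D, A_{\mathrm{ID}}) = H(D),$$

the largest gain any feature can achieve, even though such a feature carries no information that generalizes to new samples.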

Code Implementation

# coding=utf-8
from math import log2
from copy import copy
from time import time
from random import seed, random
import numpy as np

# Split the sample indices idxs into two lists, according to whether
# X[i][feature] < split (note: idxs is emptied in the process)
def list_split(X, idxs, feature, split):
    ret = [[], []]
    while idxs:
        if X[idxs[0]][feature] < split:
            ret[0].append(idxs.pop(0))
        else:
            ret[1].append(idxs.pop(0))
    return ret
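
# Example (hypothetical data): with X = [[1], [3], [2], [5]], idxs = [0, 1, 2, 3],
# feature = 0 and split = 3, list_split returns [[0, 2], [1, 3]]: the indices of
# samples whose feature value is below 3 go to the left list, the rest to the right.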

# Decorator that measures and prints the running time of a function
# fn is the function to be timed
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun

# Load the breast cancer dataset
# Each row of the CSV holds the feature values followed by the label in the last column
def load_data():
    f = open("boston/breast_cancer.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Split the data into training and test sets
# Each sample is assigned to the training set with probability prob
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return np.array(X_train), np.array(X_test), np.array(y_train), np.array(y_test)

# Accuracy
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)

# Precision
def get_precision(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_positive / predicted_positive

# Recall
def get_recall(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True positive rate (TPR), identical to recall
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True negative rate (TNR)
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative

# Compute the points (TPR, FPR) of the ROC curve, one per threshold
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret

# Compute the AUC (area under the ROC curve) with the trapezoidal rule
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc
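
# A quick sanity check of get_roc / get_auc on hypothetical toy data (illustrative only):
#   y          = [1, 0, 1, 0]
#   y_hat_prob = [0.9, 0.8, 0.4, 0.3]
#   get_roc(y, y_hat_prob) -> [[0, 0], [0.5, 0.0], [0.5, 0.5], [1.0, 0.5], [1.0, 1.0]]
#   get_auc(y, y_hat_prob) -> 0.75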

# A node of the decision tree
# prob: predicted probability of the positive class at this node; feature and split define the split rule
class Node(object):
    def __init__(self, prob=None):
        self.prob = prob
        self.left = None
        self.right = None
        self.feature = None
        self.split = None


class DecisionTree(object):
    # This decision tree only supports binary classification using the ID3 criterion
    # root is the root node, depth is the depth of the tree
    def __init__(self):
        self.root = Node()
        self.depth = 1
        self._rules = None

    # For a given feature and split point, compute the positive-class probability
    # and the proportion of samples on each side of the split
    def get_split_effect(self, X, y, idx, feature, split):
        n = len(idx)
        pos_cnt = [0, 0]
        cnt = [0, 0]
        for i in idx:
            xi, yi = X[i][feature], y[i]
            if xi < split:
                cnt[0] += 1
                pos_cnt[0] += yi
            else:
                cnt[1] += 1
                pos_cnt[1] += y[i]
        # prob: positive-class probability on each side; rate: proportion of samples on each side
        prob = [pos_cnt[0] / cnt[0], pos_cnt[1] / cnt[1]]
        rate = [cnt[0] / n, cnt[1] / n]
        return prob, rate

    # Entropy of a binary distribution whose positive-class probability is p
    def get_entropy(self, p):
        if p == 1 or p == 0:
            return 0
        else:
            q = 1 - p
            return -(p * log2(p) + q * log2(q))

    # Entropy of the labels of the samples indexed by idx
    def get_info(self, y, idx):
        p = sum(y[i] for i in idx) / len(idx)
        return self.get_entropy(p)

    # Conditional entropy of the labels after the split
    def get_cond_info(self, prob, rate):
        info_left = self.get_entropy(prob[0])
        info_right = self.get_entropy(prob[1])
        return rate[0] * info_left + rate[1] * info_right

    # Find the best split point for a given feature
    def choose_split(self, X, y, idxs, feature):
        unique = set([X[i][feature] for i in idxs])
        if len(unique) == 1:
            return None
        # The split test is "x < split", so keeping the minimum would leave the left side empty
        unique.remove(min(unique))
        def f(split):
            info = self.get_info(y, idxs)
            prob, rate = self.get_split_effect(X, y, idxs, feature, split)
            cond_info = self.get_cond_info(prob, rate)
            # Information gain of this split
            gain = info - cond_info
            return gain, split, prob
        # Keep the split point with the largest information gain
        gain, split, prob = max((f(split) for split in unique), key=lambda x: x[0])

        return gain, feature, split, prob

    # Find the feature whose best split yields the largest information gain
    def choose_feature(self, X, y, idxs):
        m = len(X[0])
        split_rets = map(lambda j: self.choose_split(X, y, idxs, j), range(m))
        split_rets = filter(lambda x: x is not None, split_rets)
        return max(split_rets, default=None, key=lambda x: x[0])

    # Turn a rule element [feature, op, split] into a readable string; op == 1 means ">=", otherwise "<"
    def expr2literal(self, expr):
        feature, op, split = expr
        op = ">=" if op == 1 else "<"
        return "Feature%d %s %.4f" % (feature, op, split)

    # Collect the rules of all leaf nodes by breadth-first traversal
    def get_rules(self):
        que = [[self.root, []]]
        self._rules = []
        while que:
            nd, exprs = que.pop(0)
            if not(nd.left or nd.right):
                literals = list(map(self.expr2literal, exprs))
                self._rules.append([literals, nd.prob])
            if nd.left:
                rule_left = copy(exprs)
                rule_left.append([nd.feature, -1, nd.split])
                que.append([nd.left, rule_left])
            if nd.right:
                rule_right = copy(exprs)
                rule_right.append([nd.feature, 1, nd.split])
                que.append([nd.right, rule_right])

    # Fit the decision tree by breadth-first node expansion
    def fit(self, X, y, max_depth=4, min_samples_split=2):
        idxs = list(range(len(y)))
        que = [(self.depth+1, self.root, idxs)]
        while que:
            depth, nd, idxs = que.pop(0)
            if depth > max_depth:
                depth -= 1
                break
            if len(idxs) < min_samples_split or nd.prob == 1 or nd.prob == 0:
                continue
            split_ret = self.choose_feature(X, y, idxs)
            if split_ret is None:
                continue
            _, feature, split, prob = split_ret
            nd.feature = feature
            nd.split = split
            nd.left = Node(prob[0])
            nd.right = Node(prob[1])
            idxs_split = list_split(X, idxs, feature, split)
            que.append((depth + 1, nd.left, idxs_split[0]))
            que.append((depth + 1, nd.right, idxs_split[1]))
        self.depth = depth
        self.get_rules()

    def print_rules(self):
        for i, rule in enumerate(self._rules):
            literals, prob = rule
            print("Rule %d: " % i, ' | '.join(literals) + ' => y_hat %.4f' % prob)
            print()

    # Predict the positive-class probability of a single sample
    def _predict(self, Xi):
        nd = self.root
        while nd.left and nd.right:
            if Xi[nd.feature] < nd.split:
                nd = nd.left
            else:
                nd = nd.right
        return nd.prob

    # Predict class labels: a sample is positive when its predicted probability >= threshold
    def predict(self, X, threshold=0.5):
        return [int(self._predict(Xi) >= threshold) for Xi in X]


# Evaluate the decision tree on the breast cancer dataset
@run_time
def main():
    print("Testing the performance of DecisionTree...")
    data, label = load_data()
    data_train, data_test, label_train, label_test = train_test_split(data, label, random_state=100)
    clf = DecisionTree()
    clf.fit(data_train, label_train)
    clf.print_rules()
    y_hat = clf.predict(data_test)
    acc = get_acc(label_test, y_hat)
    print("Accuracy is %.3f" % acc)


if __name__ == "__main__":
    main()

Test results on the breast cancer dataset:

As can be seen, the classification performance is quite good.