Principle

If, among a sample's k most similar samples in feature space (i.e., its nearest neighbors there), the majority belong to one class, then the sample is assigned to that class.
KNN can be used not only for classification but also for regression: find a sample's k nearest neighbors and assign the average of those neighbors' values to the sample to obtain its prediction.
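
To make both uses concrete, here is a minimal brute-force sketch of the rule above (the helper name brute_force_knn and the 0/1-label assumption are mine, not part of the implementation below):

def brute_force_knn(X_train, y_train, Xi, k=3, regression=False):
    # Squared Euclidean distance from the query Xi to every training sample
    dists = [sum((a - b) ** 2 for a, b in zip(row, Xi)) for row in X_train]
    # Labels of the k closest samples
    top_k = [yi for _, yi in sorted(zip(dists, y_train))[:k]]
    if regression:
        # Regression: average the neighbors' values
        return sum(top_k) / k
    # Classification: majority vote, assuming 0/1 labels
    return int(sum(top_k) * 2 > k)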

Implementation

A brute-force implementation that follows the principle directly is simple to write, but expensive: every query must scan all samples and compute the distance from each one to the query point. That is why the previous two articles introduced the max-heap and the KD-Tree; those two data structures exist precisely to speed up the KNN search. KNN is a Top-K problem, and a max-heap solves Top-K problems while avoiding a full sort; for nearest-neighbor queries over multidimensional data, the KD-Tree is likewise a good fit.
More specifically, a plain KD-Tree search finds only the single nearest neighbor, while we need the k nearest. So when the nearest neighbor is found, the algorithm does not exit the loop; it keeps searching until even the top of the max-heap is closer than all unexplored neighbors, and only then exits. What remains in the max-heap is the Top-K.
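
To see the Top-K idea in isolation, here is a minimal, self-contained sketch built on Python's heapq module, which is a min-heap: storing negated distances turns it into a max-heap over the current k candidates, so the heap root is always the worst candidate and can be compared against unexplored branches. The helper name push_candidate is mine; the implementation below uses its own MaxHeap class instead.

import heapq

def push_candidate(heap, k, dist, label):
    # Keep at most k candidates; heap[0] holds the largest (worst) distance
    if len(heap) < k:
        heapq.heappush(heap, (-dist, label))
    elif dist < -heap[0][0]:
        # Closer than the current k-th nearest: replace the worst candidate
        heapq.heapreplace(heap, (-dist, label))

heap = []
for dist, label in [(3.0, 0), (1.0, 1), (2.5, 1), (0.5, 0)]:
    push_candidate(heap, 3, dist, label)
print(sorted(-d for d, _ in heap))  # prints [0.5, 1.0, 2.5]

The same root comparison drives the pruning in knn_search below: a brother subtree is queued only while the hyperplane separating it is closer to the query than the current heap top.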

Code

#coding=utf-8
from kdtree import KDTree
from max_heap import MaxHeap

from copy import copy
from random import randint, seed, random
from time import time
from random import choice
from math import exp, log

# Decorator that measures and prints the running time of a function
# fn is the function being wrapped
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun

# Load a CSV file whose last column is the label:
# float labels for regression, int labels for classification
def load_data(path):
    X = []
    y = []
    with open(path) as f:
        for line in f:
            line = line[:-1].split(',')
            xi = [float(s) for s in line[:-1]]
            yi = line[-1]
            yi = float(yi) if '.' in yi else int(yi)
            X.append(xi)
            y.append(yi)
    return X, y

# Breast cancer dataset for classification
def load_cancer():
    return load_data("boston/breast_cancer.csv")

# Boston housing dataset for regression
def load_house_data():
    return load_data("boston/housing.csv")

# Split the data into a training set and a test set
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()
    return X_train, X_test, y_train, y_test

# Accuracy
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)

# Precision
def get_precision(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_postive / predicted_positive

# Recall
def get_recall(y, y_hat):
    true_postive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_postive / actual_positive

# True positive rate (TPR), identical to recall
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True negative rate (TNR)
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative

# Compute the ROC curve as a list of [TPR, FPR] points, one per threshold
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret

# Compute AUC (area under the ROC curve) using the trapezoidal rule
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc

# Evaluate a classifier: accuracy, recall, precision and AUC
def model_evaluation(clf, X, y):
    y_hat = clf.predict(X)
    # AUC needs scores rather than hard labels, so use the
    # positive-class probability defined on the classifier below
    y_hat_prob = [clf._predict_prob(Xi) for Xi in X]
    ret = dict()
    ret["Accuracy"] = get_acc(y, y_hat)
    ret["Recall"] = get_recall(y, y_hat)
    ret['Precision'] = get_precision(y, y_hat)
    ret['AUC'] = get_auc(y, y_hat_prob)
    for k, v in ret.items():
        print("%s: %.3f" % (k, v))
    print()
    return ret

# Goodness of fit (R^2) of a regression model
def get_r2(reg, X, y):
    y_hat = reg.predict(X)
    sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
    y_avg = sum(y) / len(y)
    sst = sum((yi - y_avg) ** 2 for yi in y)
    r2 = 1 - sse / sst
    print("Test r2 is %.3f!" % r2)
    return r2

# Min-max scaling: normalize each feature to the [0, 1] range
def min_max_scale(X):
    m = len(X[0])
    x_max = [-float('inf') for _ in range(m)]
    x_min = [float('inf') for _ in range(m)]
    for row in X:
        x_max = [max(a, b) for a, b in zip(x_max, row)]
        x_min = [min(a, b) for a, b in zip(x_min, row)]

    ret = []
    for row in X:
        tmp = [(x - b) / (a - b) for a, b, x in zip(x_max, x_min, row)]
        ret.append(tmp)
    return ret

class KNeighborsBase(object):

    def __init__(self):
        self.k_neighbors = None
        self.tree = None

    def fit(self, X, y, k_neighbors=3):
        self.k_neighbors = k_neighbors
        self.tree = KDTree()
        self.tree.build_tree(X, y)

    # Search procedure:
    # 1. Take the KD-Tree built in fit
    # 2. Build a max-heap that holds at most k nodes
    # 3. Build a queue of (subtree root, leaf) pairs
    # 4. Outer loop: pop a pair and update the max-heap
    # 5. Inner loop: backtrack from the leaf towards the subtree root
    # 6. Exit when the heap top is guaranteed to be the k-th nearest neighbor

    def knn_search(self, Xi):
        tree = self.tree
        heap = MaxHeap(self.k_neighbors, lambda x: x.dist)
        # The leaf reached by walking from the root towards Xi
        nd = tree.search(Xi, tree.root)
        # Initialize the queue with a (subtree root, leaf) pair
        que = [(tree.root, nd)]
        while que:
            # Pop a pair and compute the distance between Xi and the subtree root
            nd_root, nd_cur = que.pop(0)
            nd_root.dist = tree.get_eu_dist(Xi, nd_root)
            heap.add(nd_root)
            while nd_cur is not nd_root:
                # Distance between Xi and the current node
                nd_cur.dist = tree.get_eu_dist(Xi, nd_cur)
                # Update the best nodes and distances seen so far
                heap.add(nd_cur)
                # Visit the brother subtree only if it may still contain one of the
                # k nearest neighbors, i.e. the splitting hyperplane through the
                # father is closer to Xi than the current heap top
                if nd_cur.brother and (not heap or heap.items[0].dist >
                                       tree.get_hyper_plane_dist(Xi, nd_cur.father)):
                    _nd = tree.search(Xi, nd_cur.brother)
                    que.append((nd_cur.brother, _nd))
                # Backtrack one level towards the subtree root
                nd_cur = nd_cur.father

        return heap

    # Implemented by the classifier and regressor subclasses
    def _predict(self, Xi):
        raise NotImplementedError

    def predict(self, X):
        return [self._predict(Xi) for Xi in X]


class KNeighborsClassifier(KNeighborsBase):
    def __init__(self):
        KNeighborsBase.__init__(self)

    # Fraction of the k nearest neighbors carrying a positive label,
    # used as the predicted probability of the positive class
    def _predict_prob(self, Xi):
        heap = self.knn_search(Xi)
        n_pos = sum(nd.split[1] for nd in heap._items)
        return n_pos / self.k_neighbors

    # Majority vote over the k nearest neighbors
    def _predict(self, Xi):
        return int(self._predict_prob(Xi) * 2 > 1)

class KNeighborsRegressor(KNeighborsBase):
    def __init__(self):
        KNeighborsBase.__init__(self)

    # Average of the k nearest neighbors' labels
    def _predict(self, Xi):
        heap = self.knn_search(Xi)
        return sum(nd.split[1] for nd in heap._items) / self.k_neighbors


@run_time
def main1():
    print("Tesing the performance of KNN classifier...")
    X, y = load_cancer()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=20)
    clf = KNeighborsClassifier()
    clf.fit(X_train, y_train, k_neighbors=21)
    model_evaluation(clf, X_test, y_test)

@run_time
def main2():
    print("Testing the performance of KNN regressor...")
    X, y = load_house_data()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=10)
    reg = KNeighborsRegressor()
    reg.fit(X=X_train, y=y_train, k_neighbors=3)
    get_r2(reg, X_test, y_test)

if __name__ == "__main__":
    main1()
    main2()

Results

Source code (GitHub):

https://github.com/BBuf/machine-learning