Principle
If the majority of a sample's k most similar samples (i.e., its nearest neighbors in feature space) belong to some class, then the sample belongs to that class as well.
KNN can be used not only for classification but also for regression: find a sample's k nearest neighbors and assign the sample the average of those neighbors' target values.
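As a baseline, both uses fit in a few lines of brute-force Python. This is a minimal sketch with illustrative names, independent of the KD-Tree code later in this post:

from collections import Counter

def brute_force_neighbors(X, y, Xi, k=3):
    # Sort training indices by squared Euclidean distance to the query Xi
    # and return the labels/targets of the k closest samples.
    idx = sorted(range(len(X)),
                 key=lambda i: sum((a - b) ** 2 for a, b in zip(X[i], Xi)))
    return [y[i] for i in idx[:k]]

def knn_classify(X, y, Xi, k=3):
    # Classification: majority vote among the k nearest labels.
    return Counter(brute_force_neighbors(X, y, Xi, k)).most_common(1)[0][0]

def knn_regress(X, y, Xi, k=3):
    # Regression: mean of the k nearest target values.
    return sum(brute_force_neighbors(X, y, Xi, k)) / k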
Implementation
A brute-force implementation straight from the principle is simple, but its complexity is high: for every query we have to scan the entire training set and compute the distance from each sample to the query point. That is why the previous two posts introduced the max-heap and the KD-Tree; those two data structures exist precisely to speed up the KNN search. KNN is a top-k problem, and a max-heap solves top-k problems without a full sort. For searching over multi-dimensional data, the KD-Tree is likewise a good fit.
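The top-k part on its own can be sketched with the standard library's heapq. heapq is a min-heap, so distances are negated to emulate a max-heap; this is only an illustration, and the actual code below uses the MaxHeap class from the earlier post:

import heapq

def k_smallest(dists, k):
    # Maintain a size-k heap of the closest distances seen so far in
    # O(n log k), instead of sorting everything in O(n log n).
    heap = []
    for d in dists:
        if len(heap) < k:
            heapq.heappush(heap, -d)
        elif -d > heap[0]:  # d beats the current k-th nearest (-heap[0])
            heapq.heapreplace(heap, -d)
    return sorted(-x for x in heap)

Keeping only k items is the same idea the MaxHeap class implements, and it also bounds memory use.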
More specifically, a standard KD-Tree search finds only the single nearest neighbor, while we need the k nearest. So when the nearest neighbor is found, the algorithm does not exit the loop; it keeps searching until the heap's root is closer than all the regions not yet examined, and only then stops. What remains in the max-heap at that point is the top k.
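The stopping rule rests on a simple pruning test: the heap root holds the k-th nearest distance found so far, and a region whose splitting hyperplane is farther away than that radius cannot contain a better neighbor. A standalone sketch of the test, with hypothetical names (the real check appears inside knn_search below):

def can_prune(kth_dist, heap_full, plane_dist):
    # If the heap is full and the ball of radius kth_dist around the
    # query does not cross the splitting hyperplane (plane_dist away),
    # no point on the far side can improve the top k, so that subtree
    # is skipped.
    return heap_full and kth_dist <= plane_dist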
Code
# coding=utf-8
# KDTree and MaxHeap come from the previous two posts in this series.
from kdtree import KDTree
from max_heap import MaxHeap
from random import random, seed
from time import time
# Decorator that reports a function's wall-clock running time.
# fn is the function to run. Note that it returns fun() rather than fun,
# so the decorated function is executed immediately at decoration time;
# this is how main1 and main2 below get run.
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun()
def load_cancer():
    # Load the breast cancer dataset; every column but the last is a
    # feature, and the last column is the 0/1 label.
    f = open("boston/breast_cancer.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y
def load_house_data():
    # Load the Boston housing dataset; the last column is the target.
    f = open("boston/housing.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y
# Split the data into a training set and a test set; each sample goes
# to the training set with probability prob.
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    # Reset the random seed so later calls are not affected.
    seed()
    return X_train, X_test, y_train, y_test
# Accuracy (all metrics below assume binary 0/1 labels)
def get_acc(y, y_hat):
    return sum(yi == yi_hat for yi, yi_hat in zip(y, y_hat)) / len(y)

# Precision
def get_precision(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    predicted_positive = sum(y_hat)
    return true_positive / predicted_positive

# Recall
def get_recall(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True positive rate (identical to recall)
def get_tpr(y, y_hat):
    true_positive = sum(yi and yi_hat for yi, yi_hat in zip(y, y_hat))
    actual_positive = sum(y)
    return true_positive / actual_positive

# True negative rate
def get_tnr(y, y_hat):
    true_negative = sum(1 - (yi or yi_hat) for yi, yi_hat in zip(y, y_hat))
    actual_negative = len(y) - sum(y)
    return true_negative / actual_negative
# Points of the ROC curve: for each score threshold, predict 1 when the
# score is at least the threshold, then record (TPR, FPR).
def get_roc(y, y_hat_prob):
    thresholds = sorted(set(y_hat_prob), reverse=True)
    ret = [[0, 0]]
    for threshold in thresholds:
        y_hat = [int(yi_hat_prob >= threshold) for yi_hat_prob in y_hat_prob]
        ret.append([get_tpr(y, y_hat), 1 - get_tnr(y, y_hat)])
    return ret

# AUC (area under the ROC curve), accumulated with the trapezoid rule.
def get_auc(y, y_hat_prob):
    roc = iter(get_roc(y, y_hat_prob))
    tpr_pre, fpr_pre = next(roc)
    auc = 0
    for tpr, fpr in roc:
        auc += (tpr + tpr_pre) * (fpr - fpr_pre) / 2
        tpr_pre = tpr
        fpr_pre = fpr
    return auc
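# A quick hand-checkable sanity test for get_auc (illustrative only):
# positives score {0.9, 0.7} and negatives score {0.8, 0.1}, so 3 of the
# 4 positive-negative pairs are ranked correctly and AUC should be 0.75.
def auc_sanity_check():
    assert abs(get_auc([1, 0, 1, 0], [0.9, 0.8, 0.7, 0.1]) - 0.75) < 1e-9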
def model_evaluation(clf, X, y):
    # Evaluate a binary classifier. AUC needs a score rather than a hard
    # 0/1 label, so we use the positive-vote ratio from _predict_prob
    # (defined on KNeighborsClassifier below) instead of _predict.
    y_hat = clf.predict(X)
    y_hat_prob = [clf._predict_prob(Xi) for Xi in X]
    ret = dict()
    ret["Accuracy"] = get_acc(y, y_hat)
    ret["Recall"] = get_recall(y, y_hat)
    ret["Precision"] = get_precision(y, y_hat)
    ret["AUC"] = get_auc(y, y_hat_prob)
    for k, v in ret.items():
        print("%s: %.3f" % (k, v))
    print()
    return ret
# Goodness of fit (R^2) of a regression model.
def get_r2(reg, X, y):
    y_hat = reg.predict(X)
    sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
    y_avg = sum(y) / len(y)
    sst = sum((yi - y_avg) ** 2 for yi in y)
    r2 = 1 - sse / sst
    print("Test r2 is %.3f!" % r2)
    return r2
# Scale every feature column to the [0, 1] interval.
def min_max_scale(X):
    m = len(X[0])
    x_max = [-float('inf') for _ in range(m)]
    x_min = [float('inf') for _ in range(m)]
    # Find the column-wise maximum and minimum.
    for row in X:
        x_max = [max(a, b) for a, b in zip(x_max, row)]
        x_min = [min(a, b) for a, b in zip(x_min, row)]
    ret = []
    for row in X:
        tmp = [(x - b) / (a - b) for a, b, x in zip(x_max, x_min, row)]
        ret.append(tmp)
    return ret
class KNeighborsBase(object):
    def __init__(self):
        self.k_neighbors = None
        self.tree = None

    def fit(self, X, y, k_neighbors=3):
        self.k_neighbors = k_neighbors
        self.tree = KDTree()
        self.tree.build_tree(X, y)

    # 1. Get the KD-Tree.
    # 2. Build a max-heap of size k.
    # 3. Build a queue of (subtree root, leaf) pairs to examine.
    # 4. The outer loop updates the max-heap.
    # 5. The inner loop backtracks through the KD-Tree.
    # 6. Exit when the heap root is guaranteed to be the k-th nearest neighbor.
    def knn_search(self, Xi):
        tree = self.tree
        heap = MaxHeap(self.k_neighbors, lambda x: x.dist)
        # Walk from the root down to a leaf along Xi's search path.
        nd = tree.search(Xi, tree.root)
        # Initialize the queue.
        que = [(tree.root, nd)]
        while que:
            nd_root, nd_cur = que.pop(0)
            # Distance between Xi and the subtree root.
            nd_root.dist = tree.get_eu_dist(Xi, nd_root)
            heap.add(nd_root)
            while nd_cur is not nd_root:
                # Distance between Xi and the current node.
                nd_cur.dist = tree.get_eu_dist(Xi, nd_cur)
                # The heap keeps the k closest nodes seen so far.
                heap.add(nd_cur)
                # Visit the brother subtree only if it may contain a closer
                # neighbor: the heap has no candidates yet, or the father's
                # splitting hyperplane is closer to Xi than the heap root
                # (the largest of the k best distances so far).
                if nd_cur.brother and (not heap or heap.items[0].dist > tree.get_hyper_plane_dist(Xi, nd_cur.father)):
                    _nd = tree.search(Xi, nd_cur.brother)
                    que.append((nd_cur.brother, _nd))
                nd_cur = nd_cur.father
        return heap

    def _predict(self, Xi):
        raise NotImplementedError

    def predict(self, X):
        return [self._predict(Xi) for Xi in X]
class KNeighborsClassifier(KNeighborsBase):
    def __init__(self):
        KNeighborsBase.__init__(self)

    def _predict_prob(self, Xi):
        # Score for AUC: the ratio of positive labels among the k nearest
        # neighbors. nd.split[1] is the label stored at a KD-Tree node.
        heap = self.knn_search(Xi)
        return sum(nd.split[1] for nd in heap.items) / self.k_neighbors

    def _predict(self, Xi):
        # Majority vote: predict 1 if more than half the neighbors are positive.
        heap = self.knn_search(Xi)
        n_pos = sum(nd.split[1] for nd in heap.items)
        return int(n_pos * 2 > self.k_neighbors)

class KNeighborsRegressor(KNeighborsBase):
    def __init__(self):
        KNeighborsBase.__init__(self)

    def _predict(self, Xi):
        # Average of the k nearest neighbors' target values.
        heap = self.knn_search(Xi)
        return sum(nd.split[1] for nd in heap.items) / self.k_neighbors
@run_time
def main1():
    print("Testing the performance of KNN classifier...")
    X, y = load_cancer()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=20)
    clf = KNeighborsClassifier()
    clf.fit(X_train, y_train, k_neighbors=21)
    model_evaluation(clf, X_test, y_test)

@run_time
def main2():
    print("Testing the performance of KNN regressor...")
    X, y = load_house_data()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=10)
    reg = KNeighborsRegressor()
    reg.fit(X=X_train, y=y_train, k_neighbors=3)
    get_r2(reg, X_test, y_test)
Results
Full source code on GitHub:
https://github.com/BBuf/machine-learning