Algorithm Principles

Systems of Linear Equations

Suppose we have two equations, x + y = 2 and 2x + y = 3, and we are asked for 3x + 4y. We can solve this system of two linear equations directly: subtracting the first equation from the second gives x = 1, so y = 1, and the answer is 3 × 1 + 4 × 1 = 7.
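As a minimal sketch (assuming NumPy is available; the variable names are ours, for illustration), the same system can be solved programmatically:

import numpy as np

# Coefficient matrix and right-hand side for x + y = 2 and 2x + y = 3
A = np.array([[1.0, 1.0],
              [2.0, 1.0]])
b = np.array([2.0, 3.0])
x, y = np.linalg.solve(A, b)  # exact solution: x = 1.0, y = 1.0
print(3 * x + 4 * y)          # 7.0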

Overdetermined Systems

For a system of equations Ra = y, where R is an n × m matrix, if R has full column rank and n > m, the system has no exact solution. In this case the system is called an overdetermined system.
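A minimal sketch of this situation, assuming NumPy (the numbers below are made up for illustration): with more equations than unknowns, np.linalg.lstsq returns the best approximate solution together with a nonzero residual, confirming that no exact solution exists.

import numpy as np

# Three equations, two unknowns: an overdetermined system Ra = y
R = np.array([[1.0, 1.0],
              [2.0, 1.0],
              [1.0, 3.0]])
y = np.array([2.0, 3.0, 5.0])

a, residuals, rank, _ = np.linalg.lstsq(R, y, rcond=None)
print(a)          # least-squares approximate solution
print(residuals)  # nonzero sum of squared residuals: no exact solution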

The Linear Regression Problem

Suppose a company has n employees (n = 10000), with ages A = [a1, a2, …, an], job levels B = [b1, b2, …, bn], and salaries C = [c1, c2, …, cn], satisfying the system of equations Ax + By + z = C. We want to find the values of x, y, and z so that we can predict an employee's salary. This kind of problem is a typical linear regression problem: we have 3 unknowns x, y, z to solve for, but 10000 equations, which is clearly an overdetermined system.
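As a sketch of this framing (assuming NumPy; all numbers below are synthetic, generated purely for illustration), the n equations can be stacked into a design matrix with a column of ones for the intercept z, and solved approximately with least squares:

import numpy as np

rng = np.random.default_rng(0)
n = 10000
A = rng.uniform(22, 60, n)    # ages
B = rng.integers(1, 10, n)    # job levels
# Synthetic salaries from assumed coefficients x=100, y=2000, z=3000, plus noise
C = 100 * A + 2000 * B + 3000 + rng.normal(0, 500, n)

# Stack [A, B, 1] so that M @ [x, y, z] approximates C
M = np.column_stack([A, B, np.ones(n)])
coef, *_ = np.linalg.lstsq(M, C, rcond=None)
print(coef)  # approximately [100, 2000, 3000]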

The Least Squares Method

Here, suppose we have m employees, each with n features (age, job level, and so on; note that m now counts samples and n counts features). Collect these features into a matrix X with m rows and n columns, and let the salaries form an m × 1 matrix Y, with m > n. We want to solve for n unknowns W = [w1, w2, …, wn] and one unknown b satisfying the system WX + b = Y. Let the predicted value be $\hat Y$; then

$MSE = \frac{1}{m}\sum_{i=1}^{m}(Y_i - \hat Y_i)^2$

When our predictions exactly equal the true values, the MSE is 0. From the discussion above, we are clearly unlikely to find an exact solution W to this system, so we cannot predict Y exactly, and the MSE cannot be 0. What we can do is find an approximate solution that makes the MSE as small as possible; this is the least squares method.
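A minimal sketch of this computation in plain Python (the function name mse is ours, for illustration):

def mse(y_true, y_pred):
    # Mean squared error: the average squared residual
    m = len(y_true)
    return sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y_true, y_pred)) / m

print(mse([3.0, 5.0], [2.5, 5.5]))  # 0.25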

Finding an Approximate Solution

How do we find the approximate solution that minimizes the MSE? The procedure can be summarized as follows:

  • 1. Use the MSE as the loss function:
    $L = \frac{1}{m}\sum_{i=1}^{m}(Y_i - \hat Y_i)^2$
  • 2. We know that
    $\hat Y = WX + b$
  • 3. Taking the partial derivative with respect to W gives:
    $\frac{\partial L}{\partial W} = -\frac{2}{m}\sum_{i=1}^{m}(Y_i - WX_i - b)X_i$
  • 4. Taking the partial derivative with respect to b gives:
    $\frac{\partial L}{\partial b} = -\frac{2}{m}\sum_{i=1}^{m}(Y_i - WX_i - b)$
    So the gradient of the parameter W is given by equation 3, and the gradient of the parameter b by equation 4. These formulas can be sanity-checked numerically, as shown in the sketch below.
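As a quick numerical check on equations 3 and 4 (a minimal sketch; the data and names below are made up for illustration), we can compare the analytic gradients against finite-difference approximations for a single-feature case:

# Single-feature data, made up for illustration
X = [1.0, 2.0, 3.0]
Y = [2.0, 4.1, 5.9]
w, b = 0.5, 0.1
m = len(X)

def loss(w, b):
    return sum((yi - (w * xi + b)) ** 2 for xi, yi in zip(X, Y)) / m

# Analytic gradients from equations 3 and 4
grad_w = -2 / m * sum((yi - w * xi - b) * xi for xi, yi in zip(X, Y))
grad_b = -2 / m * sum(yi - w * xi - b for xi, yi in zip(X, Y))

# Finite-difference approximations of the same derivatives
eps = 1e-6
num_w = (loss(w + eps, b) - loss(w - eps, b)) / (2 * eps)
num_b = (loss(w, b + eps) - loss(w, b - eps)) / (2 * eps)
print(grad_w, num_w)  # should agree closely
print(grad_b, num_b)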

Algorithm Implementation

This article uses gradient descent, in both its batch and stochastic variants, to minimize the MSE loss function.
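Concretely, with learning rate lr, batch gradient descent updates the parameters once per epoch by stepping against the gradient (the leading minus sign in equations 3 and 4 turns the update into an addition):

$W \leftarrow W + lr \cdot \frac{2}{m}\sum_{i=1}^{m}(Y_i - WX_i - b)X_i, \quad b \leftarrow b + lr \cdot \frac{2}{m}\sum_{i=1}^{m}(Y_i - WX_i - b)$

The stochastic variant in the code below applies the same update per sampled example, with the constant factor folded into the learning rate.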

#coding=utf-8
from random import sample, normalvariate, seed, random
from time import time

# Decorator that measures and prints the run time of a function.
# fn is the function to time; note that it returns fun() rather than fun,
# so the decorated function is executed immediately at decoration time.
def run_time(fn):
    def fun():
        start = time()
        fn()
        ret = time() - start
        if ret < 1e-6:
            unit = "ns"
            ret *= 1e9
        elif ret < 1e-3:
            unit = "us"
            ret *= 1e6
        elif ret < 1:
            unit = "ms"
            ret *= 1e3
        else:
            unit = "s"
        print("Total run time is %.1f %s\n" % (ret, unit))
    return fun()  # runs fn immediately when applied as a decorator

# Load the Boston housing dataset from a local CSV file
def load_data():
    f = open("boston/housing.csv")
    X = []
    y = []
    for line in f:
        line = line[:-1].split(',')
        xi = [float(s) for s in line[:-1]]
        yi = line[-1]
        if '.' in yi:
            yi = float(yi)
        else:
            yi = int(yi)
        X.append(xi)
        y.append(yi)
    f.close()
    return X, y

# Split the data into a training set and a test set
def train_test_split(X, y, prob=0.7, random_state=None):
    if random_state is not None:
        seed(random_state)
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    for i in range(len(X)):
        if random() < prob:
            X_train.append(X[i])
            y_train.append(y[i])
        else:
            X_test.append(X[i])
            y_test.append(y[i])
    seed()  # re-seed from system entropy so later randomness is not fixed
    return X_train, X_test, y_train, y_test

# Compute the goodness of fit (R-squared) of the regression model
def get_r2(reg, X, y):
    y_hat = reg.predict(X)
    sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
    y_avg = sum(y) / len(y)
    sst = sum((yi - y_avg) ** 2 for yi in y)
    r2 = 1 - sse / sst
    print("Test r2 is %.3f!" % r2)
    return r2

# Scale each feature column to the [0, 1] range (min-max normalization);
# assumes every column has a nonzero range
def min_max_scale(X):
    m = len(X[0])
    x_max = [-float('inf') for _ in range(m)]
    x_min = [float('inf') for _ in range(m)]
    for row in X:
        x_max = [max(a, b) for a, b in zip(x_max, row)]
        x_min = [min(a, b) for a, b in zip(x_min, row)]

    ret = []
    for row in X:
        tmp = [(x - b) / (a - b) for a, b, x in zip(x_max, x_min, row)]
        ret.append(tmp)
    return ret

class RegressionBase(object):
    def __init__(self):
        self.bias = None
        self.weights = None

    def _predict(self, Xi):
        raise NotImplementedError

    # Per-sample contributions to the gradients; the sign flip and the
    # constant factor from equations 3 and 4 are applied by the callers
    def get_gradient_delta(self, Xi, yi):
        y_hat = self._predict(Xi)
        bias_grad_delta = yi - y_hat
        weights_grad_delta = [bias_grad_delta * Xij for Xij in Xi]
        return bias_grad_delta, weights_grad_delta

    # Batch (full) gradient descent
    def batch_gradient_descent(self, X, y, lr, epochs):
        # Per epoch: b <- b + lr * 2/m * sum(b_grad_i),
        #            W <- W + lr * 2/m * sum(w_grad_i)
        m, n = len(X), len(X[0])
        self.bias = 0
        # Initialize the weights from a normal distribution
        self.weights = [normalvariate(0, 0.01) for _ in range(n)]
        for _ in range(epochs):
            bias_grad = 0
            weights_grad = [0 for _ in range(n)]
            for i in range(m):
                bias_grad_delta, weights_grad_delta = self.get_gradient_delta(X[i], y[i])
                bias_grad += bias_grad_delta
                weights_grad = [w_grad + w_grad_d for w_grad, w_grad_d
                                in zip(weights_grad, weights_grad_delta)]
            self.bias = self.bias + lr * bias_grad * 2 / m
            self.weights = [w + lr * w_grad * 2 / m
                            for w, w_grad in zip(self.weights, weights_grad)]

    # Stochastic gradient descent: update on a random subset of samples each epoch
    def stochastic_gradient_descent(self, X, y, lr, epochs, sample_rate):
        m, n = len(X), len(X[0])
        k = int(m * sample_rate)
        self.bias = 0
        self.weights = [normalvariate(0, 0.01) for _ in range(n)]
        for _ in range(epochs):
            for i in sample(range(m), k):
                bias_grad, weights_grad = self.get_gradient_delta(X[i], y[i])
                self.bias += lr * bias_grad
                self.weights = [w + lr * w_grad
                                for w, w_grad in zip(self.weights, weights_grad)]

    def fit(self, X, y, lr, epochs, method="batch", sample_rate=1.0):
        assert method in ("batch", "stochastic")
        if method == "batch":
            self.batch_gradient_descent(X, y, lr, epochs)
        if method == "stochastic":
            self.stochastic_gradient_descent(X, y, lr, epochs, sample_rate)

    def predict(self, X):
        raise NotImplementedError


class LinearRegression(RegressionBase):
    def __init__(self):
        RegressionBase.__init__(self)

    def _predict(self, Xi):
        return sum(wi * xij for wi, xij in zip(self.weights, Xi)) + self.bias

    def predict(self, X):
        return [self._predict(xi) for xi in X]



def main():
    X, y = load_data()
    X = min_max_scale(X)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=10)
    @run_time
    def batch():
        print("Tesing the performance of LinearRegression(batch)...")
        reg = LinearRegreession()
        reg.fit(X=X_train, y=y_train, lr=0.02, epochs=5000)
        get_r2(reg, X_test, y_test)

    @run_time
    def stochastic():
        print("Tesing the performance of LinearRegression(stochastic)...")
        reg = LinearRegreession()
        reg.fit(X=X_train, y=y_train, lr=0.001, epochs=1000,
                method="stochastic", sample_rate=0.5)
        get_r2(reg, X_test, y_test)


if __name__ == "__main__":
    main()

Test results on the Boston housing dataset