""" 参考:https://github.com/gwgundersen/bocd/blob/master/bocd.py """
"""============================================================================ Python implementation of Bayesian online changepoint detection for a normal model with unknown mean parameter. For details, see Adams & MacKay 2007: "Bayesian Online Changepoint Detection" https://arxiv.org/abs/0710.3742 This code implements the figure in the following blog post: http://gregorygundersen.com/blog/2019/08/13/bocd/ Author: Gregory Gundersen ============================================================================"""
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import numpy as np
from scipy.stats import norm
def bocd(data, model, hazard):
    """Return run length posterior using Algorithm 1 in Adams & MacKay 2007.

    Args:
        data: Sequence of observations, processed in order.
        model: Observation model exposing ``pred_prob(t, x)``, which returns
            the predictive probability of ``x`` under each of the ``t``
            current run-length hypotheses, and ``update_statistics(t, x)``,
            which is called once per observation after the posterior row
            has been computed.
        hazard: Constant per-step changepoint probability.

    Returns:
        Array ``R`` of shape ``(T + 1, T + 1)`` with ``T = len(data)``.
        Row ``t`` holds the normalized run-length distribution after ``t``
        observations in columns ``0..t``; the remaining columns are zero.
    """
    # Take the horizon from the data itself rather than from a module-level
    # global, so the function works for any caller and any input length.
    T = len(data)

    # Before any data is seen the run length is 0 with probability 1.
    R = np.zeros((T + 1, T + 1))
    R[0, 0] = 1
    message = np.array([1.0])

    for t, x in enumerate(data, start=1):
        # Predictive probability of x under every current run length.
        pis = model.pred_prob(t, x)
        weighted = pis * message

        # Either the run grows by one, or a changepoint resets it to 0.
        growth_probs = weighted * (1 - hazard)
        cp_prob = np.sum(weighted * hazard)
        new_joint = np.append(cp_prob, growth_probs)

        evidence = np.sum(new_joint)
        posterior = new_joint / evidence
        R[t, :t + 1] = posterior

        model.update_statistics(t, x)

        # Forward the *normalized* distribution. Rescaling the message by a
        # constant leaves every normalized row of R unchanged, but keeps the
        # recursion from underflowing to zero on long sequences.
        message = posterior
    return R
class NormalKnownPrecision:
    """Normal observation model with unknown mean and unit observation precision.

    One (mean, precision) pair is kept per run-length hypothesis. Index 0
    always holds the prior, i.e. the parameters for a run that has just
    started; index ``i`` holds the posterior after ``i`` observations.
    """

    def __init__(self, mean0, prec0):
        """Initialize model parameters.

        Args:
            mean0: Prior mean of the unknown segment mean.
            prec0: Prior precision (inverse variance) of the segment mean.
        """
        self.mean0 = mean0
        self.prec0 = prec0
        self.mean_params = np.array([mean0])
        self.prec_params = np.array([prec0])

    def pred_prob(self, t, x):
        """Compute predictive probabilities.

        Args:
            t: Number of run-length hypotheses to evaluate (the first ``t``
                parameter pairs are used).
            x: The new observation.

        Returns:
            Array with the predictive density of ``x`` under each of the
            first ``t`` hypotheses.
        """
        means = self.mean_params[:t]
        # Predictive variance = uncertainty about the mean (1 / precision)
        # plus the unit observation variance. scipy's third argument is a
        # standard deviation, so the variance must be square-rooted.
        stds = np.sqrt(1 / self.prec_params[:t] + 1)
        return norm.pdf(x, means, stds)

    def update_statistics(self, t, x):
        """Update sufficient statistics.

        Applies the conjugate normal update to every hypothesis and then
        prepends the prior for the new run-length-0 hypothesis.

        Args:
            t: Current time step. Unused by the update itself; kept so the
                method signature stays unchanged for callers.
            x: The new observation.
        """
        # Each observation has unit precision, so it adds 1 to the precision
        # and enters the precision-weighted mean with weight 1.
        new_prec_params = self.prec_params + 1
        new_mean_params = (
            self.mean_params * self.prec_params + x
        ) / new_prec_params
        self.mean_params = np.append([self.mean0], new_mean_params)
        self.prec_params = np.append([self.prec0], new_prec_params)
def generate_data(mean0, prec0, T, cp_prob):
    """Generate partitioned data of T observations.

    At every step a changepoint occurs with constant probability
    ``cp_prob``; on a changepoint a new segment mean is drawn from a normal
    with mean ``mean0`` and precision ``prec0``. Each observation is drawn
    from a unit-variance normal around the current segment mean. Draws use
    NumPy's global random state.

    Args:
        mean0: Prior mean of the segment means; also the mean used until
            the first changepoint.
        prec0: Prior precision (inverse variance) of the segment means.
        T: Number of observations to generate.
        cp_prob: Per-step changepoint probability.

    Returns:
        Tuple ``(data, cpts)``: the list of observations and the list of
        time indices at which a changepoint occurred.
    """
    # Start from the prior mean rather than a hard-coded 0 so the first
    # segment is consistent with the hyperprior.
    mean = mean0
    data = []
    cpts = []
    for t in range(T):
        if np.random.random() < cp_prob:
            # np.random.normal expects a standard deviation; a precision
            # prec0 corresponds to a standard deviation of 1 / sqrt(prec0).
            mean = np.random.normal(mean0, 1 / prec0 ** 0.5)
            cpts.append(t)
        data.append(np.random.normal(mean, 1))
    return data, cpts
def plot_posterior(T, data, R, cpts):
    """Draw the observations and the run-length posterior as stacked panels.

    The upper panel shows ``data`` against time as both markers and a line.
    The lower panel shows ``R`` rotated by 90 degrees as a grayscale image
    on a logarithmic color scale. Every entry of ``cpts`` is marked with a
    dotted red vertical line in both panels. The figure is displayed with
    ``plt.show()``.

    Args:
        T: Number of time steps; sets the x-limits and the tick range.
        data: Observations, one per time step.
        R: Run-length posterior matrix to display.
        cpts: Time indices of the ground-truth changepoints.
    """
    _, (data_ax, posterior_ax) = plt.subplots(2, 1, figsize=(20, 10))

    # Upper panel: raw observations.
    steps = range(T)
    data_ax.scatter(steps, data)
    data_ax.plot(steps, data)
    data_ax.set_xlim([0, T])
    data_ax.margins(0)

    # Lower panel: posterior image; the log scale keeps small probabilities
    # visible.
    log_scale = LogNorm(vmin=0.0001, vmax=1)
    posterior_ax.imshow(
        np.rot90(R), aspect='auto', cmap='gray_r', norm=log_scale
    )
    posterior_ax.set_xlim([0, T])
    tick_positions = list(range(0, T + 1, 50))
    posterior_ax.set_yticks(tick_positions)
    # Labels run in the opposite direction to the tick positions.
    posterior_ax.set_yticklabels(list(reversed(tick_positions)))
    posterior_ax.margins(0)

    # Mark the true changepoints on both panels.
    for changepoint in cpts:
        for panel in (data_ax, posterior_ax):
            panel.axvline(changepoint, c='r', ls='dotted')

    plt.tight_layout()
    plt.show()
if __name__ == '__main__':
    # Simulation settings: horizon, changepoint rate and hyperpriors.
    T = 300
    cp_prob = 1/50
    mean0, prec0 = 0, 0.2

    # Simulate a series, then run the detector with the same hazard rate
    # that generated the changepoints.
    data, cpts = generate_data(mean0, prec0, T, cp_prob)
    model = NormalKnownPrecision(mean0, prec0)
    R = bocd(data=data, model=model, hazard=cp_prob)

    # Sanity check: every row of R must be a normalized distribution.
    row_totals = [np.sum(row) for row in R]
    assert all(np.isclose(total, 1) for total in row_totals)

    plot_posterior(T, data, R, cpts)
# References:
# - Bayesian Online Changepoint Detection
# - Github_1 (BOCD)