水塘抽样(Reservoir sampling)
题目:给出一个数据流,这个数据流的长度很大或者未知。并且对该数据流中的数据只能访问一次。请写出一个随机选择算法,使得数据流中所有数据被选中的概率相等。
这个问题的扩展就是:如何从未知或者很大样本空间随机的取k个数?或者说,数据流长度为N行,要随机抽取k行,则每一行被抽取的概率为k/N。
这个问题也就是在大数据流中的随机抽样问题:当内存无法加载全部数据时,如何从包含未知大小的数据流中随机选取k个数据,并且要保证每个数据被抽取到的概率相等。
当 k=1时
首先考虑最简单的情况,当 k=1时,如何选取:
- 假设数据流含有
N
个数据,要保证每条数据被抽取到的概率相等,那么每个数被抽取的概率应该为 N1- 遇到第
1
个数 n1的时候,保留它, p(n1)=1 - 遇到第
2
个数 n2的时候,以 21的概率保留它,那么 p(n1)=1×21=21, p(n2)=21 - 遇到第
3
个数 n3的时候,以 31的概率保留它,那么 p(n1)=p(n2)=21×(1−31)=31,p(n3)=31 - …
- 遇到第
i
个数 ni的时候,以 i1的概率保留它,那么 p(n1)=p(n2)=p(n3)=...=p(ni−1)=i−11×(1−i1)=i1,p(ni)=i1
- 遇到第
通过以上规律可以看出,对于 k=1的情况,数据流中第i
个数被保留的概率为 i1。只要采取这种策略,只需要遍历一遍数据流就可以得到采样值,并且保证所有数据被选中的概率均为 N1。
当 k>1时
对于 k>1的情况,我们可以采取类似的策略:
- 假设数据流中含有
N
个数据,要保证每条数据被抽取到的概率相等,那么每个数被抽取的概率必然是 Nk- 对于前
k
个数 n1,n2,...,nk,我们保留下来,则 p(n1)=p(n2)=...=p(nk)=1(下面连等采用 p(n1−k)的形式 - 对于第
k+1
个数 nk+1,以 k+1k的概率保留它(这里只是指本次保留下来),那么前k
个数中的 nr(r∈1−k)被保留的概率可以这样表示: p(nr被保留)=p(上一轮nr被保留)×(p(nk+1被丢弃)+p(nk+1被保留)×p(nr未被替换)),即 p1−k=k+11+k+1k×kk−1=k+1k - 对于第
k+2
个数 nk+2,以 k+2k的概率保留它(这里只是指本次保留下来),那么前k+1
个被保留下来的数中的 nr(r∈1−k+1)被保留的概率为: p1−k=k+1k×k+22+k+1k×k+2k−1 - …
- 对于第
i(i>k)
个数 ni,以 ik的概率保留它,前i-1
个数中的 nr(r∈1−i−1)被保留的概率为: p1−k=i−1k×ii−k+i−1k×ik−1=ik
- 对于前
对于前k
个数,全部保留,对于第i(i>k)
个数,以 ik的概率保留第i
个数,并以 k1的概率与前面已选择的k
个数中的任意一个替换。
总结
也就是说,在取第i
个数据的时候,生成一个0
到1
的随机数p
,如果 p<ik,替换池中任意一个为第i
个数;当 p>ik,继续保留前面的数。直到数据流结束,返回此k
个数。但是为了保证计算准确性,一般是生成一个0
到i
的随机数,跟k
相比。
Scala代码实现
import scala.util.Random
/**
* @author xiaoer
* @date 2020/1/11 23:53
*/
object ReservoirSampling {
def main(args: Array[String]): Unit = {
val s: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)
val k: Array[Int] = new Array[Int](3)
val result: Array[Int] = reservoirSampling(k, s)
println(result.toBuffer)
}
/**
* 水塘抽样算法
*
* @param k 抽样结果
* @param s 样本总数
* @return k 样本结果
*/
def reservoirSampling(k: Array[Int], s: Array[Int]): Array[Int] = {
// 将前 k 个数据全部抽取
for (i <- k.indices) {
k(i) = s(i)
}
// k+1 往后的数据
for (i <- k.length until s.length) {
val seed: Int = Random.nextInt(i)
if (seed < k.length) {
k(seed) = s(i)
}
}
// 返回值
k
}
}