10 亿个数中找出最大的 10000 个数(topK 问题)

针对 topK 类问题,通常比较好的方案是 分治 + Trie 树/hash + 小顶堆(最小堆),即先将数据集按照 Hash 方法分解成多个小数据集,然后使用 Trie 树或者 Hash 统计每个小数据集中的 query 词频,之后用小顶堆求出每个数据集中出现频率最高的前 K 个数,最后在所有 topK 中求出最终的 topK。

有 1 亿个浮点数,如何找出其中最大的 10000 个?

  • 最容易想到的方法是将数据全部排序
    然后在排序后的集合中进行查找,最快的排序算法的时间复杂度一般为 O(nlogn),如快排。但是在32位的机器上,每个 float 类型占 4 个字节,1 亿个浮点数就要占用 400MB 的存储空间,对于一些可用内存小于 400M 的计算机而言,很显然是不能一次将全部数据读入内存进行排序的。其实即时内存能够满足要求(如 8GB 内存),该方法也并不高效,因为题目的目的是找出最大的10000 个数即可,而排序却是将所有的元素都排序了,做了很多的无用功。

  • 局部淘汰法
    该方法与排序方法类似,用一个容器保存前 10000 个数,然后将剩余的所有数字,与容器内的最小数字相比,如果所有后续的元素都比容器内的 10000 个数还小,那么容器内这个 10000 个数就是最大 10000 个数。如果某一后续元素比容器内最小数字还大,则删除容器内最小元素,并将该元素插入到容器内,最后遍历完这 1 亿个数,得到的结果容器中保存的数即为最终结果了。此时的时间复杂度为 O(n+m^2),其中 m 为容器的大小,即 10000。

  • 分治法
    将 1 亿个数据分成 100 份,每份 100 万个数据,找到每份数据中最大的 10000 个,最后在剩下的 10010000 个数据里面找出最大的 10000 个。如果 100 万数据选择足够理想,那么可以过滤掉 1 亿数据里面 99% 的数据。100 万个数据里面查找最大的 10000 个数据的方法如下:
    用快排的方法,将数据分为 2 堆,
    如果大的那堆个数 N 大于 10000 个,继续对大堆快排一次分成 2 堆;
    如果大的那堆个数 N 小于 10000 个,就在小的那堆里面快排,找出第 10000-n 大的数字;
    递归以上过程,就可以找出第 10000 大的数。由于快排的特点,在第 10000 大的数字的左边,均是前 10000 大。
    这种方法需要每次的内存空间为 10^6
    4=4MB,一共需要101次这样的比较。

  • Hash 法
    如果这 1 亿个数里面有很多重复的数,先通过 Hash 法,把这 1 亿个数字去重复,这样如果重复率很高的话,会减少很大的内存用量,从而缩小运算空间,然后通过分治法或最小堆法查找最大的 10000 个数。

  • 最小堆
    首先读入前 10000 个数来创建大小为 10000 的最小堆,建堆的时间复杂度为 O(mlogm)(m为数组的大小即为 10000),
    然后遍历后续的数字,并于堆顶(最小)数字进行比较。
    如果比最小的数小,则继续读取后续数字;
    如果比堆顶数字大,则替换堆顶元素并重新调整堆为最小堆。
    整个过程直至 1 亿个数全部遍历完为止。然后按照中序遍历的方式输出当前堆中的所有 10000 个数字。该算法的时间复杂度为 O(nmlogm),空间复杂度是 10000(常数)。

实际运行:
实际上,最优的解决方案应该是最符合实际设计需求的方案,在时间应用中,可能有足够大的内存,那么直接将数据扔到内存中一次性处理即可,也可能机器有多个核,这样可以采用多线程处理整个数据集。

下面针对不同的应用场景,分析了适合相应应用场景的解决方案。

  • (1)单机 + 单核 + 足够大内存
    如果需要查找 10 亿个查询次(每个占 8B)中出现频率最高的 10 个,考虑到每个查询次占 8B,则 10 亿个查询次所需的内存大约是 10^9*8B=8GB 内存。如果有这么大内存,直接在内存中对查询次进行排序,顺序遍历找出 10 个出现频率最大的即可。这种方法简单快速。也可以先用 HashMap 求出每个词出现的频率,然后求出频率最大的 10 个词。

  • (2)单机 + 多核 + 足够大内存
    这时可以直接在内存中使用 Hash 方法将数据划分成 n 个 partition,每个 partition 交给一个线程处理,线程的处理逻辑同(1)类似,最后一个线程将结果合并。
    该方法存在一个瓶颈会明显影响效率,即数据倾斜。每个线程的处理速度可能不同,快的线程需要等待慢的线程,最终的处理速度取决于慢的线程。而针对此问题,解决的方法是:将数据划分成 c × n 个 partition(c>1),每个线程处理完当前 partition 后主动取下一个 partition 继续处理,直到所有数据完毕,最后由一个线程进行归并。

  • (3)单机 + 单核 + 受限内存
    这种情况下,需要将原数据文件切割成一个一个小文件,如采用 hash(x)%M,将原文件中的数据切割成 M 小文件,如果小文件仍大于内存大小,继续采用 Hash 的方法对数据文件进行分割,直到每个小文件小于内存大小,这样每个文件可放到内存中处理。采用(1)的方法依次处理每个小文件。

  • (4)多机 + 受限内存
    这种情况,为了合理利用多台机器的资源,可将数据分发到多台机器上,每台机器采用(3)中的策略解决本地的数据。可采用 hash+socket 方法进行数据分发。

从实际应用的角度考虑,(1)(2)(3)(4)方案并不可行,因为在大规模数据处理环境下,作业效率并不是首要考虑的问题,因为在大规模数据处理环境下,作业效率并不是首要考虑的问题,算法的扩展性和容错性才是首要考虑的。算法应该具有良好的扩展性,以便数据量进一步加大(随着业务的发展,数据量加大是必然的)时,在不修改算法框架的前提下,可达到近似的线性比;算法应该具有容错性,即当前某个文件处理失败后,能自动将其交给另外一个线程继续处理,而不是从头开始处理。

topK 问题很适合采用 MapReduce 框架解决,用户只需要编写一个 Map 函数和两个 Reduce 函数,然后提交到 Hadoop(采用 Mapchain 和 Reducechain)上即可解决该问题。具体而言,就是首先根据数据值或者把数据 hash(MD5) 后的值按照范围划分到不同的机器上,最好可以让数据划分后一次读入内存,这样不同的机器负责不同的数值范围,实际上就是 Map。得到结果后,各个机器只需要拿出各自出现次数最多的前 N 个数据,然后汇总,选出所有的数据中出现次数最多的前 N 个数据,这实际上就是 Reduce 过程。对于 Map 函数,采用 Hash 算法,将 Hash 值相同的数据交给同一个 Reduce task:对于第一个 Reduce 函数,采用 HashMap 统计出每个词出现的频率,对于第二个 Reduce 函数,统计所有 Reduce task,输出数据中的 topK 即可。
直接将数据均分到不同的机器上进行处理是无法得到正确的结果的。因为一个数据可能被均分到不同的机器上进行处理是无法得到正确的结果的。因为一个数据可能被均分到不同的机器上,而另一个则完全聚集到一个机器上,同时还可能存在具有相同数目的数据。

重复问题

(1)有10000000个记录,这些查询串的重复度比较高,如果除去重复后,不超过3000000个。一个查询串的重复度越高,说明查询它的用户越多,也就是越热门。请统计最热门的10个查询串,要求使用的内存不能超过1GB。
(2)有10个文件,每个文件1GB,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。按照query的频度排序。
(3)有一个1GB大小的文件,里面的每一行是一个词,词的大小不超过16个字节,内存限制大小是1MB。返回频数最高的100个词。
(4)提取某日访问网站次数最多的那个IP。
(5)10亿个整数找出重复次数最多的100个整数。
(6)搜索的输入信息是一个字符串,统计300万条输入信息中最热门的前10条,每次输入的一个字符串为不超过255B,内存使用只有1GB。
(7)有1000万个身份证号以及他们对应的数据,身份证号可能重复,找出出现次数最多的身份证号。

在海量数据中查找出重复出现的元素或者去除重复出现的元素也是常考的问题。针对此类问题,一般可以通过位图法实现。例如,已知某个文件内包含一些电话号码,每个号码为 8 位数字,统计不同号码的个数。
本体最好的解决方法是通过使用位图法来实现。8 位整数可以表示的最大十进制数值为 99999999。如果每个数字对应于位图中一个 bit 位,那么存储 8 位整数大约需要 99MB。因为 1B=8bit,所以 99Mbit 折合成内存为 99/8=12.357MB 的内存,即可以只用 12.375MB 的内存表示所有的 8 位数电话号码的内容。

大数据面试之分治法解决

1、海量日志数据,提取出某日访问百度次数最多的那个IP。

分析:百度作为国内第一大搜索引擎,每天访问它的 IP 数量巨大,如果想一次性把所有 IP 数据装进内存处理,则内存容量明显不够,故针对数据太大,内存受限的情况,可以把大文件转换成(通过取模映射)小文件,从而大而化小,逐个处理。
换言之,先映射,而后统计,最后排序。
解法:具体分为以下 3 个步骤:

  • 分而治之/hash 映射
    首先把这一天访问百度日志的所有 IP 提取出来,然后逐个写入到一个大文件中,接着采用映射的方法,比如 %1000,把整个大文件映射为 1000 个小文件。
  • hash_map 统计
    当大文件转化成了小文件,那么我们便可以采用 hash_map(ip, value)来分别对 1000 个小文件中的 IP 进行频率统计,再找出每个小文件中出现频率最大的 IP。
  • 堆/快排
    统计出 1000 个频率最大的 IP 后,依据各自频率的大小进行排序(可采取堆排序),找出那个频率最大的 IP,即为所求。

PS:Hash 取模是一种等价映射,不会存在同一个元素分散到不同小文件中去的情况,即这里采取的是 %1000 算法,那么同一个 IP 在 hash后,只可能落在同一个文件中,不可能被分散的。

2、寻找热门查询,300 万个查询字符串中统计最热门的 10 个查询。

原题:搜索引擎会通过日志文件把用户每次检索使用的所有检索串都记录下来,每个查询串的长度为 1-255 字节。假设目前有一千万个记录,请你统计最热门的 10 个查询串,要求使用的内存不能超过 1G.
分析:这些查询串的重复度比较高,虽然总数是一千万,但是如果去重后,不超过三百万个。一个查询串的重复度越高,说明查询它的用户越多,也就是越热门。
由上面第 1 题,我们知道,数据大则划为小的,例如一亿个 IP 求 Top10,可先 %1000 将 IP 分到 1000 个小文件中去,并保证一种 ip 只出现在一个文件中,再对每个小文件中的 ip 进行 hash_map 统计并按数量排序,最后归并或者最小堆依次处理每个小文件的 top10 以得到最后的结果。
但对于本题,数据规模比较小,能一次性装入内存。因为根据题目描述,虽然有一千万个 Query,但是由于重复度比较高,故去重后,事实上只有 300 万的 Query,每个 Query 大小为 255Byte,因此我们可以考虑把他们都放进内存中去(300 万个字符串假设没有重复,都是最大长度,那么最多占用内存 3M1K/4=0.75G。所以可以将所有字符串都存放在内存中进行处理)。
所以我们放弃分而治之/hash 映射的步骤,直接上 hash_map 统计,然后排序。So,针对此类典型的 topK 问题,采取的对策往往是:
hash_map + 堆。
*
解法**:

  • hash_map 统计
    先对这批海量数据预处理,具体方法是:维护一个 Key 为 Query 字串,Value 为该 Query 出现次数的 hash_map,即 hash_map(Query, value),每次读取一个 Query,如果该字串不在 Table 中,那么加入该字串,并将 Value 值设为 1;如果该字串在 Table 中,那么将该字串的计数加 1 即可。最终我们在 O(N) 的时间复杂度内用 hash_map 完成了统计;
  • 堆排序
    借助堆这个数据结构,找出 topK,时间复杂度为 N'logK。即借助堆结构,我们可以在 log 量级的时间内查找和调整/移动。因此,维护一个 K(该题目中是 10)大小的小根堆,然后遍历300万的Query,分别和根元素进行比对。所以,我们最终的时间复杂度是:O(n) + N'*O(logK),其中,N为1000万,N'为300万。