并行计算
几种运算模式:
以下以 python 为例,仅针对基本的较大数据搜索实现。
-
串行
最简单的串行,比如一个 for 循环,不断迭代计算,一般只会占用单核;
- 而问题是当存储的临时变量太多,RAM内存的东西越来越多,直到存不下(可能内存泄漏,参考 https://www.cnpython.com/qa/516575 )不仅仅 for 循环会越来越慢,甚至可能造成程序终止。
仅对 python 语言加速并不能从根本上解决问题,如使用 numba 装饰器等。
串行的例子:
def f(x): print('sss') res = x for j in range(99999999): res += j if j == 55555555: print(j) return if __name__ == '__main__': tb = time.time() for k in range(6): f(k) print('\ntask_1', time.time() - tb) tb = time.time() # task_1 33.59829568862915
-
多线程
可以使用 threading 的 Thread 方法将添加多个线程,分别处理多分独立问题
- 但 python 存在 GIL 全局锁,GIL 的存在让 Python 的多线程应用只能实现并发,而不能实现并行。如果想实现并行,只能通过多进程。 具体的参考 Xiaoliang`blog。也即,设想调用一个函数两次,串行时间为 100s,预计双线程操作下计算时间减半为 50s,但实际上为 70-80s。多线程的第二次调用仅仅提交了任务,仅当第一次调用结束才会开始执行。
多线程例子:
def f(x): print('sss') res = x for j in range(99999999): res += j if j == 55555555: print(j) return if __name__ == '__main__': to = time.time() for i in range(6): P = Thread(target=f, args=(i, )) P.start() P.join() print('\ntask_2', time.time() - to) # task_2 26.460124969482422
-
并行
可以使用
from multiprocessing import Process
使程序并行,占用多核,同时处理。并行例子:
def f(x): print('sss') res = x for j in range(99999999): res += j if j == 55555555: print(j) return if __name__ == '__main__': ts = time.time() for i in range(6): p = Process(target=f, args=(i, )) p.start() print('\ntask_3', time.time() - ts) # task_3 0.05161023139953613
需要注意,task_3 所消耗的时间并不是准确的 0.05161023139953613 ,在其 print 时间之后,程序仍在运行,但可以明显感受到时间缩短,也可以通过查看同时运行的任务数量来判断是否同时进行计算。
问题:
如何在并行计算时,拿到某一个进程算的函数的返回值?并且根据这个返回值判断是否终止当前的所有进程?
例如:
def f(x):
print('sss')
res = x
for j in range(99999999):
res += j
if x == 3:
print(j)
return True
if __name__ == '__main__':
ts = time.time()
for i in range(6):
p = Process(target=f, args=(i, ))
# 我希望拿到 f 的返回值,并通过其判断是否需要终止当前所有进程。
# 可能我用到的时候需要在当前循环外再套一个循环,并且终止了当前所有进程后,循环继续向前推进。
p.start()
# p.join()
print('\ntask_3', time.time() - ts)
- 使用 进程池 加上实现判断的 回调函数 可以实现以上需求,解决问题:
需要同步进行的任务:
def f(x):
print(x)
res = x
for j in range(99999999):
res += j
if x == 5:
print('system down!')
return True
return False
-
创建一个进程池
pool = multiprocessing.Pool(6)
参数是希望同时进行的进程数量。
-
异步提交任务(异步较同步更灵活,而且可以抽取每个任务来处理,参考https://blog.csdn.net/m0_60237095/article/details/121286176 )
# callback 函数 def call_back(result): # 需要调用的函数 f 返回值是 True or False if result: # 判断进程的返回值 pool.terminate() # 终止当前所有进程 for i in range(16): pool.apply_async(f, args=(i, ), callback=call_back)
-
pool.close() # 关闭进程池,不再接受新的任务 pool.join() # 等待所有进程完成
通过 callback 函数判断程序是否持续进行,异步使得每次执行完成都可以检测到其返回值,从而加以判断和施加操作。因为异步不断加入新任务,所以会有一个顺序,但这个时间先后太小可以忽略。
异步含判定条件例子:
def f(x):
print(x)
res = x
for j in range(99999999):
res += j
if x == 5:
print('system down!')
return True
return False
def callback(result):
if result: # 判断进程的返回值
pool.terminate() # 终止当前所有进程
if __name__ == '__main__':
ts = time.time()
pool = multiprocessing.Pool(6) # 创建进程池
for i in range(16):
pool.apply_async(f, args=(i, ), callback=callback) # 异步提交任务给进程池,并设置回调函数
pool.close() # 关闭进程池,不再接受新的任务
pool.join() # 等待所有进程完成
print('\ntask_3', time.time() - ts)
'''
0
1
2
3
4
5
system down!
6
task_3 0.43288302421569824
'''
需要注意的是,并行时无法对竞态对象操作(如set查找增加,set.discard可以无差别删除而不需判断是否存在),看下面代码:
def pallmain(self):
pool = multiprocessing.Pool(8)
for c in self.C:
pool.apply_async(self.parallel, args=(c, ))
pool.close()
pool.join()
def parallel(self, cc):
temp = []
for initp in self.Init:
for i, init in enumerate(initp):
temp[2 * i] = 2 * init + 1
for j, conj in enumerate(cc):
temp[2 * j + 1] = 2 * conj
if self.judge(temp):
# result 存在竞态关系,无法并行操作(逻辑会失效)
self.result.add(tuple(temp))