并行计算

几种运算模式:

以下以 python 为例,仅针对基本的较大数据搜索实现。

  1. 串行

    最简单的串行,比如一个 for 循环,不断迭代计算,一般只会占用单核;

    • 而问题是当存储的临时变量太多,RAM内存的东西越来越多,直到存不下(可能内存泄漏,参考 https://www.cnpython.com/qa/516575 )不仅仅 for 循环会越来越慢,甚至可能造成程序终止。

    仅对 python 语言加速并不能从根本上解决问题,如使用 numba 装饰器等。

    串行的例子:

    def f(x):
        print('sss')
        res = x
        for j in range(99999999):
            res += j
            if j == 55555555:
                print(j)
                return
    if __name__ == '__main__':
    
        tb = time.time()
        for k in range(6):
            f(k)
        print('\ntask_1', time.time() - tb)
    
        tb = time.time()
        
    # task_1 33.59829568862915
    
  2. 多线程

    可以使用 threading 的 Thread 方法将添加多个线程,分别处理多分独立问题

    • 但 python 存在 GIL 全局锁,GIL 的存在让 Python 的多线程应用只能实现并发,而不能实现并行。如果想实现并行,只能通过多进程。 具体的参考 Xiaoliang`blog。也即,设想调用一个函数两次,串行时间为 100s,预计双线程操作下计算时间减半为 50s,但实际上为 70-80s。多线程的第二次调用仅仅提交了任务,仅当第一次调用结束才会开始执行。

    多线程例子:

    def f(x):
        print('sss')
        res = x
        for j in range(99999999):
            res += j
            if j == 55555555:
                print(j)
                return
     
     if __name__ == '__main__':
        to = time.time()
        for i in range(6):
            P = Thread(target=f, args=(i, ))
            P.start()
            P.join()
        print('\ntask_2', time.time() - to)
        
    # task_2 26.460124969482422 
    
  3. 并行

    可以使用 from multiprocessing import Process 使程序并行,占用多核,同时处理。

    并行例子:

    def f(x):
        print('sss')
        res = x
        for j in range(99999999):
            res += j
            if j == 55555555:
                print(j)
                return
                
    if __name__ == '__main__':
        ts = time.time()
        for i in range(6):
            p = Process(target=f, args=(i, ))
            p.start()
        print('\ntask_3', time.time() - ts)
    
    # task_3 0.05161023139953613
    

    需要注意,task_3 所消耗的时间并不是准确的 0.05161023139953613 ,在其 print 时间之后,程序仍在运行,但可以明显感受到时间缩短,也可以通过查看同时运行的任务数量来判断是否同时进行计算。

问题:

如何在并行计算时,拿到某一个进程算的函数的返回值?并且根据这个返回值判断是否终止当前的所有进程?

例如:

def f(x):
    print('sss')
    res = x
    for j in range(99999999):
        res += j
        if x == 3:
            print(j)
            return True


if __name__ == '__main__':
    ts = time.time()
    for i in range(6):
        p = Process(target=f, args=(i, ))
        # 我希望拿到 f 的返回值,并通过其判断是否需要终止当前所有进程。
        # 可能我用到的时候需要在当前循环外再套一个循环,并且终止了当前所有进程后,循环继续向前推进。
        p.start()
        # p.join()
    print('\ntask_3', time.time() - ts)
  • 使用 进程池 加上实现判断的 回调函数 可以实现以上需求,解决问题:

需要同步进行的任务:

def f(x):
    print(x)
    res = x
    for j in range(99999999):
        res += j
        if x == 5:
            print('system down!')
            return True
    return False
  1. 创建一个进程池

    pool = multiprocessing.Pool(6)
    

    参数是希望同时进行的进程数量。

  2. 异步提交任务(异步较同步更灵活,而且可以抽取每个任务来处理,参考https://blog.csdn.net/m0_60237095/article/details/121286176

    # callback 函数
    def call_back(result):
        # 需要调用的函数 f 返回值是 True or False
        if result:  # 判断进程的返回值
            pool.terminate()  # 终止当前所有进程
    
    for i in range(16):
           pool.apply_async(f, args=(i, ), callback=call_back)
    
  3.     pool.close()  # 关闭进程池,不再接受新的任务
        pool.join()  # 等待所有进程完成
    

通过 callback 函数判断程序是否持续进行,异步使得每次执行完成都可以检测到其返回值,从而加以判断和施加操作。因为异步不断加入新任务,所以会有一个顺序,但这个时间先后太小可以忽略。

异步含判定条件例子:

def f(x):
    print(x)
    res = x
    for j in range(99999999):
        res += j
        if x == 5:
            print('system down!')
            return True
    return False


def callback(result):
    if result:  # 判断进程的返回值
        pool.terminate()  # 终止当前所有进程


if __name__ == '__main__':
    ts = time.time()
    pool = multiprocessing.Pool(6)  # 创建进程池
    for i in range(16):
        pool.apply_async(f, args=(i, ), callback=callback)  # 异步提交任务给进程池,并设置回调函数

    pool.close()  # 关闭进程池,不再接受新的任务
    pool.join()  # 等待所有进程完成

    print('\ntask_3', time.time() - ts)

'''
0
1
2
3
4
5
system down!
6

task_3 0.43288302421569824
'''    

需要注意的是,并行时无法对竞态对象操作(如set查找增加,set.discard可以无差别删除而不需判断是否存在),看下面代码:

    def pallmain(self):

        pool = multiprocessing.Pool(8)
        for c in self.C:
            pool.apply_async(self.parallel, args=(c, ))
        pool.close()
        pool.join()
   def parallel(self, cc):
        temp = []
        for initp in self.Init:
          for i, init in enumerate(initp):
            temp[2 * i] = 2 * init + 1
            for j, conj in enumerate(cc):
              temp[2 * j + 1] = 2 * conj
              if self.judge(temp):
                # result 存在竞态关系,无法并行操作(逻辑会失效)
                self.result.add(tuple(temp))