概念:一个进程就是一个正在运行的程序,进程是操作系统资源分配的基本单位,每个进程都有自己独立的内存空间。同一时刻只能执行一个进程,所谓的多进程是指CPU通过在进程间来回切换达到在宏观上的多个进程同时执行。

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

python中的multiprocessing包负责管理多进程
multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

#子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

执行结果如下:

Parent process 8876.
Child process will start.
Run child process test (7572)...
Child process end.

创建子进程时,只需要在Process()中的target参数传入一个执行函数和在args或者kwargs参数传入函数对应出参数,创建一个Process实例,用start()方法启动。join()方法可以等待子进程结束后再继续往下执行,通常用于进程的同步

进程池(Pool)
如果要启动大量的子进程,可以使用进程池的方式批量创建子进程。
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()方法后就不能添加新的进程了。

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

结果如下:

Parent process 3620.
Waiting for all subprocesses done...
Run task 0 (7952)...
Run task 1 (5212)...
Run task 2 (9788)...
Run task 3 (6080)...
Task 1 runs 0.80 seconds.
Run task 4 (5212)...
Task 0 runs 1.31 seconds.
Task 2 runs 1.50 seconds.
Task 4 runs 0.73 seconds.
Task 3 runs 2.29 seconds.
All subprocesses done.

输出的结果task0 ,1 ,2 ,3是立刻执行的。而task4要等待前面某个task完成后才继续执行,这是因为Pool的默认大小被设置为4,因此最多同时执行4个进程。
进程池的方法

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

  • close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join() : wait工作线程的退出,在调用join()前,必须调用close()或者terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价于wait),否则进程会成为僵尸进程。

  • map(func, iterable[, chunksize])

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])

  • imap(func, iterable[, chunksize])

  • imap_unordered(func, iterable[, chunksize])

  • starmap(func, iterable[, chunksize])

  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

子进程
详细见

进程间的通信(生产者-消费者模型)
Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果:

Process to read: 7744
Process to write: 6152
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Queue 就是对队列,它是线程安全的
queue模块:

import queue

q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。

q.join()    # 等到队列为空的时候,往后执行
q.qsize()   # 返回队列的大小 (不可靠)
q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
                        为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
                          如果队列无法给出放入item的位置,则引发 queue.Full 异常
q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
                      若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
q.put_nowait(item) #   等效于 put(item,block=False)
q.get_nowait() #    等效于 get(item,block=False)

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

转载自:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017628290184064
https://blog.csdn.net/xiaochendefendoushi/article/details/80968885