Python中的进程和线程

对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。

有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,进程内的这些“子任务”称为线程(Thread)。

由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。

若想要同时执行多个任务,也就是多线程的话,有两种解决方案

  • 启动多个进程,每个进程虽然只有一个线程,但多个线程可以一起执行多个任务(多进程模式)
  • 启动一个进程,在一个进程内启动多个线程(多线程模式)
  • 启动多个进程,每个进程内启动多个线程,这种模型相对复杂,实际情景中很少采用(多进程+多线程模式)

线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间。

多进程 multiprocessing

类Unix系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork() 调用一次,返回两次,因为操作系统自动把当前进程**(父进程)复制了一份(子进程)**,然后分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

有了fork调用,一个进程在接到新任务的时候就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当由新的http请求时,就fork出子进程来处理新的http请求。

但是fork在Windows系统中时没有的。于是出现了处理fork的通用模块,以保证在不同操作系统间的调用。

multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了有一个Process类来代表进程对象。

from multiprocessing import Process
import os
''' 子进程要执行的代码 '''
def run_proc(name):
    print('Run child process %s (%s)' % (name, os.getpid()))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test_code',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

执行结果:

Parent process 70227.
Child process will start.
Run child process test_code (70228)
Child process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,一般用于进程间的同步。

线程池Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

import os, time, random
from multiprocessing import Pool

def long_time_task(name):
    print("Run task %s (%s)..." % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print("Task %s run %0.f seconds." % (name, (end - start)))
    
if __name__ == "__main__":
    print("Parent process %s." % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print("Waiting for all subprocess done...")
    p.close()
    p.join() #等待所有子进程执行完毕
    print("All subprocess done.")

执行结果:

Parent process 65899.
Waiting for all subprocess done...
Run task 0 (65900)...
Run task 1 (65901)...
Run task 2 (65902)...
Run task 3 (65903)...
Task 3 run 0 seconds.
Run task 4 (65903)...
Task 2 run 1 seconds.
Task 0 run 2 seconds.
Task 1 run 2 seconds.
Task 4 run 2 seconds.
All subprocess done.

子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

可以通过subprocess模块非常方便地启动一个子进程,然后控制其输入和输出。

import subprocess

print("$ nslookup www.baidu.com") 
r = subprocess.call(['nslookup', 'www.baidu.com']) # 运行命令nslookup,和在命令行直接运行的效果一样
print("Exit code: ", r)

进程间的通信

进程之间时需要通信的,操作系统提供了很多机制来实现进程间的通信,multiprocessing模块包装了底层的机制,提供了QueuePipes等多种方式来交换数据。

以Queue为例:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码
def write(q):
    print("Process to write: %s" % os.getpid())
    for value in ['A', 'B', 'C']:
        print("Put %s to queue..." % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码
def read(q):
    print("Process to read: %s" % os.getpid())
    while True:
        value = q.get(True)
        print("Get %s from queue." % value)

if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束
    pw.join()
    # pr进程里的死循环,无法等待结束,只能强制终止
    pr.terminate()

实际运行效果:

Process to write: 94327
Put A to queue...
Process to read: 94328
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

在类Unix系统下,multiprocessing模块封装了fork()调用,使我们不再需要关注fork()细节。由于Windows系统中没有fork()调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象必须通过pickle序列化再传到子进程去,如果multiprocessingWindows下调用失败了,要先考虑是不是pickle失败了。

多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成。

由于线程是操作系统直接支持的执行单元,因此,高级语言通常都内置多线程的支持,Python的线程是真正的Podix Thread,而不是模拟出来的线程。

启动一个线程就是把一个函数传入并创建Thread实例,然后调用start()开始执行。

import time, threading

def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
# 执行效果:
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由于任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程,Python的threading模块有个current_thread()函数,它永远返回当前的线程的实例。主线程实例的名字叫MainThread,子线程的名字创建时指定,我们用LoopThread命名子线程。名字仅仅在打印时用来显示,完全没有其他意义,如果不起名字Python就自动给线程命名为Thread-1Thread-2……

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都有所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险就是多个线程同时修改一个变量,把内容打乱了。

import time, threading
balance = 0
def change_it(n):
    # 先存后取,结果应该是0
    global balance
    balance = balance + n
    balance = balance - n
    print(balance)
def run_thread(n):
    for i in range(100000):
        change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

定义了一个共享变量balance,初始值为0,并且启动两个线程,先存后取,理论上结果应该为0,但是由于线程的调度是由操作系统决定的,当t1、t2交替执行的时候,只要循环次数足够多,balance的结果就不一定是0了。

究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。所以,我们必须确保一个线程在修改balance的时候,别的线程一定不能改。

如果要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行时,该线程因为获得了锁,因此其他线程不能同时执行,只能等待,直到锁被释放,获得该锁以后才能更改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以不会造成修改的冲突。创建一个锁可以通过threading.Lock()来实现。

balance = 0
lock = threading.Lock()
def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            change_it(n)
        finally:
            # 释放锁:
            lock.release()

锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

多核的情况下

如果在多核的情况下,照理来说是可以实现多线程的,但是Python解释器有一个历史遗留问题,那就是GIL,Python的线程虽然是指真正的线程,但解释器执行代码时,有一个GIL锁,任何Python线程执行前,必须先获得GIL锁,然后没执行一定数目的字节码,解释器就自动释放GIL锁,让别的线程有机会去执行。GIL全局锁实际上把所有线程的执行代码都上了锁,所以,所现称在Python中只能交替执行,即使100个线程跑在100核CPU上也只能用到一个核。

ThreadLocal

在多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦:

def process_student(name):
    std = Student(name)
    # std是局部变量,但是每个函数都要用到它,因此必须传进去
    do_task_1(std)
    do_task_2(std)
def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)
def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)

这时候就可以用到ThreadLocal

import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
    # 获取当前线程关联的student
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
    # 绑定ThreadLocal的student
    local_school.student = name
    process_student()
t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

全局变量local_school就是一个ThreadLocal对象,每个线程对它都可以读写student属性,但互不影响。可以把local_school看成是全局变量,但每个属性例如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。

进程 and 线程

要实现多任务,通常会设计Master-Worker模式,Master负责分配任务,Worker负责执行任务,因此,多任务环境下,通常是一个Master,多个Worker。

如果用多进程实现Master-Worker,主进程就是Master,其他进程就是Worker。

如果用多线程实现Master-Worker,主线程就是Master,其他线程就是Worker。

多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。

多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。

多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。在Windows上,如果一个线程执行的代码出了问题,你经常可以看到这样的提示:“该程序执行了非法操作,即将关闭”,其实往往是某个线程出了问题,但是操作系统会强制结束整个进程。

在Windows下,多线程的效率比多进程要高,所以微软的IIS服务器默认采用多线程模式。由于多线程存在稳定性的问题,IIS的稳定性就不如Apache。为了缓解这个问题,IIS和Apache现在又有多进程+多线程的混合模式,真是把问题越搞越复杂。

计算密集型和IO密集型

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,不适合于计算密集型任务,对于计算密集型任务,最好用C语言编写。

IO密集型任务主要涉及到网络、磁盘IO,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,常见的大部分任务都是IO密集型任务,比如Web应用。

异步IO

考虑到CPU和IO之间巨大的速度差异,一个任务在执行的过程中大部分时间都在等待IO操作,单进程单线程模型会导致别的任务无法并行执行,因此,我们才需要多进程模型或者多线程模型来支持多任务并发执行。

现代操作系统对IO操作已经做了巨大的改进,最大的特点就是支持异步IO。如果充分利用操作系统提供的异步IO支持,就可以用单进程单线程模型来执行多任务,这种全新的模型称为事件驱动模型,Nginx就是支持异步IO的Web服务器,它在单核CPU上采用单进程模型就可以高效地支持多任务。在多核CPU上,可以运行多个进程(数量与CPU核心数相同),充分利用多核CPU。由于系统总的进程数量十分有限,因此操作系统调度非常高效。用异步IO编程模型来实现多任务是一个主要的趋势。

对应到Python语言,单进程的异步编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。