线程池

概述

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。

使用

  • 线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类:
    • ThreadPoolExecutor 用于创建线程池
    • ProcessPoolExecutor 用于创建进程池。

注意: 在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。

  • 使用线程池来执行线程任务的步骤如下:
    1. 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
    2. 定义一个普通函数作为线程任务。
    3. 调用 ThreadPoolExecutor 对象的 submit() 方法来提交并执行线程任务。
    4. 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
from concurrent.futures import ThreadPoolExecutor
import threading
import time

def action(max):
    for i in range(max):
        print('tread name:', threading.current_thread().name)
    return max


pool = ThreadPoolExecutor(max_workers=2)
# 50 作为 action 参数
future1 = pool.submit(action, 100)
future2 = pool.submit(action, 50)

time.sleep(1)  # 阻塞一秒等待任务都执行完成
print('future1 done --', future1.done())
print('future2 done --', future2.done())

print('future1 --', future1.result()) #如果 Future 代表的线程任务还未完成,result()将会阻塞当前线程
print('future1 --', future2.result())

pool.shutdown()

  • 为避免result()将会阻塞当前线程,通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。
  • 当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
with ThreadPoolExecutor(max_workers=2) as pool:
    future1 = pool.submit(action, 100)
    future2 = pool.submit(action, 50)
	
    # 回调函数
    def get_result(future):
        future.result()

    future1.add_done_callback(get_result)
    future2.add_done_callback(get_result)

  • Exectuor 还提供了一个 map(func, iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方**为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
with ThreadPoolExecutor(max_workers=2) as pool:

    results = pool.map(action, (20, 30))

    time.sleep(1)  # 阻塞一下让结果最后输出
    for result in results:
        print(result)  # 结果 20, 30 与输入顺序一致

总结

可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致。

ThreadLocal

为什么要使用?

当多线程操作同一公有资源时,如果涉及到修改该资源的操作,可能会因为数据不同步而导致的错误,所以需要引入锁机制。

加锁带来的问题:线程占用锁,导致其他想要访问临界资源的线程只能停下来等待,等待锁的释放,若阻塞时间过长,最后会导致程序的整体性能下降。

解决方案: - 读写锁:把读线程和写线程的锁分离 - ThreadLocal:在自己线程的内存空间中拷贝一份。看似全局变量(可以被各个线程调用),但各线程调用的都是该变量的副本。

使用

首先举例不安全的情况

class Test:
    res = ''

    def action(self, name):
        self.res = name
        time.sleep(1)
        print('name:' + self.res)

    def fun(self):
        with ThreadPoolExecutor(max_workers=2) as pool:
            pool.submit(self.action, 't1')
            pool.submit(self.action, 't2')


if __name__ == '__main__':
    test = Test()
    test.fun()

由上例可以看出,当 t1 线程修改 res 并被阻塞后,线程 t2 对 res 再次进行修改,导致输出两次 name: t2

解决方案,采用 ThreadLocal, 为各个线程创建完全属于它们自己的变量(又称线程局部变量)。

class Test:
    local = threading.local()

    def action(self, name):
        self.local.res = name
        time.sleep(1)
        print('name:' + self.local.res)

    def fun(self):
        with ThreadPoolExecutor(max_workers=2) as pool:
            pool.submit(self.action, 't1')
            pool.submit(self.action, 't2')


if __name__ == '__main__':
    test = Test()
    test.fun()