线程池
概述
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
使用
- 线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类:
- ThreadPoolExecutor 用于创建线程池
- ProcessPoolExecutor 用于创建进程池。
注意: 在用完一个线程池后,应该调用该线程池的 shutdown() 方法,该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务,但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后,该线程池中的所有线程都会死亡。
- 使用线程池来执行线程任务的步骤如下:
- 调用 ThreadPoolExecutor 类的构造器创建一个线程池。
- 定义一个普通函数作为线程任务。
- 调用 ThreadPoolExecutor 对象的 submit() 方法来提交并执行线程任务。
- 当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
from concurrent.futures import ThreadPoolExecutor
import threading
import time
def action(max):
for i in range(max):
print('tread name:', threading.current_thread().name)
return max
pool = ThreadPoolExecutor(max_workers=2)
# 50 作为 action 参数
future1 = pool.submit(action, 100)
future2 = pool.submit(action, 50)
time.sleep(1) # 阻塞一秒等待任务都执行完成
print('future1 done --', future1.done())
print('future2 done --', future2.done())
print('future1 --', future1.result()) #如果 Future 代表的线程任务还未完成,result()将会阻塞当前线程
print('future1 --', future2.result())
pool.shutdown()
- 为避免result()将会阻塞当前线程,通过 Future 的 add_done_callback() 方法来添加回调函数,该回调函数形如 fn(future)。
- 当线程任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
with ThreadPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(action, 100)
future2 = pool.submit(action, 50)
# 回调函数
def get_result(future):
future.result()
future1.add_done_callback(get_result)
future2.add_done_callback(get_result)
- Exectuor 还提供了一个 map(func, iterables, timeout=None, chunksize=1) 方法,该方法的功能类似于全局函数 map(),区别在于线程池的 map() 方**为 iterables 的每个元素启动一个线程,以并发方式来执行 func 函数。这种方式相当于启动 len(iterables) 个线程,井收集每个线程的执行结果。
with ThreadPoolExecutor(max_workers=2) as pool:
results = pool.map(action, (20, 30))
time.sleep(1) # 阻塞一下让结果最后输出
for result in results:
print(result) # 结果 20, 30 与输入顺序一致
总结
可以看出,使用 map() 方法来启动线程,并收集线程的执行结果,不仅具有代码简单的优点,而且虽然程序会以并发方式来执行 action() 函数,但最后收集的 action() 函数的执行结果,依然与传入参数的结果保持一致。
ThreadLocal
为什么要使用?
当多线程操作同一公有资源时,如果涉及到修改该资源的操作,可能会因为数据不同步而导致的错误,所以需要引入锁机制。
加锁带来的问题:线程占用锁,导致其他想要访问临界资源的线程只能停下来等待,等待锁的释放,若阻塞时间过长,最后会导致程序的整体性能下降。
解决方案: - 读写锁:把读线程和写线程的锁分离 - ThreadLocal:在自己线程的内存空间中拷贝一份。看似全局变量(可以被各个线程调用),但各线程调用的都是该变量的副本。
使用
首先举例不安全的情况
class Test:
res = ''
def action(self, name):
self.res = name
time.sleep(1)
print('name:' + self.res)
def fun(self):
with ThreadPoolExecutor(max_workers=2) as pool:
pool.submit(self.action, 't1')
pool.submit(self.action, 't2')
if __name__ == '__main__':
test = Test()
test.fun()
由上例可以看出,当 t1 线程修改 res 并被阻塞后,线程 t2 对 res 再次进行修改,导致输出两次 name: t2
解决方案,采用 ThreadLocal, 为各个线程创建完全属于它们自己的变量(又称线程局部变量)。
class Test:
local = threading.local()
def action(self, name):
self.local.res = name
time.sleep(1)
print('name:' + self.local.res)
def fun(self):
with ThreadPoolExecutor(max_workers=2) as pool:
pool.submit(self.action, 't1')
pool.submit(self.action, 't2')
if __name__ == '__main__':
test = Test()
test.fun()