Python多线程与协程的介绍使用
一、开始使用多线程
Thread类由threading模块提供,通过实例化Thread类可以创建线程,其构造函数如下:
Thread(group = None, target= None, name = None, args = (), kwargs = None, *, daemon = None)
其中关于参数的解释如下:
group,保留参数,暂时没有用处,可以忽略; target,可以传入一个函数的引用,表示新启动的线程执行该函数逻辑; args,表示要传递给新线程的位置参数; kwargs,表示要传递给新线程的键值对参数; name,可以设定新线程的名称; daemon,表示该线程是否是守护线程,一般我们都不需要;
下面给一个最简单的多线程使用案例:
from time import sleep from threading import Thread, current_thread def do_thread_work(): # current_thread()会返回当前线程的引用 thread_name = current_thread().name print(f"开始执行{thread_name}线程...") # 模拟耗时的业务操作 sleep(2) print(f"{thread_name}线程已经执行完毕!") print("主线程开始了...") thread1 = Thread(target=do_thread_work, name="thread1") thread2 = Thread(target=do_thread_work, name="thread2") thread3 = Thread(target=do_thread_work, name="thread3") thread1.start() thread2.start() thread3.start() # join()表示主线程要等子线程执行结束之后才继续执行 thread1.join() thread2.join() thread3.join() print("主线程执行结束了!") 输出内容如下: 主线程开始了... 开始执行thread1线程... 开始执行thread2线程... 开始执行thread3线程... thread3线程已经执行完毕! thread1线程已经执行完毕! thread2线程已经执行完毕! 主线程执行结束了!
二、线程锁的使用
当多个线程访问共享资源的时候,势必会引起并发的问题,为了保证共享资源同时只能被一个线程使用,所以才有了线程锁。
如下是没有使用线程锁的案例,各个线程没有进行并发控制访问共享资源,从而导致共享资源被用超了:
from time import sleep from threading import Thread, Lock Num = 10 def do_thread_work(): # 标注Num使用的是全局变量 global Num while True: # Num递减到0 if Num > 0: sleep(0.1) Num -= 1 print(f"递减后Num的值为{Num}") else: break # 使用推导语句创建一个线程列表 threads = [Thread(target=do_thread_work) for i in range(3)] for t in threads: t.start() print("主线程结束!") 输出内容如下: 主线程结束! 递减后Num的值为9 递减后Num的值为8 递减后Num的值为7 递减后Num的值为6 递减后Num的值为5 递减后Num的值为4 递减后Num的值为3 递减后Num的值为2 递减后Num的值为1 递减后Num的值为0 递减后Num的值为-1 递减后Num的值为-2 现在我们使用线程锁lock改造一下上述代码: from time import sleep from threading import Thread, Lock Num = 10 # 获取一个线程锁实例 lock = Lock() def do_thread_work(): # 标注Num使用的是全局变量 global Num while True: # 获取锁之后才进行Num的比较和递减操作 if lock.acquire(): # Num递减到0 if Num > 0: sleep(0.1) Num -= 1 print(f"递减后Num的值为{Num}") else: # 所有的退出情况都要释放锁 lock.release() break # 所有的退出情况都要释放锁 lock.release() # 使用推导语句创建一个线程列表 threads = [Thread(target=do_thread_work) for i in range(3)] for t in threads: t.start() print("主线程结束!") 输出结果如下: 主线程结束! 递减后Num的值为9 递减后Num的值为8 递减后Num的值为7 递减后Num的值为6 递减后Num的值为5 递减后Num的值为4 递减后Num的值为3 递减后Num的值为2 递减后Num的值为1 递减后Num的值为0
当然在如上的例子中,我们需要在else和if结束的地方增加两处lock.release(),因为每一个程序的出口都要确保锁被释放,这样显得不够简洁优雅,而且开发者很容易忘记,Python为我们提供了上下文管理器的用法来处理这种情况:
def do_thread_work(): # 标注Num使用的是全局变量 global Num while True: # 使用上下文管理器来锁定资源,其会自己管理锁进行释放 with lock: if Num > 0: sleep(0.1) Num -= 1 print(f"递减后Num的值为{Num}") else: break
如此代码就变得简洁易读多了。
三、等待事件信号的使用
等待事件常用于线程之间有依赖的场景,比如A线程执行到一半,需要暂停,等待B线程执行一部分之后,通知A线程继续执行。
from time import sleep from threading import Thread, Event # 定义两个下载事件 event_download_task1 = Event() event_download_task2 = Event() def download_part1(): print("第一部分开始下载...") for i in range(10): sleep(0.5) print("#" * i) print("第一部分下载完成!") event_download_task1.set() def download_part2(): # 等待第一部分下载完成 event_download_task1.wait() print("第二部分开始下载...") for i in range(10): sleep(0.5) print("#" * i) print("第二部分下载完成!") event_download_task2.set() def install(): # 等待第二部分下载完成 event_download_task2.wait() print("全部下载完成,开始安装...") for i in range(10): sleep(0.5) print("#" * i) print("安装完成!") thread1 = Thread(target=download_part1) thread2 = Thread(target=download_part2) thread3 = Thread(target=install) thread1.start() thread2.start() thread3.start() 输出内容如下: 第一部分开始下载... # ## ### #### ##### ###### ####### ######## ######### 第一部分下载完成! 第二部分开始下载... # ## ### #### ##### ###### ####### ######## ######### 第二部分下载完成! 全部下载完成,开始安装... # ## ### #### ##### ###### ####### ######## ######### 安装完成!
四、同步屏障的使用
当我们启动多个线程时,各个线程的执行进度都是不受我们控制的,我们想要设置一个同步点,使得所有线程都到这个同步点时停下来,等待其它所有线程都达到这个同步点之后,再一起出发,执行各自接下来的部分。
import threading # 设置同步屏障对象,指定等待的线程数量为3 barrier = threading.Barrier(parties=3) def do_something(): thread_name = threading.current_thread().name print(f"线程{thread_name}已经到达同步屏障点,等待中...") # 使得当前线程在该屏障点阻塞 barrier.wait() print(f"线程{thread_name}继续执行!") for i in range(3): thread = threading.Thread(target=do_something, name=f"线程{i+1}") thread.start() 输出的内容如下: 线程线程1已经到达同步屏障点,等待中... 线程线程2已经到达同步屏障点,等待中... 线程线程3已经到达同步屏障点,等待中... 线程线程3继续执行! 线程线程2继续执行! 线程线程1继续执行! 五、异步函数(协程)的使用 我们知道进程是资源分配的最小单位,线程是任务调度的最小单位,但有的时候线程操作一些IO耗时任务的时候,仍然会造成CPU资源的浪费,因此才出现了协程的概念,协程是比线程更小粒度的任务处理单元。 进程和线程是计算机操作系统支持的,协程是在操作系统中不存在,只是Python提供的一种概念,为了提升并发度,提高执行效率而生; 协程,也可称作微线程,是非抢占资源的,其本质就是一种用户态的上下文切换技术,通过一个线程实现不同代码块之间的相互切换执行; 协程的意义就是节省线程中IO等待时的资源,利用IO等待的时间,充分利用CPU去做一些其它代码片段定义的事情; 协程通常使用关键字async来定义,然后使用await来挂起自身协程,等待另外一个协程完成后再继续执行: from asyncio import * async def async_function(): return 1 async def await_coroutine(): # 挂起自身,等待另一个协程结束 result = await async_function() print(result) run(await_coroutine()) 需要注意,await只能出现在async修饰的协程函数中,而且只能用来等待另外一个协程函数。 我们还可以通过创建任务的方式来一次性并发执行多个协程,当前线程会自己切换上下文进行多个协程任务的执行: import asyncio async def son1(): print("son1") async def son2(): print("son2") async def father(): task1 = asyncio.create_task(son1()) task2 = asyncio.create_task(son2()) await task1 await task2 print("father") asyncio.run(father()) 其等价于 import asyncio async def son1(): print("son1") async def son2(): print("son2") async def father(): await son1() await son2() print("father") asyncio.run(father())
协程只有和异步IO结合起来才能发挥最大的并发威力。
六、线程池的使用
在实际的工程使用中,频繁地创建和销毁线程比较耗费性能,所以一般都采用线程池的方式,提前创建好空闲的线程。当有任务需要操作时,直接将任务提交给线程池,由线程池自己选取空闲的线程进行操作,线程池中的线程管理对开发者都是透明的,所以既能提高系统性能,对开发者而言也是省时省力。
from time import sleep from concurrent.futures import ThreadPoolExecutor # 创建拥有5个线程的线程池 executor = ThreadPoolExecutor(5) def do_thread_work(i): sleep(1) print(i) for i in range(10): executor.submit(do_thread_work, i) 如上,开发者只要关心自己的逻辑即可,无需操心线程的管理工作。 如果需要获取线程返回的结果,可以使用as_completed函数来帮助我们: from time import sleep from concurrent.futures import ThreadPoolExecutor, as_completed # 创建拥有5个线程的线程池 executor = ThreadPoolExecutor(5) # 存放所有线程任务的返回 tasks = [] def do_thread_work(i): sleep(1) return i for i in range(10): e = executor.submit(do_thread_work, i) tasks.append(e) for task in as_completed(tasks): print(task.result())