Python多线程与协程的介绍使用

作者: adm 分类: python 发布时间: 2023-11-10

一、开始使用多线程
Thread类由threading模块提供,通过实例化Thread类可以创建线程,其构造函数如下:

Thread(group = None, target= None, name = None, args = (), kwargs = None, *, daemon = None)

其中关于参数的解释如下:

group,保留参数,暂时没有用处,可以忽略;
target,可以传入一个函数的引用,表示新启动的线程执行该函数逻辑;
args,表示要传递给新线程的位置参数;
kwargs,表示要传递给新线程的键值对参数;
name,可以设定新线程的名称;
daemon,表示该线程是否是守护线程,一般我们都不需要;

下面给一个最简单的多线程使用案例:

from time import sleep
from threading import Thread, current_thread

def do_thread_work():
    # current_thread()会返回当前线程的引用
    thread_name = current_thread().name
    print(f"开始执行{thread_name}线程...")
    # 模拟耗时的业务操作
    sleep(2)
    print(f"{thread_name}线程已经执行完毕!")

print("主线程开始了...")

thread1 = Thread(target=do_thread_work, name="thread1")
thread2 = Thread(target=do_thread_work, name="thread2")
thread3 = Thread(target=do_thread_work, name="thread3")

thread1.start()
thread2.start()
thread3.start()

# join()表示主线程要等子线程执行结束之后才继续执行
thread1.join()
thread2.join()
thread3.join()

print("主线程执行结束了!")
输出内容如下:

主线程开始了...
开始执行thread1线程...
开始执行thread2线程...
开始执行thread3线程...
thread3线程已经执行完毕!
thread1线程已经执行完毕!
thread2线程已经执行完毕!
主线程执行结束了!

二、线程锁的使用
当多个线程访问共享资源的时候,势必会引起并发的问题,为了保证共享资源同时只能被一个线程使用,所以才有了线程锁。

如下是没有使用线程锁的案例,各个线程没有进行并发控制访问共享资源,从而导致共享资源被用超了:

from time import sleep
from threading import Thread, Lock

Num = 10

def do_thread_work():
    # 标注Num使用的是全局变量
    global Num
    while True:
        # Num递减到0
        if Num > 0:
            sleep(0.1)
            Num -= 1
            print(f"递减后Num的值为{Num}")
        else:
            break

# 使用推导语句创建一个线程列表
threads = [Thread(target=do_thread_work) for i in range(3)]
for t in threads:
    t.start()

print("主线程结束!")
输出内容如下:

主线程结束!
递减后Num的值为9
递减后Num的值为8
递减后Num的值为7
递减后Num的值为6
递减后Num的值为5
递减后Num的值为4
递减后Num的值为3
递减后Num的值为2
递减后Num的值为1
递减后Num的值为0
递减后Num的值为-1
递减后Num的值为-2
现在我们使用线程锁lock改造一下上述代码:

from time import sleep
from threading import Thread, Lock

Num = 10
# 获取一个线程锁实例
lock = Lock()

def do_thread_work():
    # 标注Num使用的是全局变量
    global Num
    while True:
        # 获取锁之后才进行Num的比较和递减操作
        if lock.acquire():
            # Num递减到0
            if Num > 0:
                sleep(0.1)
                Num -= 1
                print(f"递减后Num的值为{Num}")
            else:
                # 所有的退出情况都要释放锁
                lock.release()
                break
            # 所有的退出情况都要释放锁
            lock.release()

# 使用推导语句创建一个线程列表
threads = [Thread(target=do_thread_work) for i in range(3)]
for t in threads:
    t.start()

print("主线程结束!")
输出结果如下:

主线程结束!
递减后Num的值为9
递减后Num的值为8
递减后Num的值为7
递减后Num的值为6
递减后Num的值为5
递减后Num的值为4
递减后Num的值为3
递减后Num的值为2
递减后Num的值为1
递减后Num的值为0

当然在如上的例子中,我们需要在else和if结束的地方增加两处lock.release(),因为每一个程序的出口都要确保锁被释放,这样显得不够简洁优雅,而且开发者很容易忘记,Python为我们提供了上下文管理器的用法来处理这种情况:

def do_thread_work():
    # 标注Num使用的是全局变量
    global Num
    while True:
        # 使用上下文管理器来锁定资源,其会自己管理锁进行释放
        with lock:
            if Num > 0:
                sleep(0.1)
                Num -= 1
                print(f"递减后Num的值为{Num}")
            else:
                break

如此代码就变得简洁易读多了。

三、等待事件信号的使用
等待事件常用于线程之间有依赖的场景,比如A线程执行到一半,需要暂停,等待B线程执行一部分之后,通知A线程继续执行。

from time import sleep
from threading import Thread, Event

# 定义两个下载事件
event_download_task1 = Event()
event_download_task2 = Event()

def download_part1():
    print("第一部分开始下载...")
    for i in range(10):
        sleep(0.5)
        print("#" * i)
    print("第一部分下载完成!")
    event_download_task1.set()

def download_part2():
    # 等待第一部分下载完成
    event_download_task1.wait()
    print("第二部分开始下载...")
    for i in range(10):
        sleep(0.5)
        print("#" * i)
    print("第二部分下载完成!")
    event_download_task2.set()

def install():
    # 等待第二部分下载完成
    event_download_task2.wait()
    print("全部下载完成,开始安装...")
    for i in range(10):
        sleep(0.5)
        print("#" * i)
    print("安装完成!")

thread1 = Thread(target=download_part1)
thread2 = Thread(target=download_part2)
thread3 = Thread(target=install)

thread1.start()
thread2.start()
thread3.start()
输出内容如下:

第一部分开始下载...

#
##
###
####
#####
######
#######
########
#########
第一部分下载完成!
第二部分开始下载...

#
##
###
####
#####
######
#######
########
#########
第二部分下载完成!
全部下载完成,开始安装...

#
##
###
####
#####
######
#######
########
#########
安装完成!

四、同步屏障的使用
当我们启动多个线程时,各个线程的执行进度都是不受我们控制的,我们想要设置一个同步点,使得所有线程都到这个同步点时停下来,等待其它所有线程都达到这个同步点之后,再一起出发,执行各自接下来的部分。

import threading

# 设置同步屏障对象,指定等待的线程数量为3
barrier = threading.Barrier(parties=3)

def do_something():
    thread_name = threading.current_thread().name
    print(f"线程{thread_name}已经到达同步屏障点,等待中...")
    # 使得当前线程在该屏障点阻塞
    barrier.wait()
    print(f"线程{thread_name}继续执行!")

for i in range(3):
    thread = threading.Thread(target=do_something, name=f"线程{i+1}")
    thread.start()
输出的内容如下:

线程线程1已经到达同步屏障点,等待中...
线程线程2已经到达同步屏障点,等待中...
线程线程3已经到达同步屏障点,等待中...
线程线程3继续执行!
线程线程2继续执行!
线程线程1继续执行!
五、异步函数(协程)的使用
我们知道进程是资源分配的最小单位,线程是任务调度的最小单位,但有的时候线程操作一些IO耗时任务的时候,仍然会造成CPU资源的浪费,因此才出现了协程的概念,协程是比线程更小粒度的任务处理单元。

进程和线程是计算机操作系统支持的,协程是在操作系统中不存在,只是Python提供的一种概念,为了提升并发度,提高执行效率而生;
协程,也可称作微线程,是非抢占资源的,其本质就是一种用户态的上下文切换技术,通过一个线程实现不同代码块之间的相互切换执行;
协程的意义就是节省线程中IO等待时的资源,利用IO等待的时间,充分利用CPU去做一些其它代码片段定义的事情;
协程通常使用关键字async来定义,然后使用await来挂起自身协程,等待另外一个协程完成后再继续执行:

from asyncio import *

async def async_function():
    return 1

async def await_coroutine():
    # 挂起自身,等待另一个协程结束
    result = await async_function()
    print(result)
    
run(await_coroutine())
需要注意,await只能出现在async修饰的协程函数中,而且只能用来等待另外一个协程函数。

我们还可以通过创建任务的方式来一次性并发执行多个协程,当前线程会自己切换上下文进行多个协程任务的执行:

import asyncio

async def son1():
    print("son1")

async def son2():
    print("son2")

async def father():
    task1 = asyncio.create_task(son1())
    task2 = asyncio.create_task(son2())

    await task1
    await task2

    print("father")

asyncio.run(father())
其等价于

import asyncio

async def son1():
    print("son1")

async def son2():
    print("son2")

async def father():
    await son1()
    await son2()

    print("father")

asyncio.run(father())

协程只有和异步IO结合起来才能发挥最大的并发威力。

六、线程池的使用
在实际的工程使用中,频繁地创建和销毁线程比较耗费性能,所以一般都采用线程池的方式,提前创建好空闲的线程。当有任务需要操作时,直接将任务提交给线程池,由线程池自己选取空闲的线程进行操作,线程池中的线程管理对开发者都是透明的,所以既能提高系统性能,对开发者而言也是省时省力。


from time import sleep
from concurrent.futures import ThreadPoolExecutor

# 创建拥有5个线程的线程池
executor = ThreadPoolExecutor(5)

def do_thread_work(i):
   sleep(1)
   print(i)


for i in range(10):
   executor.submit(do_thread_work, i)
如上,开发者只要关心自己的逻辑即可,无需操心线程的管理工作。

如果需要获取线程返回的结果,可以使用as_completed函数来帮助我们:

from time import sleep
from concurrent.futures import ThreadPoolExecutor, as_completed

# 创建拥有5个线程的线程池
executor = ThreadPoolExecutor(5)
# 存放所有线程任务的返回
tasks = []

def do_thread_work(i):
   sleep(1)
   return i


for i in range(10):
   e = executor.submit(do_thread_work, i)
   tasks.append(e)

for task in as_completed(tasks):
   print(task.result())

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!