前言

本文深度对比 Python 并发方案适用场景和优缺点,主要是介绍 asyncio 这个方案。

注:本文代码需要使用 Python 3.10 及以上版本才能正常运行。

Python 并发和并行方案

在 Python 世界有 3 种并发和并行方案,如下:

  1. 多线程 (threading)
  2. 多进程 (multiprocessing)
  3. 异步 IO (asyncio)

注:并发和并行的区别先不提,最后会借着例子更好的解释,另外稍后也会提到concurrent.futures,不过它不是一种独立的方案,所以在这里没有列出来。

这些方案是为了解决不同特点的性能瓶颈。性能问题主要有 2 种:

  1. CPU 密集型 (CPU-bound)。这也就是指计算密集型任务,它的特点事需要要进行大量的计算。例如 Python 内置对象的各种方法的执行,科学计算,视频转码等等。
  2. I/O 密集型 (I/O-bound)。凡是涉及到网络、内存访问、磁盘 I/O 等的任务都是 IO 密集型任务,这类任务的特点是 CPU 消耗很少,任务的大部分时间都在等待 I/O 操作完成。例如数据库连接、Web 服务、文件读写等等。

如果你不知道一个任务哪种类型,我的经验是你问问自己,如果给你一个更好更快的 CPU 它可以更快,那么这就是一个 CPU 密集的任务,否则就是 I/O 密集的任务。

这三个方案中对于 CPU 密集型的任务,优化方案只有一种,就是使用多进程充分利用多核 CPU 一起完成任务,达到提速的目的。而对于 I/O 密集型的任务,则这三种方案都可以

接着借着一个抓取网页并写入本地 (典型的 I/O 密集型任务) 小例子来挨个拆解对比一下这些方案。先看例子:

import requests

url = 'https://movie.douban.com/top250?start='
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'  # noqa
}


def fetch(session, page):
    with (session.get(f'{url}{page*25}', headers=headers) as r,
          open(f'top250-{page}.html', 'w') as f):
        f.write(r.text)


def main():
    with requests.Session() as session:
        for p in range(25):
            fetch(session, p)


if __name__ == '__main__':
    main()

在这个例子中会抓取豆瓣电影 Top250 的 25 个页面 (每页显示 10 个电影),使用 requests 库,不同页面按顺序请求,一共花了 3.9 秒:

time python io_non_concurrent.py
python io_non_concurrent.py  0.23s user 0.05s system 7% cpu 3.911 total

这个速度虽然看起来还是很好的,一方面是豆瓣做了很好的优化,一方面我家的带宽网速也比较好。接着用上面三种方案优化看看效果。

多进程版本

Python 解释器使用单进程,如果服务器或者你的电脑是多核的,这么用其实是很浪费的,所以可以通过多进程提速:

from multiprocessing import Pool

def main():
    with (Pool() as pool,
          requests.Session() as session):
        pool.starmap(fetch, [(session, p) for p in range(25)])

注:这里省略到了那些上面已经出现的了代码,只展示改变了的那部分。

使用多进程池,但没指定进程数量,所以会按着 Macbook 的核数启动 10 个进程一起工作,耗时如下:

time python use_multiprocessing.py
python use_multiprocessing.py  2.15s user 0.30s system 232% cpu 1.023 total

多进程理论上可以有十倍效率的提升,因为 10 个进程在一起执行任务。当然由于任务数量是 25,不是整数倍,是无法达到 10 倍的降低耗时,而且由于抓取太快了,没有充分显示多进程方案下的效率提升,所以用时 1 秒,也就是大约 4 倍的效率提升。

多进程方案下没有明显的缺点,只要机器够强悍,就可以更快。

多线程版本

Python 解释器不是线程安全的,为此 Python 设计了 GIL: 获得 GIL 锁才可以访问线程中的 Python 对象。所以在任何一个时间,只有一个线程可以执行代码,这样就不会引发竞态条件 (Race Condition) ,虽然 GIL 的问题很多,但是 GIL 却是还有它存在的优点,例如简化了内存管理等等,这些不是本文重点所以就不展开了,有兴趣的可以专门去了解。

那么有同学会问,既然同一时间永远只有一个线程在工作,那么多线程可以提高并发效率的原因是什么呢?

解释这个问题还是要提 GIL。延伸阅读链接 1《Understanding the Python GIL》中做了很好的解释(这里要注意,我们提的方案是 Python 3.2 新的 GIL,而不是 Python2 的旧版 GIL,现在网上有很多针对旧的 GIL 的描述,其实是过时的,这部分也可以看看延伸阅读链接 2 的文章帮助理解它们的区别),我截其中几张 PPT 来说明:

在上图里,本来只有 1 个线程,所以不需要释放或者获得 GIL,但是接着出现了第二个线程,这样就是多个线程,一开始线程 2 是挂起状态,因为它没有 GIL。

线程 1 在一个cv_wait周期内会自愿的放弃 GIL,例如出现了 I/O 阻塞,或者超时了 (线程不能一直拿着不放,即便在一个周期内没有出现 I/O 阻塞也要强制释放执行权,这个默认时间是 5 毫秒,可以通过sys.setswitchinterval设置,当然设置前你得知道你在做什么) 都会触发这个释放 GIL 的操作。

这里演示了常规的例子 (非超时被迫释放),在cv_wait阶段,线程 1 由于遇到了 I/O 阻塞,会发送信号给线程 2,此时线程 1 让出 GIL 并挂起,而线程 2 获得 GIL,如此循环,之后线程 2 会释放 GIL 给线程 1。这个 PPT 在业界非常知名,建议大家多看看。之后的 PPT 还列举了超时的处理,由于和我们这篇文章关系稍远也不展开了,有兴趣的接着看。btw,我第一次看这个 PPT 觉得这个超时时间好可怕,也就是说 1 秒钟要最少切换 200 次,这也太浪费了,所以你可以尝试在代码中调大这个超时时间哟。

通过上面的内容,多线程通过 GIL 的控制,每个线程都得到了更好的执行时机,所以不会出现被某个线程任务一直阻塞,因为如果线程遇到阻塞会自愿让出 GIL 让自己挂起,把机会让给其他线程,这样就提高了执行任务总体的效率。多线程模式下最完美的场景就是任何时间点对应的线程都在做事,而不是有的线程其实等着被执行,但是实际上却被阻塞着。

我们看一下多线程的方案:

from multiprocessing.pool import ThreadPool

def main():
    with (ThreadPool(processes=5) as pool,
          requests.Session() as session):
        pool.starmap(fetch, [(session, p) for p in range(25)])

这里说明 2 点:

  1. 多进程和多线程例子中我都使用了【池】,这是一个好的习惯,因为线 (进) 程过多会带来额外的开销,其中包括创建销毁的开销、调度开销等等,同时也降低了计算机的整体性能。使用线 (进) 程池维护多个线 (进) 程,等待监督管理者分配可并发执行的任务。这样一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。另外用标准库里的进程池和线程池的实现写额外代码极少,而且代码结构还很像,特别适合写对比的例子。
  2. processes 如果不指定也是和 CPU 核数一致的 10,但是并不是线程越多越好,因为线程多了,反而出现本来正常有效的执行却被 GIL 强制释放,这就造成多余上下文切换反而是一个负担了。

在这个例子中,线程数为 5,这个其实一方面是经验,一方面是多次调试值的结果,所以这也暴露了多线程编程中如果稍有不慎会让优化变差,也会存在没有找到最优值得问题,因为GIL 控制线程是一个黑盒操作,开发者无法直接控制,这哪怕对一些相对有经验的 Python 开发也非常不友好。

我们看一下时间:

time python use_threading.py
python use_threading.py  0.62s user 0.24s system 74% cpu 1.157 total

可以看到,多线程方案下比原始方案速度快了一倍以上,但是比多进程方案差一点 (事实上我认为在真实的例子中会差很多)。这是因为在多进程方案下多核 CPU 都在独立工作,而多线程方案一方面由于效率问题下不能使用那么多数量的线程,而且由于 GIL 的限制,在不需要被释放 GIL 的时候依然被强制释放,就这么不断的切换的过程中反而降低了效率,让效果大打折扣。

concurrent.futures 版本

这里也顺便提一下concurrent.futures的方案。其实它不是一个全新的方案,这是在其他语言 (例如 Java) 里早就出现的一种框架,可以通过它控制线 (进) 程的启动、执行和关闭。我把它理解为抽象了多进程池和多线程池的代码,让开发者不需要关注多线程和多进程模块的具体细节和用法。其实理解起来也不难,你可以这么拆解:

其实理解起来也不难,例如 ThreadPoolExecutor 可以这么拆解:ThreadPoolExecutor = Thread + Pool + Executor,其实就是线程 + 池 + 执行器。就是预先创建一个线程池用来被重复使用,Executor 将任务提交和任务执行进行解耦,它完成线程的调配 (如何以及何时) 和任务的执行部分。

如果你想了解它的细节,我推荐直接看它的源码文件头部的注释,里面对于数据流有非常详细的说明,可以说比任何技术文章写的都要深入准确了。

这里只演示一下 ThreadPoolExecutor 的用法:

from functools import partial
from concurrent.futures import ThreadPoolExecutor

def main():
    with (ThreadPoolExecutor(max_workers=5) as pool,
          requests.Session() as session):
        list(pool.map(partial(fetch, session), range(25)))

是不是很熟悉的配方?接口和上面用的进程池线程池都很像,但是要注意max_workers如果不指定的话数量是 CPU 个数 + 4,最大为 32。它和多线程的用法问题一样,这个max_workers需要调优 (这里为了对比,所以用了相同的数值)。

 time python use_executor.py
python use_executor.py  0.63s user 0.32s system 82% cpu 1.153 total

虽然concurrent.futures是现在更主流的方案,但是在我使用的体验里,它的效率要略低于直接使用进程池或者线程池的代码,因为它高度抽象,却把事情搞得复杂了,例如用到了对应的 queue (queue 模块) 和信号量 (Semaphore),这些反而限制了性能的提升。所以我的建议是,Python 初学者可以用它,但高级开发者应该自己控制并发实现。

asyncio 版本

前面的多线程相关的方案中,需要开发者根据经验或者去实验,找到一个 (或者多个) 最优的线程数量,不同的场景这个值区别是很大的,这对于初学者很不友好,非常容易陷入【在用多线程,但是用错了或者用的不够好】这么一种境地。

后来 Python 引入了新的并发模型: aysncio,本小节给大家解释下最新的 asyncio 方案为什么是一个更优的选择。首先还是看《Understanding the Python GIL》里面的一页 PPT:

我们回忆一下,它提到当只有单个线程时,实际上不会触发 GIL,这个独立的线程可以一直执行下去。这也是 asyncio 找到的切入点:因为是单进程单线程的,所以理论上不受 GIL 的限制。在事件驱动的机制下,可以更好的利用单线程的性能,尤其是通过 await 关键词可以让开发者自己决定调度方案,而不是多线程那种由 GIL 来控制。

那设想一下,在最美好的情况下,所有 await 的地方都是可能的 I/O 阻塞的。那么在执行时,遇到 I/O 阻塞就可以切换协程,执行其他可以继续执行的任务,所以,这个线程一直都在工作而不会阻塞,可以说利用率达到 100%!这是多线程方案下永远不可及的。

讲到这个,我们再回去重新整理和理解一遍,先出基本理论开始。

协程

协程是一种特殊函数,这个函数在本来的 def 关键字前面加了 async 关键字,本质上它是生成器函数,可以生成值或者接收外面发送 (通过 send 方法) 来的值,但是它最重要的特点是它可以在需要时保存上下文 (或者说状态),挂起自己并将控制权交给调用者,由于它保存了挂起时的上下文,在未来可以接着被执行。

其实在调用协程是,它并不会立刻执行:

In : async def a():
...:     print('Called')
...:

In : a()  # 并未执行,只是返回了协程对象
Out: <coroutine object a at 0x10444aa40>

In : await a()  # 使用await才会真的执行
Called

异步和并发

异步 (asynchronous)、非阻塞 (non-blocking)、并发 (concurrent) 是很容易让人产生迷惑的词。结合 asyncio 场景,我的理解是:

  1. 协程是异步执行的,在 asyncio 中,协程可以在等待执行结果时把自己【暂停】,以便让其他协程同时运行。
  2. 异步让执行不需要等待阻塞的逻辑完成就可以先让其他代码同时运行,所以这样就不会【阻塞】其他代码,那么这就是【非阻塞】的代码
  3. 使用异步代码编写的程序执行时,看起来其中的任务都在同时执行和完成 (因为会在等待中切换),所以看起来是【并发】的

事件循环 (EventLoop)

Event Loop 这个概念其实我理解了很多年,从 Twisted 时代开始。我一直觉得它非常神秘复杂,现在看来其实想多了。对于初学者,不如换个思路,它的重点就是事件 + 循环: Loop 是一个环,每个任务作为一个事件放到这个环上,事件会不断地循环,在符合条件的情况下触发执行事件。它的特点如下:

  1. 一个事件循环运行在一个线程中
  2. Awaitables 对象 (协程、Task、Future 下面都会提到) 都可以注册到事件循环上
  3. 如果协程中调用了另外一个协程 (通过 await), 这个协程会挂起,发生上下文切换转而去执行另外这个协程,如此循环
  4. 如果协程执行时遇到 I/O 阻塞,这个协程会带着上下文挂起,然后把控制权交还给 EventLoop
  5. 既然是 loop。注册的全部事件执行完毕后,循环会重新开始

Future/Task

asyncio.Future我觉得像 Javascript 里面的Promise, 它是一个占位对象,代表一件还没有做完的事情,在未来才会实现或者完成 (当然还可能由于内部出错而抛出异常)。它和上面提的concurrent.futures方案中实现的concurrent.futures.Futures很像,但是针对 asyncio 的事件循环做了很多定制。asyncio.Future它仅仅是一个数据的容器。

asyncio.Taskasyncio.Future的子类,它用于在事件循环中运行协程。

在官方文档中提到了一个非常直观的例子,我这里改写它在 IPython 里面执行并说明:

In : async def set_after(fut):  # 创建一个协程,他会异步的sleep3秒,然后给future对象设置结果
...:     await asyncio.sleep(3)
...:     fut.set_result('Done')
...:

In : loop = asyncio.get_event_loop()  # 获取当前的事件循环

In : fut = loop.create_future()  # 在事件循环中创建一个Future

In : fut  # 此时它还是默认的pending状态,因为没有调用它
Out: <Future pending>

In : task = loop.create_task(set_after(fut))  # 在事件循环中创建(或者说注册)了一个任务

In : task  # 马上输入它,此时刚创建任务,还在执行中
Out: <Task pending name='Task-3044' coro=<set_after() running at <ipython-input-51-1fd5c9e97768>:2> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1054d32b0>()]>>

In : fut  # 马上输入它,此时刚创建任务,还没有执行完所以future没有变化
Out: <Future pending>

In : task  # 过了三秒,任务执行完成了
Out: <Task finished name='Task-3044' coro=<set_after() done, defined at <ipython-input-51-1fd5c9e97768>:1> result=None>

In : fut  # Future也已经设置了结果,所以状态是finished
Out: <Future finished result='Done'>

可以感受到:

  1. Future 对象不是任务,就是存放状态的一个容器
  2. create_task 会让事件循环调度协程的执行
  3. 创建任务可以用 ensure_futurecreate_taskensure_future 是一个更高级封装的函数,但是 Python3.7 以上版本应该使用 create_task

接着是了解 await 的作用。如果协程中 await 一个 Future 对象,Task 会暂停协程的执行并等待 Future 的完成。而当 Future 完成后,包装协程的执行将继续:

In : async def a():
...:     print('IN a')
...:     await b()
...:     await c()
...:     print('OUT a')
...:

In : async def b():
...:     print('IN b')
...:     await d()
...:     print('OUT b')
...:
...:

In : async def c():
...:     print('IN c')
...:     await asyncio.sleep(1)
...:     print('OUT c')
...:
...:

In : async def d():
...:     print('IN d')
...:     await asyncio.sleep(1)
...:     print('OUT d')
...:

In : asyncio.run(a())
IN a
IN b
IN d
OUT d
OUT b
IN c
OUT c
OUT a

这个例子中,a 的入口函数,其中调用 b 和 c,b 又会调用 d。await 会让对应的协程获取执行权限,协程内 await 的其他协程都执行完毕才会释放权限,所以注意这个更像 DFS (深度优先搜索),所以执行顺序是 a->b->d->c。

所以这里就得出结论:

事件循环负责协程的协作调度:事件循环一次运行一个任务。 当一个任务等待一个 Awaitables 对象完成时,事件循环会运行其他任务、回调或执行 IO 操作。

asyncio 方案

在 asyncio 方案里,凡是涉及 I/O 阻塞操作的库都要使用 aio 生态中的库,所以已经不能再使用 requests 库,而是需要使用 aiohttp,另外文件操作需要使用 aiofiles。最终代码如下 (这个 2 个包需要下载再使用):

import aiofiles
import asyncio
import aiohttp

async def fetch(session, page):
    r = await session.get(f'{url}{page*25}', headers=headers)
    async with aiofiles.open(f'top250-{page}.html', mode='w') as f:
        await f.write(await r.text())

async def main():
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [asyncio.ensure_future(fetch(session, p)) for p in range(25)]
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())

看一下效率:

 time python use_asyncio.py
python use_asyncio.py  0.20s user 0.04s system 34% cpu 0.684 total

所以 asyncio 的优点如下:

  1. asyncio 用好了,是这些并发方案中最快的
  2. 它支持数千级别的活动连接 ,这对于 websockets 和 MQTT 之类的场景下性能可以表现的很好,而多线程方案中在这个规模的线程数量下会出现严重的性能问题。
  3. 多线程方案下线程切换是隐式的,我们无法确认它何时会切换线程的执行权,所以非常容易出现竞态条件 (Race Condition)。而 asyncio 方案里协程的切换是显式、明确的,开发者可以明确地获知或者指定执行的顺序

并发和并行

我之前翻到了一个对比这些方案的说法 (延伸链接 4),其中也提到了并发和并行,说的特别形象,我加以说明:

  1. 多进程。10 个厨房,10 个厨子,10 道菜。也是 1 个厨房 1 厨子做 1 道菜。
  2. 多线程。1 个厨房,10 个厨子,10 道菜。因为厨房比较小,只能大家一起挤在里面,事实上是轮着做,而且一个厨师在做的时候其他人只能等着轮到自己,轮换厨师后,之前的那个厨师没做完的一小部分任务可以继续完成。
  3. asyncio。1 个厨房,1 个厨子,10 道菜。听起来好像这就是一个顺序执行,但事实上,当某道菜需要炖或者其他什么耗时的烹饪方法时,可以同时做其他的菜或者做准备,最美好的场景是这个厨师一直在忙着做。

对于并发和并行我推荐看一下延伸阅读连接 3 的文章。并发 (Concurrency) 允许同时执行多个任务,这些任务可能访问相同的共享资源,例如硬盘、网络以及对应的那个单核 CPU。既然会出现访问共享资源,就可能出现竞态条件,所以某个时间点事实上只有一个任务在执行,在本质上目标是当一个任务被迫等待外部资源时,通过在它们之间切换来防止任务相互阻塞,系统会有机制保证这些任务都在推进。并行 (Parallelism) 是指多个任务在独立分区的资源 (如多个 CPU 内核) 上并行运行,这样可以最大限度地利用硬件资源。

代码目录

本文代码可以在 mp 项目 找到

延伸阅读

  1. https://speakerdeck.com/dabeaz/understanding-the-python-gil
  2. https://www.datacamp.com/tutorial/python-global-interpreter-lock
  3. https://www.infoworld.com/article/3632284/python-concurrency-and-parallelism-explained.html
  4. https://leimao.github.io/blog/Python-Concurrency-High-Level/