由于 asyncio 有非常多的内容,且对 Python 工程师非常重要,我将分为三篇文章来介绍它。本篇还不是关于使用 asyncio 进行网络编程的文章,而是继续并发主题,看看使用 asyncio 怎么实现高效的并发程序。

前言

在 Python 2 的时代,高性能的网络编程主要是使用 Twisted、Tornado 和 Gevent 这三个库,但是它们的异步代码相互之间既不兼容也不能移植。如上一节说的,Gvanrossum 希望在 Python 3 实现一个原生的基于生成器的协程库,其中直接内置了对异步 IO 的支持,这就是 asyncio,它在 Python 3.4 被引入到标准库。

Python 3.5 添加了 async 和 await 这两个关键字,分别用来替换asyncio.coroutineyield from。自此,协程成为新的语法,而不再是一种生成器类型了。事件循环与协程的引入,可以极大提高高负载下程序的 I/O 性能。除此之外还增加了async with(异步上下文管理)、async for(异步迭代器) 语法。特别说的是,在新发布的 Python 3.6 里面终于可以用 异步生成器 了! / 顺便说一下 Twisted。虽然在之前的公司 Twisted 使用的还挺广泛,它的 Reactor、Factory、Deferred、Protocol 等编程的思想很有启发性,在当时已经非常先进了,而 asyncio 也借鉴了一部分。但是它太重、大量的回调(Javascript 工程师很容易接受,比如 Deferred,小明我不喜欢)、没有及时更新的中文相关的技术文档和书籍所以学习曲线较高、没有更多的公司出来分享对应的实践,再加上协程的冲击,最近 1-2 年已经很少看到它的身影,不建议新人再去学习它了。

并发哪家强

首先需要明确一点,asyncio 使用单线程、单个进程的方式切换(通常程序等待读或写数据时就是切换上下文的时机),那这样效率高嘛?

实践是检验真理的唯一标准。我们用之前介绍的 concurrent.futures 和 asyncio 分别试验下。当然下面例子的结果仅供参考,因为无法保证被请求的网站的服务水平,这会造成对结果或多或少有影响,可以多跑几次综合的来看。

requests + ThreadPoolExecutor

import time
import requests
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

start = time.time()
with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
        print('fetch({}) = {}'.format(num, result))

print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

非常正统的方式,运行的效果如下:

❯ python3 scraper_thread.py
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use requests+ThreadPoolExecutor cost: 6.493273019790649

注意,和 ThreadPoolExecutor 有关的实现都在 scraper_thread.py 一个文件中,让网络状态的影响更小,运行的效果也是在一次运行之后截取的。

asyncio + requests + ThreadPoolExecutor

现在我们加入 asyncio 再试试:

import asyncio

async def run_scraper_tasks(executor):
    loop = asyncio.get_event_loop()

    blocking_tasks = []
    for num in NUMBERS:
        task = loop.run_in_executor(executor, fetch, num)
        task.__num = num
        blocking_tasks.append(task)

    completed, pending = await asyncio.wait(blocking_tasks)
    results = {t.__num: t.result() for t in completed}
    for num, result in sorted(results.items(), key=lambda x: x[0]):
        print('fetch({}) = {}'.format(num, result))

start = time.time()
executor = ThreadPoolExecutor(3)
event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(
    run_scraper_tasks(executor)
)

print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))

如果之前没有使用过 asyncio 的同学可能看完就一脸😳了。我们之前编写多进程和多线程代码的时候,会感觉和我们的线性思维方法是一致的,所以写起来很舒服,理解和维护也相对容易。大家要做好一些心理准备,Python 核心开发们在努力让 Python 开发者用同步编程的方式去写异步代码, 但是还是需要调整一下心态,做好迎接新的写法的准备。我来分析下这个例子:

  1. 当我们给一个函数添加了 async 关键字,就会把它变成一个异步函数。
  2. 每个线程有一个事件循环,主线程调用 asyncio.get_event_loop 时会创建事件循环,你需要把异步的任务丢给这个循环的 run_until_complete 方法,事件循环会 安排协同程序的执行 。和方法名字一样,异步的任务完成方法才会就执行完成了。
  3. 为了在 asyncio 中使用 concurrent.futures 的执行器,我这用到了 run_in_executor,它可以接收要 同步 执行的任务。
  4. 给 task 设置__num 属性,是因为后面的 completed 中的 Future 对象只包含结果,但是我们并不知道 num 是什么,所以 hack 了下,之后的例子中会有其他的方案,本文是给大家提供各种解题的思路,在合适的场景还是有用处的。
  5. await asyncio.wait (blocking_tasks) 就是协同的执行那些同步的任务,直到完成。
  6. 最后根据__num 找到和执行结果的对应关系,排序然后打印结果。

有一点得强调:async/await 是 Python 提供的异步编程 API,而 asyncio 只是一个利用 async/await API 进行异步编程的框架

运行一下看看性能有没有提升:

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+requests+ThreadPoolExecutor cost: 6.142597913742065

多跑几次可以发现和 requests+ThreadPoolExecutor 相比没有什么优势,就像是封装了一层,是有损耗的。

讲到这里,我们想想为什么 asyncio 的强大优势没有显示出来?

现存的一些库其实并不能原生的支持 asyncio(因为会发生阻塞或者功能不可用),比如 requests,如果要写爬虫,配合 asyncio 的应该用 aiohttp,其他的如数据库驱动等各种 Python 对应的库也都得使用对应的 aioXXX 版本了。

asyncio + aiohttp

我们看一下使用 aiohttp 会发生什么。第一步就是把 fetch 函数改成异步的:

import aiohttp


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']

其实看起来和 requests 的接口差不多,只是你要熟悉这种编程模式就好了。

start = time.time()
event_loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = event_loop.run_until_complete(asyncio.gather(*tasks))

for num, result in zip(NUMBERS, results):
    print('fetch({}) = {}'.format(num, result))

代码比上个例子简单不少,这里需要注意,asyncio.gather 可以按顺序搜集异步任务执行的结果,我们就不需要用到之前提过的__num(而且也 hack 不了,因为 fetch_async 是一个生成器,不能那样添加属性)

希望能进行协程切换的地方,就需要使用 await 关键字。如上的例子中 r.json 方法会等待 I/O(也就是正在做一个网络请求),这种就可以切换去做其他的时候,之后再切换回来。

运行一下:

fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
Use asyncio+aiohttp cost: 1.8903069496154785

有木有亮瞎眼,3 倍的提升!!!

asyncio + aiohttp + ThreadPoolExecutor

接着我们再加回 ThreadPoolExecutor。之前说 asyncio 是单线程单进程的,那么我多线程同时运行,会不会翻倍 ╰(°▽°)╯

作为工程师,有想法就实践来验证下。写代码之前我们回忆一下,XXExecutor 其实就是封装了队列,但是由于 run_in_executor 并不能传入异步的函数,我们不能按照例子 2 来用。独立使用队列其实效果应该和 ThreadPoolExecutor 差不多,那我们可不可以把任务平均切分一下,尽量让每个线程拿到的任务差不多。这就是我选择NUMBERS = range(12)的原因:可以均分。

async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return a, data['args']['a']


def sub_loop(numbers):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [fetch_async(num) for num in numbers]
    results = loop.run_until_complete(asyncio.gather(*tasks))
    for num, result in results:
        print('fetch({}) = {}'.format(num, result))


async def run(executor, numbers):
    await asyncio.get_event_loop().run_in_executor(executor, sub_loop, numbers)


def chunks(l, size):
    n = math.ceil(len(l) / size)
    for i in range(0, len(l), n):
        yield l[i:i + n]                                                     

event_loop = asyncio.get_event_loop()
tasks = [run(executor, chunked) for chunked in chunks(NUMBERS, 3)]
results = event_loop.run_until_complete(asyncio.gather(*tasks))

print('Use asyncio+aiohttp+ThreadPoolExecutor cost: {}'.format(time.time() - start))

我在解释下这个例子中的几点:

  1. 现在任务被拆分,不能用 zip (NUMBERS, results) 的方式拿到正确的 num 和结果的对应关系了,也由于由于不能给 fetch_async 加一个__num 的属性随意直接改了任务的返回值,把 num 也返回了
  2. chunks 是一个给任务分组的函数,分三份是因为 ThreadPoolExecutor 用了三个线程。
  3. 非主线程不能使用主线程的事件循环对象,所以在 sub_loop 中我对重新设置了新的对象。

见证奇迹的时刻到了:

fetch(8) = 8
fetch(9) = 9
fetch(10) = 10
fetch(11) = 11
fetch(0) = 0
fetch(1) = 1
fetch(2) = 2
fetch(3) = 3
fetch(4) = 4
fetch(5) = 5
fetch(6) = 6
fetch(7) = 7
Use asyncio+aiohttp+ThreadPoolExecutor cost: 2.66983699798584

╮(╯_╰)╭ 忧伤,就算最后没有对结果排序,依然慢了一些。还是 asyncio+aiohttp 最好了。

使用 ProcessPoolExecutor 会怎么样?

为了验证多进程模式下的上述实验的效果,我找了一台服务器,把 ThreadPoolExecutor 都替换成 ProcessPoolExecutor。我就直接贴答案了:

> python3 scraper_process.py
...
Use requests+ProcessPoolExecutor cost: 2.2943034172058105
...
Use asyncio+requests+ThreadPoolExecutor cost: 2.609675407409668
...
Use asyncio+aiohttp cost: 0.6706254482269287
...
Use asyncio+aiohttp+ThreadPoolExecutor cost: 1.690920352935791

结论就是在 Python 3,请直接原生的使用 asyncio吧。

深入 asyncio

首先我们先补充点基础知识。先说「10K 问题怎么解决」

10K 问题怎么解决

在 Nginx 没有流行起来的时候,常被提到一个词 10K(并发 1W)。在互联网的早期,网速很慢、用户群很小需求也只是简单的页面浏览,所以最初的服务器设计者们使用基于进程 / 线程模型,也就是一个 TCP 连接就是分配一个进程 (线程)。谁都没有想到现在 Web 2.0 时候用户群里和复杂的页面交互问题,而现在即时通信和实在实时互动已经很普遍了。那么你设想如果每一个用户都和服务器保持一个(甚至多个)TCP 连接才能进行实时的数据交互,别说 BAT 这种量级的网站,就是豆瓣这种比较小的网站,同时的并发连接也要过亿了。进程是操作系统最昂贵的资源,一台机器无法创建很多进程。如果要创建 10K 个进程,那么操作系统是无法承受的。就算我们不讨论随着服务器规模大幅上升带来复杂度几何级数上升的问题,采用分布式系统,只是维持 1 亿用户在线需要 10 万台服务器,成本巨大,也只有 FLAG、BAT 这样公司才有财力购买如此多的服务器。

为了解决这一问题,出现了「用同一进程 / 线程来同时处理若干连接」的思路,也就是 I/O 多路复用:

  1. select。每个连接对应一个描述符(socket),循环处理各个连接,先查下它的状态,ready 了就进行处理,不 ready 就不进行处理。但是缺点很多:

    1. 单个进程能够监视的文件描述符的数量存在最大限制
    2. 对 socket 进行扫描时是线性扫描,即采用轮询的方法,效率较低。
    3. 需要维护一个用来存放大量的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大
  2. poll。本质上和 select 没有区别,但是由于它是基于链表来存储的,没有最大连接数的限制。缺点是:

    1. 大量的的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
    2. poll 的特点是「水平触发 (只要有数据可以读,不管怎样都会通知)」,如果报告后没有被处理,那么下次 poll 时会再次报告它。
  3. epoll。它使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次。epoll 支持水平触发和边缘触发,最大的特点在于「边缘触发」,它只告诉进程哪些刚刚变为就绪态,并且只会通知一次。使用 epoll 的优点很多:

    1. 没有最大并发连接的限制,能打开的 fd 的上限远大于 1024(1G 的内存上能监听约 10 万个端口)
    2. 效率提升,不是轮询的方式,不会随着 fd 数目的增加效率下降
    3. 内存拷贝,利用 mmap () 文件映射内存加速与内核空间的消息传递;即 epoll 使用 mmap 减少复制开销

因为 Linux 是互联网企业中使用率最高的操作系统,epoll 就成为 C10K killer、高并发、高性能、异步非阻塞这些技术的代名词了。FreeBSD 推出了 kqueue,Linux 推出了 epoll,Windows 推出了 IOCP,Solaris 推出了 /dev/poll。这些操作系统提供的功能就是为了解决 C10K 问题。epoll 技术的编程模型就是异步非阻塞回调,也可以叫做 Reactor、事件驱动、事件轮循(EventLoop)、libevent、Tornado、Node.js 这些就是 epoll 时代的产物。

看了上面一段话,是不是感觉对着一坨概念理解的更清晰了呢?Python 3.4 中还新增了一个与 asyncio 配套的新模块:selectors. 这个模块将 select、epoll、kqueue 等等系统级异步 IO 接口抽象成 Selector 类型,规定了统一的对外接口,于是程序只管使用 selector 的接口就行了。一般使用selectors.DefaultSelector就好了,它是这个模块根据你的系统自动帮你选择的最合适的 Selector。

就这样小公司也可以玩高并发了。但是时代在发展,现在大家讨论的都是 10M、100M 这种挑战,而写过 Node.js 都知道异步嵌套回调非常难写,同样的问题也存在于 Twisted:对代码的理解和调试都变得困难,维护性很低。上述的技术已经无能为力了。从前面的演化过程中,我们可以看到,根本的思路是要高效的去阻塞,让 CPU 可以干核心的任务。所以,千万级并发实现的秘密:内核不是解决方案,而是问题所在!

这意味着:

不要让内核执行所有繁重的任务。将数据包处理,内存管理,处理器调度等任务从内核转移到应用程序高效地完成。让 Linux 只处理控制层,数据层完全交给应用程序来处理。

当连接很多时,首先需要大量的进程 / 线程来做事。同时系统中的应用进程 / 线程们可能大量的都处于 ready 状态,需要系统去不断的进行快速切换,而我们知道系统上下文的切换是有代价的。虽然现在 Linux 系统的调度算法已经设计的很高效了,但对于 10M 这样大规模的场景仍然力有不足。

所以我们面临的瓶颈有两个:

  1. 进程 / 线程作为处理单元还是太厚重
  2. 系统调度的代价太高

很自然地,我们会想到,如果有一种更轻量级的进程 / 线程作为处理单元,而且它们的调度可以做到很快(最好不需要锁),那就完美了。这个时候「协程」出现了,下一小节我们继续了解它。

再谈协程

它们在实现上都是试图用一组少量的线程来实现多个任务,一旦某个任务阻塞,则可能用同一线程继续运行其他任务,避免大量上下文的切换。每个协程所独占的系统资源往往只有栈部分。而且,各个协程之间的切换,往往是用户通过代码来显式指定的(跟各种 callback 类似),不需要内核参与,可以很方便的实现异步。

协程本质上也是异步非阻塞技术,它是将事件回调进行了包装,让程序员看不到里面的事件循环。程序员就像写阻塞代码一样简单。比如调用 client->recv () 等待接收数据时,就像阻塞代码一样写。实际上是底层库在执行 recv 时悄悄保存了一个状态,比如代码行数,局部变量的值。然后就跳回到 EventLoop 中了。什么时候真的数据到来时,它再把刚才保存的代码行数,局部变量值取出来,又开始继续执行。

简单的说,进程 / 线程是操作系统充当了 EventLoop 调度,而协程是自己用 epoll 进行调度。

协程是异步非阻塞的另外一种展现形式。Golang,Erlang,Lua 协程都是这个模型。那什么是异步和非阻塞呢?

在网站可以找到很多对 I/O 模型进行对比和解释的文章,推荐阅读知友 严肃 对它们的 理解 (已获得授权):

1. 同步与异步 同步和异步关注的是消息通信机制 (synchronous communication/asynchronous communication) 所谓同步,就是在发出一个 调用 时,在没有得到结果之前,该 调用 就不返回。但是一旦调用返回,就得到返回值了。 换句话说,就是由 调用者 主动等待这个 调用 的结果。

而异步则是相反, 调用 在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在 调用 发出后, 被调用者 通过状态、通知来通知调用者,或通过回调函数处理这个调用。

典型的异步编程模型比如 Node.js,举个通俗的例子: 你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,” 我查一下 ",然后开始查啊查,等查好了(可能是 5 秒,也可能是一天)告诉你结果(返回结果)。 而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过 “回电” 这种方式来回调。

  1. 阻塞与非阻塞 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

还是上面的例子, 你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己 “挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟 check 一下老板有没有返回结果。 在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关。

事件循环

事件循环是一种处理多并发量的有效方式,在 维基百科 中它被描述为「一种等待程序分配事件或消息的编程架构」,,我们可以定义事件循环来简化使用轮询方法来监控事件。它的意义最通俗的说法就是「当 A 发生时,执行 B」。事件循环利用 poller 对象,使得程序员不用控制任务的添加、删除和事件的控制。事件循环使用回调方法来知道事件的发生。例如,有一个资源描述符 A,当一个写事件在 A 中发生就会调用一个回调函数。如下应用都实现了事件循环:

当然也包含 asyncio,他是 asyncio 提供的「中央处理设备」,支持如下操作:

  • 注册、执行和取消延迟调用(超时)
  • 创建可用于多种类型的通信的服务端和客户端的 Transports
  • 启动进程以及相关的和外部通信程序的 Transports
  • 将耗时函数调用委托给一个线程池

单线程(进程)的架构也避免的多线程(进程)修改可变状态的锁的问题。

参考资料

PS:本文全部代码可以在 微信公众号文章代码库项目 中找到。

我的知乎 Live「 Python 工程师的入门和进阶

无耻的广告: 《Python Web 开发实战》上市了!