我们今天继续深入学习 asyncio。

同步机制

asyncio 模块包含多种同步机制,每个原语的解释可以看 线程篇 ,这些原语的用法上和线程 / 进程有一些区别。

Semaphore(信号量)

并发的去爬取显然可以让爬虫工作显得更有效率,但是我们应该把抓取做的无害,这样既可以保证我们不容易发现,也不会对被爬的网站造成一些额外的压力。

在这里吐槽下,豆瓣现在几乎成了爬虫练手专用网站,我个人也不知道为啥?欢迎留言告诉我。难道是豆瓣一直秉承尊重用户的原则不轻易对用户才去封禁策略,造成大家觉得豆瓣最适合入门么?BTW,我每天在后台都能看到几十万次无效的抓取,也就是抓取程序写的有问题,但还在不停地请求着...

好吧回到正题,比如我现在要抓取http://httpbin.org/get?a=X这样的页面,X为1-10000的数字,一次性的产生1w次请求显然很快就会被封掉。那么我们可以用Semaphore控制同时的并发量(例子中为了演示,X为0-11):

import aiohttp
import asyncio

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def print_result(a):
    with (await sema):
        r = await fetch_async(a)
        print('fetch({}) = {}'.format(a, r))


loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)

在运行的时候可以感受到并发受到了信号量的限制,基本保持在同时处理三个请求的标准。

Lock(锁)

看下面的例子:

 cat lock.py
import asyncio
import functools


def unlock(lock):
    print('callback releasing lock')
    lock.release()


async def test(locker, lock):
    print('{} waiting for the lock'.format(locker))
    with await lock:
        print('{} acquired lock'.format(locker))
    print('{} released lock'.format(locker))


async def main(loop):
    lock = asyncio.Lock()
    await lock.acquire()
    loop.call_later(0.1, functools.partial(unlock, lock))
    await asyncio.wait([test('l1', lock), test('l2', lock)])

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

这个例子中我们首先使用 acquire 加锁,通过 call_later 方法添加一个 0.1 秒后释放锁的函数。看一下调用:

❯ python3 lock.py
l1 waiting for the lock
l2 waiting for the lock
callback releasing lock
l1 acquired lock
l1 released lock
l2 acquired lock
l2 released lock

Condition(条件)

我们根据线程篇 Condition 的例子,改成一下:

import asyncio
import functools


async def consumer(cond, name, second):
    await asyncio.sleep(second)
    with await cond:
        await cond.wait()
        print('{}: Resource is available to consumer'.format(name))



async def producer(cond):
    await asyncio.sleep(2)
    for n in range(1, 3):
        with await cond:
            print('notifying consumer {}'.format(n))
            cond.notify(n=n)
        await asyncio.sleep(0.1)

async def producer2(cond):
    await asyncio.sleep(2)
    with await cond:
        print('Making resource available')
        cond.notify_all()

async def main(loop):
    condition = asyncio.Condition()

    task = loop.create_task(producer(condition))
    consumers = [consumer(condition, name, index)
                 for index, name in enumerate(('c1', 'c2'))]
    await asyncio.wait(consumers)
    task.cancel()

    task = loop.create_task(producer2(condition))
    consumers = [consumer(condition, name, index)
                 for index, name in enumerate(('c1', 'c2'))]
    await asyncio.wait(consumers)
    task.cancel()


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

这次演示了 2 种通知的方式:

  1. 使用 notify 方法挨个通知单个消费者
  2. 使用 notify_all 方法一次性的通知全部消费者

由于 producer 和 producer2 是异步的函数,所以不能使用之前 call_later 方法,需要用 create_task 把它创建成一个任务(Task)。但是最后记得要把任务取消掉。

执行以下看看效果:

❯ python3 condition.py
notifying consumer 1
c1: Resource is available to consumer
notifying consumer 2
c2: Resource is available to consumer
Making resource available
c1: Resource is available to consumer
c2: Resource is available to consumer

Event(事件)

模仿锁的例子实现:

import asyncio
import functools


def set_event(event):
    print('setting event in callback')
    event.set()


async def test(name, event):
    print('{} waiting for event'.format(name))
    await event.wait()
    print('{} triggered'.format(name))


async def main(loop):
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))
    loop.call_later(
        0.1, functools.partial(set_event, event)
    )
    await asyncio.wait([test('e1', event), test('e2', event)])
    print('event end state: {}'.format(event.is_set()))

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

看起来也确实和锁的意思很像,不同的是,事件被触发时,2 个消费者不用获取锁就要尽快的执行下去了。

Queue

在 asyncio 官网上已经举例了 2 个很好的 队列例子 了,这文就不重复了。asyncio 同样支持 LifoQueue 和 PriorityQueue,我们体验下 aiohttp + 优先级队列的用法吧:

import asyncio
import random
import aiohttp

NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def collect_result(a):
    with (await sema):
        return await fetch_async(a)


async def produce(queue):
    for num in NUMBERS:
        print('producing {}'.format(num))
        item = (num, num)
        await queue.put(item)


async def consume(queue):
    while 1:
        item = await queue.get()
        num = item[0]
        rs = await collect_result(num)
        print('consuming {}...'.format(rs))
        queue.task_done()


async def run():
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue)
    await queue.join()
    consumer.cancel()


loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

看到使用了新的 ensure_future 方法,其实它和之前说的 create_task 意思差不多,都是为了把一个异步的函数变成一个协程的 Task。它们的区别是:

  1. create_task 是 AbstractEventLoop 的抽象方法,不同的 loop 可以实现不同的创建 Task 方法,这里用的是 BaseEventLoop 的实现。
  2. ensure_future 是 asyncio 封装好的创建 Task 的函数,它还支持一些参数,甚至指定 loop。一般应该使用它,除非用到后面提到的 uvloop 这个第三方库。

这个例子中,首先我们从 0-99 中随机取出 7 个数字,放入优先级队列,看看消费者是不是按照从小到大的顺序执行的呢?

❯ python3 prioqueue.py
producing 6
producing 4
producing 22
producing 48
producing 9
producing 90
producing 40
consuming 4...
consuming 6...
consuming 9...
consuming 22...
consuming 40...
consuming 48...
consuming 90...

确实是这样的。

说到这里,我们稍微偏个题,看看 Task 是什么?

深入 Task

Task 类用来管理协同程序运行的状态。根据源码,我保留核心,实现一个简单的 Task 类帮助大家理解:

import asyncio


class Task(asyncio.futures.Future):
    def __init__(self, gen, *,loop):
        super().__init__(loop=loop)
        self._gen = gen
        self._loop.call_soon(self._step)

    def _step(self, val=None, exc=None):
        try:
            if exc:
                f = self._gen.throw(exc)
            else:
                f = self._gen.send(val)
        except StopIteration as e:
            self.set_result(e.value)
        except Exception as e:
            self.set_exception(e)
        else:
            f.add_done_callback(
                 self._wakeup)

    def _wakeup(self, fut):
        try:
            res = fut.result()
        except Exception as e:
            self._step(None, e)
        else:
            self._step(res, None)

如果_step 方法没有让协程执行完成,就会添加回调,_wakeup 又会继续执行_step... 直到协程程序完成,并 set_result。

写个使用它的例子:

async def foo():
    await asyncio.sleep(2)
    print('Hello Foo')


async def bar():
    await asyncio.sleep(1)
    print('Hello Bar')


loop = asyncio.get_event_loop()
tasks = [Task(foo(), loop=loop),
         loop.create_task(bar())]
loop.run_until_complete(
        asyncio.wait(tasks))
loop.close()

第一个任务是用我们自己的 Task 创建的,第二个是用 BaseEventLoop 自带的 create_task。

运行一下:

 python3 task.py
Hello Bar
Hello Foo

自定义的 Task 类和 asyncio 自带的是可以好好协作的。

深入事件循环

asyncio 根据你的操作系统信息会帮你选择默认的事件循环类,在*nix 下使用的类继承于 BaseEventLoop,在上面已经提到了。和 Task 一样,我们剥离出一份最核心的实现:

import asyncio
from collections import deque


def done_callback(fut):
    fut._loop.stop()


class Loop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def create_task(self, coro):
        Task = asyncio.tasks.Task
        task = Task(coro, loop=self)
        return task

    def run_until_complete(self, fut):
        tasks = asyncio.tasks
        # 获取任务
        fut = tasks.ensure_future(
                    fut, loop=self)
        # 增加任务到self._ready
        fut.add_done_callback(done_callback)
        # 跑全部任务
        self.run_forever()
        # 从self._ready中移除
        fut.remove_done_callback(done_callback)

    def run_forever(self):
        try:
            while 1:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False

    def call_soon(self, cb, *args):
        self._ready.append((cb, args))

    def _run_once(self):
        ntodo = len(self._ready)
        for i in range(ntodo):
            t, a = self._ready.popleft()
            t(*a)

    def stop(self):
        self._stopping = True

    def close(self):
        self._ready.clear()

    def call_exception_handler(self, c):
        pass

    def get_debug(self):
        return False

其中 call_exception_handler 和 get_debug 是必须存在的。

写个例子用一下:

async def foo():
    print('Hello Foo')


async def bar():
    print('Hello Bar')

loop = Loop()
tasks = [loop.create_task(foo()),
         loop.create_task(bar())]
loop.run_until_complete(
        asyncio.wait(tasks))
loop.close()

执行:

❯ python3 loop.py
Hello Foo
Hello Bar

也可以和 asyncio.wait 正常协作了。

PS:本文全部代码可以在 微信公众号文章代码库项目 中找到。