asyncio

cooolr 于 2023-01-30 发布

asyncio库: 使用事件循环驱动的协程实现并发

线程与协程对比

asyncio包使用的”协程”是较严格的定义。适合asyncio API的协程在定义体中必须使用yield from, 而不能使用yield. 此外,适合asyncio的协程要由调用方驱动,并由调用方通过yield from调用; 或者把协程传给asyncio包中的某个函数,例如asyncio.async(), 从而驱动协程。最后,@asyncio.coroutine装饰器应该用在协程上。

示例 18-2 通过协程以动画形式显示文本式旋转指针

import asyncio
import itertools
import sys

@asyncio.coroutine
def spin(msg):
    write,flush=sys.stdout.write,sys.stdout.flush
    for char in itertools.cycle('|/-\\\\'):
        status=char+' '+msg
        write(status)
        flush()
        write('\\x08'*len(status))
        try:
            yield from asyncio.sleep(0.1) #3
        except asyncio.CancelledError:    #4
            break
    write(' '*len(status)+'\\x08'*len(status))

@asyncio.coroutine
 def slow_function():
      yield from asyncio.sleep(3)
      return 42

@asyncio.coroutine
def supervisor():
    spinner=asyncio.async(spin('thinking!')) #8
    print('spinner object:',spinner)
    result=yield from slow_function()
    spinner.cancel() #11
    return result

def main():
    loop=asyncio.get_event_loop() #12
    result=loop.run_until_complete(supervisor()) #13
    loop.close()
    print('Answer:',result)

if __name__=='__main__':
    main()

3 使用yield from asyncio.sleep(0.1)代替time.sleep(0.1), 这样的休眠不会阻塞事件循环。 4 如果spin函数苏醒后抛出asyncio.CancelledError异常,其原因是发出了取消请求,因此退出循环。 8 asyncio.async(…)函数排定spin协程的运行时间,使用一个Task对象(Future的子类)包装spin协程,并立即返回。 11 Task对象可以取消;取消后会在协程当前暂停的yield处抛出asyncio.CancelledError异常,协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。 12 获取时间循环的引用。 13 驱动supervisor协程,让它运行完毕,这个协程的返回值是这次调用的返回值。

# asyncio.Task对象差不多与threading.Thread对象等效。 # Task对象不由自己动手实例化,而是通过把协程传给asyncio.async(…)函数或loop.create_task(…)方法获取。 # 获取的Task对象已经排定了运行时间(例如,由asyncio.async函数排定). # 如果想终止任务,可以使用Task.cancel()实例方法,在协程内部抛出CancelledError异常。协程可以在暂停的yield处捕获这个异常,处理终止请求。 # supervisor协程必须在main函数中由loop.run_until_complete方法执行。 # 协程自身就会同步,因为在任何时刻只有一个协程运行。想要交出控制权时,可以使用yield或yield from把控制权交还给调度程序。这就是能够安全地取消协程地原因:按照定义,协程只能在暂停地yield处取消,因此可以处理CancelledError异常,执行清理操作。

asyncio.Future与concurrent.futures.Future

如上所述,future只是调度执行某物地结果。在asyncio包中,BaseEventLoop.create_task(…)方法接受一个协程,排定它地运行时间,然后返回一个asyncio.Task实例—-也是asyncio.Future类的实例,因为Task是Future的子类,用于包装协程。这与调用Executor.submit(…)方法创建concurrent.futures.Future实例是一个道理。

获取asyncio.Future对象的结果通常使用yield from,从中产出结果。而不是.result()方法。 使用yield from处理future,等待future运行完毕这一步无需我们关心,而且不会阻塞事件循环,因为在asyncio包中,yield from的作用就是把控制权还给事件循环。

从future、任务和协程中产出

在asyncio包中,future和协程关系紧密,因为可以使用yield from从asyncio.Future对象中产出结果。这意味着,如果foo是协程函数(调用后返回协程对象), 抑或是返回Future或Task实例的普通函数,那么可以这样写: res=yield from foo(). 这是asyncio包的API中很多地方可以互换协程与future的原因之一。

为了执行这些操作,必须排定协程的运行时间,然后使用asyncio.Task对象包装协程,对协程来说,获取Task对象有两种主要方式。

BaseEventLoop.create_task(coro) 这个方法排定协程的执行时间,返回一个asyncio.Task对象,如果在自定义的BaseEventLoop子类调用,返回的对象可能是外部库中与Task类兼容的某个类的实例。

asyncio包中有多个函数会自动(内部使用的是asyncio.async函数)把参数指定的协程包装在asyncio.Task对象中,例如BaseEventLoop.run_until_complete(…)方法。

使用asyncio和aiohttp包下载

示例 18-5 使用asyncio和aiohttp包实现的异步下载脚本

import asyncio
import aiohttp

BASE_URL='<http://flupy.org/data/flags>'
CC=['CN','IN','US','ID','BR','PK','NG','BD','RU','JP','MX',    'PH','VN','ET','EG','DE','IR','TR','CD''FR']

@asyncio.coroutine
def get_flag(cc):
    url='{}/{cc}/{cc}.gif'.format(BASE_URL,cc=cc.lower())
    with aiohttp.ClientSession() as session:
        resp=yield from session.get(url)
        image=yield from resp.text()
        return image

@asyncio.coroutine
def download_one(cc):
    image=yield from get_flag(cc)
    print(cc,end=' ')
    with open('{}.gif'.format(cc),'wb') as f:
        f.write(image)
    return cc

def download_many():
    loop=asyncio.get_event_loop()
    to_do=[download_one(cc) for cc in CC]
    wait_coro=asyncio.wait(to_do)  #10
    res,_=loop.run_until_complete(wait_coro) #11
    loop.close()
    return len(res)

if __name__=='__main__':
    download_many()

10 虽然函数的名称是wait, 但它不是阻塞型函数。wait是一个协程,等传给它的所有协程运行完毕后结束(这是wait的默认行为) 11 执行事件循环,直到wait_coro运行结束;事件循环运行的过程中,这个脚本会在这里阻塞。

asyncio.wait(…)协程的参数是一个由future或协程构成的可迭代对象;wait会分别把各个协程包装进一个Task对象。最终的结果是,wait处理的所有对象都通过某种方式变成Future类的实例。wait是协程函数,因此返回的是一个协程或生成器对象;wait_coro变量中存储的正是这种对象。为了驱动协程,我们把协程传给loop.run_until_complete(…)方法。

loop.run_until_complete方法的参数是一个future或协程。如果是协程,run_until_complete方法与wait函数一样,把协程包装进一个Task对象中。协程、future和任务都能有yield from驱动,这正是run_until_complete方法对wait函数返回的wait_coro对象做的事。wait_coro运行结束后返回一个元组,第一个元素是一系列结束的future, 第二个元素是一系列未结束的future。在示例18-5中,第二个元素始终为空,因此我们把它赋值给-,将其忽略. 但是wait函数有两个关键字参数,如果设定了可能会返回未结束的future, 这两个参数是timeout和return-when.

我们编写的协程链条始终通过把最外层委派生成器传给asyncio包API中的某个函数(如loop.run_until_complete)驱动。也就是说,使用asyncio包时,我们编写的代码不通过调用next(…)或.send(…)方法驱动协程—-这一点由asyncio包实现的事件循环去做。

我们编写的协程链条最终通过yield from把职责委派给asyncio包中的某个协程函数或协程方法(例如yield from asyncio.sleep(1)),或者其他库中实现的高层协议的协程(例如resp=yield from aiohttp.request(‘GET’,url)).

也就是说,最内层的子生成器是库中真正执行I/O操作的函数,而不是我们自己编写的函数。

概括起来就是,使用asyncio包时,我们编写的异步代码中包含由asyncio本身驱动的协程(即委派生成器),而生成器最终把职责委托给asyncio包或第三方库(aiohttp)中的协程。这种方式相当于架起了管道,让asyncio事件循环(通过我们编写的协程)驱动执行低层异步I/O操作的库函数。

使用asyncio.as_completed函数

在示例18-5中,把一个协程列表传给asyncio.wait函数,经由loop.run_until_complete方法驱动,全部协程运行完毕后,这个函数会返回所有下载结果。可是,为了更新进度条,各个协程运行结束后就要立即获取结果。在线程版示例中为了集成进度条,我们使用的是as_completed生成器函数,幸好,asyncio包提供了这个生成器函数的相应版本。

为了使用asyncio包实现flags示例,我们要重写几个函数,重写后的函数可以供concurrent.futures版重用。之所以要重写,是因为在使用asyncio包的程序中只有一个主线程,而在这个线程中不能有阻塞型调用,因为事件循环也在这个线程中运行。所以,要重写get_flags函数,使用yield from访问网络。现在,由于get_flags是协程,download_one函数必须使用yield from驱动它,因此download_one自己也要变成协程。

之前,在示例18-5中,download_one由download_many驱动:download_one函数由asyncio.wait函数调用,然后传给loop.run_until_complete方法。 现在,为了报告进度并处理错误,我们要更精确地控制,所以把download_many函数中大多数逻辑移到一个新的协程download_core中,只在download_many函数中设置事件循环,以及调度download_coro协程。

示例 18-7

import asyncio
import aiohttp
import tqdm

BASE_URL='<http://flupy.org/data/flags>'
CC=['CN','IN','US','ID','BR','PK','NG','BD','RU','JP','MX',    'PH','VN','ET','EG','DE','IR','TR','CD''FR']

def save_flag(cc,image):
    with open('{}.gif'.format(cc),'wb') as f:
        f.write(image)

@asyncio.coroutine
def get_flags(cc):
    url='{}/{cc}/{cc}.gif'.format(BASE_URL,cc=cc.lower())
    session=aiohttp.ClientSession()
    try:
        resp=yield from session.get(url)
        image=yield from resp.text()
    except:
        image=None
    finally:
        session.close()
    return image

@asyncio.coroutine
def download_one(cc,semaphore):
    with (yield from semaphore):
        image=yield from get_flags(cc)
        print(cc,end=' ')
        save_flag(cc,image)
        return cc

@asyncio.coroutine
def download_coro():
    semaphore=asyncio.Semaphore(20) #1
    to_do=[download_one(cc,semaphore) for cc in CC]
    item_coro=asyncio.as_completed(to_do) #2
    rets=[]
    for future in tqdm.tqdm(item_coro,total=len(to_do)): #3
        ret=yield from future #4
        rets.append(ret)
    return rets

def download_many():
    loop=asyncio.get_event_loop()
    ret=loop.run_until_complete(download_coro())
    print(len(ret))
    loop.close()

if __name__=='__main__':
    download_many()

1 创建一个asyncio.Semaphore实例,最多允许激活20个协程 2 获取一个迭代器,这个迭代器会在future运行结束后返回future 3 把迭代器传给tqdm函数,显示进度 4 获取asyncio.Future对象的结果,最简单的方法是使用yield from, 而不是调用future.result()方法

使用Executor对象,防止阻塞事件循环

访问本地文件系统会阻塞,在示例18-7中,保存文件会阻塞,保存文件阻塞了客户代码与asyncio事件循环共用的唯一线程,因此保存文件时,整个应用程序都会冻结。 这个问题的解决方法是,使用事件循环对象run_in_executor方法。

asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用的对象发给它执行。若想在示例中使用这个功能,download_one协程只有几行代码需要改动。


@asyncio.coroutine
def download_one(cc,semaphore):
    with (yield from semaphore):
        image=yield from get_flags(cc)
        print(cc,end=' ')
        loop=asyncio.get_event_loop() #1
        loop.run_in_executor(None,save_flag,cc,image) #2
        return cc

run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认ThreadPoolExecutor实例。余下的参数是可调用的对象以及参数。

not end…