聲明:本文針對的是python3.4以后的版本的,因為從3.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個關(guān)鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。本文用最通俗的語言解釋了pythonasyncio背后的一些核心概念,簡要解析了asyncio的設(shè)計架構(gòu),并給出了使用python進行asyncio異步編程的一般模板。本文較長:閱讀全文約30min。
一 一些最重要的概念 1.1 協(xié)程(coroutine)——本質(zhì)就是一個 函數(shù) 1.2 事件循環(huán)——event_loop 1.3 什么是awaitable對象——即可暫停 等待的對象 1.4 什么是task任務(wù) 1.5 什么是future? 二 asyncio的基本架構(gòu) 2.1 常見的一些高層API方法 2.2 Task 類詳解 2.3 異步函數(shù)的結(jié)果獲取 三 asyncio異步編程的基本模板
3.1 python3.7之前的版本 3.1.1 例子一:無參數(shù)、無返回值 3.1.2 例子二:有參數(shù)、有返回值
3.1.3 總結(jié):四步走(針對python3.7 之前的版本) 3.2 python3.7版本 3.2.1 例子一:無參數(shù)、無返回值 3.2.2 例子二:有參數(shù)、有返回值 3.2.3 總結(jié):兩步走(針對python3.7)
四 協(xié)程編程的優(yōu)點
協(xié)程(coroutine)——本質(zhì)就是一個函數(shù)
所謂的“協(xié)程”就是一個函數(shù),這個函數(shù)需要有兩個基本的組成要素,第一,需要使用@asyncio.coroutine進行裝飾;第二,函數(shù)體內(nèi)一定要有yield from 返回的的generator,或者是說使用yield from 返回另一個協(xié)程對象。 當(dāng)然,這兩個條件并不是硬性規(guī)定的,如果沒有這兩個條件,依然是函數(shù),只不過是普通函數(shù)而已。 怎么判斷一個函數(shù)是不是協(xié)程?通過asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判斷,返回true,則是。
那么協(xié)程函數(shù)有什么作用呢?
(1)result = yield from future
作用一:返回future的結(jié)果。什么是future?后面會講到。當(dāng)協(xié)程函數(shù)執(zhí)行到這一句,協(xié)程會被懸掛起來,知道future的結(jié)果被返回。如果是future被中途取消,則會觸發(fā)CancelledError異常。由于task是future的子類,后面也會介紹,關(guān)于future的所有應(yīng)用,都同樣適用于task
(2)result = yield from coroutine 等候另一個協(xié)程函數(shù)返回結(jié)果或者是觸發(fā)異常
(3)result= yield from task 返回一個task的結(jié)果
(4)return expression 作為一個函數(shù),他本身也是可以返回某一個結(jié)果的
(5)raise exception 協(xié)程函數(shù),不是像普通函數(shù)那樣直接調(diào)用運行的,必須添加到事件循環(huán)中,然后由事件循環(huán)去運行,單獨運行協(xié)程函數(shù)是不會有結(jié)果的,看一個簡單的例子:
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what)
async def main(): print(f'開始時間為: {time.time()}') await say_after_time(1,'hello') await say_after_time(2,'world') print(f'結(jié)束時間為: {time.time()}')
loop=asyncio.get_event_loop() #創(chuàng)建事件循環(huán)對象 #loop=asyncio.new_event_loop() #與上面等價,創(chuàng)建新的事件循環(huán) loop.run_until_complete(main()) #通過事件循環(huán)對象運行協(xié)程函數(shù) loop.close()
在python3.6版本中,如果我們單獨像執(zhí)行普通函數(shù)那樣執(zhí)行一個協(xié)程函數(shù),只會返回一個coroutine對象(python3.7)如下所示: >>> main() <coroutine object main at 0x1053bb7c8>
(1)獲取事件循環(huán)對象的幾種方式: 下面幾種方式可以用來獲取、設(shè)置、創(chuàng)建事件循環(huán)對象loop loop=asyncio.get_running_loop() 返回(獲?。┰诋?dāng)前線程中正在運行的事件循環(huán),如果沒有正在運行的事件循環(huán),則會顯示錯誤;它是python3.7中新添加的loop=asyncio.get_event_loop() 獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop; loop=asyncio.set_event_loop(loop) 設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán); loop=asyncio.new_event_loop() 創(chuàng)建一個新的事件循環(huán)
(2)通過事件循環(huán)運行協(xié)程函數(shù)的兩種方式:
①方式一:創(chuàng)建事件循環(huán)對象loop,即asyncio.get_event_loop(),通過事件循環(huán)運行協(xié)程函數(shù) ②方式二:直接通過asyncio.run(function_name)運行協(xié)程函數(shù)。但是需要注意的是,首先run函數(shù)是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數(shù)總是會創(chuàng)建一個新的事件循環(huán)并在run結(jié)束之后關(guān)閉事件循環(huán),所以,如果在同一個線程中已經(jīng)有了一個事件循環(huán),則不能再使用這個函數(shù)了,因為同一個線程不能有兩個事件循環(huán),而且這個run函數(shù)不能同時運行兩次,因為他已經(jīng)創(chuàng)建一個了。即同一個線程中是不允許有多個事件循環(huán)loop的。 asyncio.run()是python3.7 新添加的內(nèi)容,也是后面推薦的運行任務(wù)的方式,因為它是高層API,后面會講到它與asyncio.run_until_complete()的差異性,run_until_complete()是相對較低層的API。
注意:到底什么是事件循環(huán)?如何理解?
可以這樣理解:線程一直在各個協(xié)程方法之間永不停歇的游走,遇到一個yield from 或者await就懸掛起來,然后又走到另外一個方法,依次進行下去,知道事件循環(huán)所有的方法執(zhí)行完畢。實際上loop是BaseEventLoop的一個實例,我們可以查看定義,它到底有哪些方法可調(diào)用。 什么是awaitable對象——即可暫停等待的對象
有三類對象是可等待的,即 coroutines, Tasks, and Futures. coroutine:本質(zhì)上就是一個函數(shù),一前面的生成器yield和yield from為基礎(chǔ),不再贅述; Tasks: 任務(wù),顧名思義,就是要完成某件事情,其實就是對協(xié)程函數(shù)進一步的封裝; Future:它是一個“更底層”的概念,他代表一個一步操作的最終結(jié)果,因為一步操作一般用于耗時操作,結(jié)果不會立即得到,會在“將來”得到異步運行的結(jié)果,故而命名為Future。 三者的關(guān)系,coroutine可以自動封裝成task,而Task是Future的子類。 如前所述,Task用來 并發(fā)調(diào)度的協(xié)程,即對協(xié)程函數(shù)的進一步包裝?那為什么還需要包裝呢?因為單純的協(xié)程函數(shù)僅僅是一個函數(shù)而已,將其包裝成任務(wù),任務(wù)是可以包含各種狀態(tài)的,異步編程最重要的就是對異步操作狀態(tài)的把控了。
(1)創(chuàng)建任務(wù)(兩種方法):
方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的
方法二:task = asyncio.ensure_future(coro())
也可以使用 loop.create_future() loop.create_task(coro) 也是可以的。
備注:關(guān)于任務(wù)的詳解,會在后面的系列文章繼續(xù)講解,本文只是概括性的說明。
(2)獲取某一個任務(wù)的方法:
方法一:task=asyncio.current_task(loop=None) 返回在某一個指定的loop中,當(dāng)前正在運行的任務(wù),如果沒有任務(wù)正在運行,則返回None; 如果loop為None,則默認為在當(dāng)前的事件循環(huán)中獲取,
方法二:asyncio.all_tasks(loop=None) 返回某一個loop中還沒有結(jié)束的任務(wù) Future是一個較低層的可等待(awaitable)對象,他表示的是異步操作的最終結(jié)果,當(dāng)一個Future對象被等待的時候,協(xié)程會一直等待,直到Future已經(jīng)運算完畢。 Future是Task的父類,一般情況下,已不用去管它們兩者的詳細區(qū)別,也沒有必要去用Future,用Task就可以了,返回 future 對象的低級函數(shù)的一個很好的例子是 loop.run_in_executor().
前面介紹了asyncio里面最為核心的幾個概念,如果能夠很好地理解這些概念,對于學(xué)習(xí)協(xié)程是非常有幫助的,但是按照我個人的風(fēng)格,我會先說asyncio的架構(gòu),理解asyncio的設(shè)計架構(gòu)有助于更好地應(yīng)用和理解。
asyncio分為高層API和低層API,我們都可以使用,就像我前面在講matplotlib的架構(gòu)的時候所講的一樣,我們前面所講的Coroutine和Tasks屬于高層API,而Event Loop 和Future屬于低層API。當(dāng)然asyncio所涉及到的功能遠不止于此,我們只看這么多。下面是是高層API和低層API的概覽:
High-level APIs
●Coroutines and Tasks(本文要寫的) ●Streams ●Synchronization Primitives ●Subprocesses ●Queues ●Exceptions
Low-level APIs
●Event Loop(下一篇要寫的) ●Futures ●Transports and Protocols ●Policies ●Platform Support
所謂的高層API主要是指那些asyncio.xxx()的方法
(1)運行異步協(xié)程 asyncio.run(coro, *, debug=False) #運行一個一步程序,參見上面
(2)創(chuàng)建任務(wù) task=asyncio.create_task(coro) #python3.7 ,參見上面 task = asyncio.ensure_future(coro())
(3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 這個函數(shù)表示的是:當(dāng)前的那個任務(wù)(協(xié)程函數(shù))睡眠多長時間,而允許其他任務(wù)執(zhí)行。這是它與time.sleep()的區(qū)別,time.sleep()是當(dāng)前線程休息,注意他們的區(qū)別哦。 另外如果提供了參數(shù)result,當(dāng)當(dāng)前任務(wù)(協(xié)程)結(jié)束的時候,它會返回; loop參數(shù)將會在3.10中移除,這里就不再說了。
(4)并發(fā)運行多個任務(wù) await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它本身也是awaitable的。 *coros_or_futures是一個序列拆分操作,如果是以個協(xié)程函數(shù),則會自動轉(zhuǎn)換成Task。 當(dāng)所有的任務(wù)都完成之后,返回的結(jié)果是一個列表的形式,列表中值的順序和*coros_or_futures完成的順序是一樣的。 return_exceptions: False,這是他的默認值,第一個出發(fā)異常的任務(wù)會立即返回,然后其他的任務(wù)繼續(xù)執(zhí)行; True,對于已經(jīng)發(fā)生了異常的任務(wù),也會像成功執(zhí)行了任務(wù)那樣,等到所有的任務(wù)執(zhí)行結(jié)束一起將錯誤的結(jié)果返回到最終的結(jié)果列表里面。
如果gather()本身被取消了,那么綁定在它里面的任務(wù)也就取消了。
(5)防止任務(wù)取消 await asyncio.shield(*arg, *, loop=None) 它本身也是awaitable的。顧名思義,shield為屏蔽、保護的意思,即保護一個awaitable 對象防止取消,一般情況下不推薦使用,而且在使用的過程中,最好使用try語句塊更好。
try: res = await shield(something()) except CancelledError: res = None
(6)設(shè)置timeout——一定要好好理解 await asyncio. wait_for (aw, timeout, *, loop=None) 如果aw是一個協(xié)程函數(shù),會自動包裝成一個任務(wù)task。參見下面的例子: import asyncio
async def eternity(): print('我馬上開始執(zhí)行') await asyncio.sleep(3600) #當(dāng)前任務(wù)休眠1小時,即3600秒 print('終于輪到我了')
async def main(): # Wait for at most 1 second try: print('等你3秒鐘哦') await asyncio.wait_for(eternity(), timeout=3) #休息3秒鐘了執(zhí)行任務(wù) except asyncio.TimeoutError: print('超時了!')
asyncio.run(main())
'''運行結(jié)果為: 等你3秒鐘哦 我馬上開始執(zhí)行 超時了! '''
為什么?首先調(diào)用main()函數(shù),作為入口函數(shù),當(dāng)輸出‘等你3秒鐘哦’之后,main掛起,執(zhí)行eternity,然后打印‘我馬上開始執(zhí)行’,然后eternity掛起,而且要掛起3600秒,大于3,這時候出發(fā)TimeoutError。修改一下:‘’ import asyncio
async def eternity(): print('我馬上開始執(zhí)行') await asyncio.sleep(2) #當(dāng)前任務(wù)休眠2秒鐘,2<3 print('終于輪到我了')
async def main(): # Wait for at most 1 second try: print('等你3秒鐘哦') await asyncio.wait_for(eternity(), timeout=3) #給你3秒鐘執(zhí)行你的任務(wù) except asyncio.TimeoutError: print('超時了!')
asyncio.run(main())
'''運行結(jié)果為: 等你3秒鐘哦 我馬上開始執(zhí)行 終于輪到我了 '''
總結(jié):當(dāng)異步操作需要執(zhí)行的時間超過waitfor設(shè)置的timeout,就會觸發(fā)異常,所以在編寫程序的時候,如果要給異步操作設(shè)置timeout,一定要選擇合適,如果異步操作本身的耗時較長,而你設(shè)置的timeout太短,會涉及到她還沒做完,就拋出異常了。
(7)多個協(xié)程函數(shù)時候的等候
await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 與上面的區(qū)別是,第一個參數(shù)aws是一個集合,要寫成集合set的形式,比如: {func(),func(),func3()} 表示的是一系列的協(xié)程函數(shù)或者是任務(wù),其中協(xié)程會自動包裝成任務(wù)。事實上,寫成列表的形式也是可以的。
注意:該函數(shù)的返回值是兩個Tasks/Futures的集合: (done, pending) 其中done是一個集合,表示已經(jīng)完成的任務(wù)tasks;pending也是一個集合,表示還沒有完成的任務(wù)。 常見的使用方法為:done, pending = await asyncio.wait(aws)
參數(shù)解釋: timeout (a float or int), 同上面的含義一樣,需要注意的是,這個不會觸發(fā)asyncio.TimeoutError異常,如果到了timeout還有任務(wù)沒有執(zhí)行完,那些沒有執(zhí)行完的tasks和futures會被返回到第二個集合pending里面。 return_when參數(shù),顧名思義,他表示的是,什么時候wait函數(shù)該返回值。只能夠去下面的幾個值。
Constant | Description | FIRST_COMPLETED | first_completes.當(dāng)任何一個task或者是future完成或者是取消,wait函數(shù)就返回 | FIRST_EXCEPTION | 當(dāng)任何一個task或者是future觸發(fā)了某一個異常,就返回,.如果是所有的task和future都沒有觸發(fā)異常,則等價與下面的 ALL_COMPLETED . | ALL_COMPLETED | 當(dāng)所有的task或者是future都完成或者是都取消的時候,再返回。 |
如下面例子所示: import asyncio import time
a=time.time()
async def hello1(): #大約2秒 print('Hello world 01 begin') yield from asyncio.sleep(2) print('Hello again 01 end')
async def hello2(): #大約3秒 print('Hello world 02 begin') yield from asyncio.sleep(3) print('Hello again 02 end')
async def hello3(): #大約4秒 print('Hello world 03 begin') yield from asyncio.sleep(4) print('Hello again 03 end')
async def main(): #入口函數(shù) done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED) for i in done: print(i) for j in pending: print(j)
asyncio.run(main()) #運行入口函數(shù)
b=time.time() print('---------------------------------------') print(b-a)
'''運行結(jié)果為: Hello world 02 begin Hello world 01 begin Hello world 03 begin Hello again 01 end <Task finished coro=<hello1() done, defined at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:46> result=None> <Task pending coro=<hello3() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001FA8D394438>()]>> <Task pending coro=<hello2() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001FA8D394378>()]>> --------------------------------------- 2.033155679702759 '''
從上面可以看出,hello1()試運行結(jié)束了的,hello2()和hello3()還沒結(jié)束。 (8)asyncio.as_completed()函數(shù)
這個函數(shù)我沒有找到合適的中文名稱去描述,所以哪位大神如果知道,望告知,不勝感激!它的函數(shù)原型如下: asyncio.as_completed(aws, *, loop=None, timeout=None) 第一個參數(shù)aws:同上面一樣,是一個集合{}集合里面的元素是coroutine、task或者future 第三個參數(shù)timeout:意義和上面講的的一樣 那到底什么作用呢?其實很簡單,個人感覺有點雞肋,從一個例子看起:
import asyncio import time import threading
a=time.time()
@asyncio.coroutine def hello1(): print('Hello world 01 begin') yield from asyncio.sleep(5) #大約5秒 print('Hello again 01 end') return '哈哈1'
@asyncio.coroutine def hello2(): print('Hello world 02 begin') yield from asyncio.sleep(3) #大約3秒 print('Hello again 02 end') return '哈哈2'
@asyncio.coroutine def hello3(): print('Hello world 03 begin') yield from asyncio.sleep(4) #大約4秒 print('Hello again 03 end') return '哈哈3'
async def main(): s=asyncio.as_completed({hello1(),hello2(),hello3()}) for f in s: result=await f print(result)
asyncio.run(main())
b=time.time() print('---------------------------------------') print(b-a)
'''運行結(jié)果為: Hello world 03 begin Hello world 01 begin Hello world 02 begin Hello again 01 end 哈哈1 Hello again 02 end 哈哈2 Hello again 03 end 哈哈3 --------------------------------------- 4.0225794315338135 '''
結(jié)論:asyncio.as_completed()函數(shù)返回的是一個可迭代(iterator)的對象,對象的每個元素就是一個future對象,很多小伙伴說,這不是相當(dāng)于沒變嗎?其實返回的future集合是對參數(shù)的future集合重新組合,組合的順序就是,最先執(zhí)行完的協(xié)程函數(shù)(coroutine、task、future)最先返回,從上面的代碼可知,參數(shù)為 aws={hello1(),hello2(),hello3()},因為hello1大約花費5秒、hello2大約花費3秒、hello3大約花費4秒。返回的結(jié)果為 s={hello2()、hello3()、hello(1)},因為hello2時間最短,故而放在前面,hello1時間最長,故而放在最后面。然后對返回的集合s開始迭代。
先來看一下Task類的簡單介紹(英文原文文檔)。

上面的文字描述中推出了幾個非常重要的信息,特在此總結(jié)如下:
(1)他是作為一個python協(xié)程對象,和Future對象很像的這么一個對象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高層API asyncio.ccreate_task()創(chuàng)建任務(wù),或者是使用低層API loop.create_task()或者是loop.ensure_future()創(chuàng)建任務(wù)對象;
(3)相比于協(xié)程函數(shù),任務(wù)時有狀態(tài)的,可以使用Task.cancel()進行取消,這會觸發(fā)CancelledError異常,使用cancelled()檢查是否取消。
下面介紹Task類常見的一些使用函數(shù)
(1)cancel() Request the Task to be cancelled.
其實前面已經(jīng)有所介紹,最好是使用他會出發(fā)CancelledError異常,所以需要取消的協(xié)程函數(shù)里面的代碼最好在try-except語句塊中進行,這樣方便觸發(fā)異常,打印相關(guān)信息,但是Task.cancel()沒有辦法保證任務(wù)一定會取消,而Future.cancel()是可以保證任務(wù)一定取消的??梢詤⒁娤旅娴囊粋€例子:
import asyncio
async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模擬一個耗時任務(wù) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep')
async def main(): #通過協(xié)程創(chuàng)建一個任務(wù),需要注意的是,在創(chuàng)建任務(wù)的時候,就會跳入到異步開始執(zhí)行 #因為是3.7版本,創(chuàng)建一個任務(wù)就相當(dāng)于是運行了異步函數(shù)cancel_me task = asyncio.create_task(cancel_me()) #等待一秒鐘 await asyncio.sleep(1) print('main函數(shù)休息完了') #發(fā)出取消任務(wù)的請求 task.cancel() try: await task #因為任務(wù)被取消,觸發(fā)了異常 except asyncio.CancelledError: print('main(): cancel_me is cancelled now')
asyncio.run(main())
'''運行結(jié)果為: cancel_me(): before sleep main函數(shù)休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now '''
運行過程分析:
首先run函數(shù)啟動主函數(shù)入口main,在main中,因為第一句話就是調(diào)用異步函數(shù)cancel_me()函數(shù),所以先打印出了第一句話; 然后進入cancel_me中的try語句,遇到await,暫停,這時候返回main中執(zhí)行,但是有在main中遇到了await,也會暫停,但是由于main中只需要暫停1秒,而cancel_me中要暫停3600秒,所以等到main的暫停結(jié)束后,接著運行main,所以打印出第二句話; 接下來遇到取消任務(wù)的請求task.cancel(),然后繼續(xù)執(zhí)行main里面的try,又遇到了await,接著main進入暫停,接下來進入到cancel_me函數(shù)中,但是由于main中請求了取消任務(wù),所以那個耗時3600秒的任務(wù)就不再執(zhí)行了,直接觸發(fā)了Cancelled_Error異常,打印出第三句話,接下來又raise一個異常信息; 接下來執(zhí)行cancel_me的finally,打印出第四句話,此時cancel_me執(zhí)行完畢,由于他拋出了一個異常,返回到主程序main中,觸發(fā)異常,打印出第五句話。
(2)done() 當(dāng)一個被包裝得協(xié)程既沒有觸發(fā)異常、也沒有被取消的時候,意味著它是done的,返回true。
(3)result() 返回任務(wù)的執(zhí)行結(jié)果, 當(dāng)任務(wù)被正常執(zhí)行完畢,則返回結(jié)果; 當(dāng)任務(wù)被取消了,調(diào)用這個方法,會觸發(fā)CancelledError異常; 當(dāng)任務(wù)返回的結(jié)果是無用的時候,則調(diào)用這個方法會觸發(fā)InvalidStateError; 當(dāng)任務(wù)出發(fā)了一個異常而中斷,調(diào)用這個方法還會再次觸發(fā)這個使程序中斷的異常。
(4)exception()
返回任務(wù)的異常信息,觸發(fā)了什么異常,就返回什么異常,如果任務(wù)是正常執(zhí)行的無異常,則返回None; 當(dāng)任務(wù)被取消了,調(diào)用這個方法會觸發(fā)CancelledError異常; 當(dāng)任務(wù)沒有做完,調(diào)用這個方法會觸發(fā)InvalidStateError異常。 下面還有一些不常用的方法,如下:
(5)add_done_callback(callback, *, context=None) (6)remove_done_callback(callback) (7)get_stack(*, limit=None) (8)print_stack(*, limit=None, file=None) (9)all_tasks(loop=None),這是一個類方法 (10)current_task(loop=None),這是一個類方法
對于異步編程、異步函數(shù)而言,最重要的就是異步函數(shù)調(diào)用結(jié)束之后,獲取異步函數(shù)的返回值,我們可以有以下幾種方式來獲取函數(shù)的返回值,第一是直接通過Task.result()來獲??;第二種是綁定一個回調(diào)函數(shù)來獲取,即函數(shù)執(zhí)行完畢后調(diào)用一個函數(shù)來獲取異步函數(shù)的返回值。
(1)直接通過result來獲取
import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print('Hello again 01 end') return a+b
coroutine=hello1(10,5) loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task=asyncio.ensure_future(coroutine) #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(task) #第三步:通過事件循環(huán)運行 print('-------------------------------------') print(task.result()) loop.close()
'''運行結(jié)果為 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''
(2)通過定義回調(diào)函數(shù)來獲取 import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print('Hello again 01 end') return a+b
def callback(future): #定義的回調(diào)函數(shù) print(future.result())
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task=asyncio.ensure_future(hello1(10,5)) #第二步:將多個協(xié)程函數(shù)包裝成任務(wù) task.add_done_callback(callback) #并被任務(wù)綁定一個回調(diào)函數(shù)
loop.run_until_complete(task) #第三步:通過事件循環(huán)運行 loop.close() #第四步:關(guān)閉事件循環(huán)
'''運行結(jié)果為: Hello world 01 begin Hello again 01 end 15 '''
注意:所謂的回調(diào)函數(shù),就是指協(xié)程函數(shù)coroutine執(zhí)行結(jié)束時候會調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。我們創(chuàng)建的task和回調(diào)里的future對象,實際上是同一個對象,因為task是future的子類。
事實上,在使用asyncio進行異步編程的時候,語法形式往往是多樣性的,雖然理解異步編程的核心思想很重要,但是實現(xiàn)的時候終究還是要編寫語句的,本次給出的模板,是兩個不同的例子,例子一是三個異步方法,它們都沒有參數(shù),沒有返回值,都模擬一個耗時任務(wù);例子二是三個異步方法,都有參數(shù),都有返回值。
import asyncio import time
a=time.time()
async def hello1(): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print('Hello again 01 end')
async def hello2(): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時任務(wù)2秒 print('Hello again 02 end')
async def hello3(): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時任務(wù)4秒 print('Hello again 03 end')
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) tasks = [hello1(), hello2(),hello3()] #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環(huán)運行 loop.close() #第四步:取消事件循環(huán)
'''運行結(jié)果為: Hello world 02 begin Hello world 03 begin Hello world 01 begin Hello again 02 end Hello again 01 end Hello again 03 end '''
import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print('Hello again 01 end') return a+b
async def hello2(a,b): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時任務(wù)2秒 print('Hello again 02 end') return a-b
async def hello3(a,b): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時任務(wù)4秒 print('Hello again 03 end') return a*b
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) tasks = [task1,task2,task3] #第二步:將多個協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環(huán)運行 print(task1.result()) #并且在所有的任務(wù)完成之后,獲取異步函數(shù)的返回值 print(task2.result()) print(task3.result()) loop.close() #第四步:關(guān)閉事件循環(huán)
'''運行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
總結(jié):四步走(針對python3.7之前的版本) 第一步·:構(gòu)造事件循環(huán)
loop=asyncio.get_running_loop() #返回(獲?。┰诋?dāng)前線程中正在運行的事件循環(huán),如果沒有正在運行的事件循環(huán),則會顯示錯誤;它是python3.7中新添加的
loop=asyncio.get_event_loop() #獲得一個事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個新的事件循環(huán)loop;
loop=asyncio.set_event_loop(loop) #設(shè)置一個事件循環(huán)為當(dāng)前線程的事件循環(huán);
loop=asyncio.new_event_loop() #創(chuàng)建一個新的事件循環(huán)
第二步:將一個或者是多個協(xié)程函數(shù)包裝成任務(wù)Task #高層API task = asyncio.create_task(coro(參數(shù)列表)) # 這是3.7版本新添加的 task = asyncio.ensure_future(coro(參數(shù)列表))
#低層API loop.create_future(coro) loop.create_task(coro)
'''需要注意的是,在使用Task.result()獲取協(xié)程函數(shù)結(jié)果的時候,使用asyncio.create_task()卻會顯示錯 但是使用asyncio.ensure_future卻正確,本人暫時不知道原因,哪位大神知道,望告知,不勝感激!'''
第三步:通過事件循環(huán)運行 loop.run_until_complete(asyncio.wait(tasks)) #通過asyncio.wait()整合多個task
loop.run_until_complete(asyncio.gather(tasks)) #通過asyncio.gather()整合多個task
loop.run_until_complete(task_1) #單個任務(wù)則不需要整合
loop.run_forever() #但是這個方法在新版本已經(jīng)取消,不再推薦使用,因為使用起來不簡潔
''' 使用gather或者wait可以同時注冊多個任務(wù),實現(xiàn)并發(fā),但他們的設(shè)計是完全不一樣的,在前面的2.1.(4)中已經(jīng)討論過了,主要區(qū)別如下: (1)參數(shù)形式不一樣 gather的參數(shù)為 *coroutines_or_futures,即如這種形式 tasks = asyncio.gather(*[task1,task2,task3])或者 tasks = asyncio.gather(task1,task2,task3) loop.run_until_complete(tasks) wait的參數(shù)為列表或者集合的形式,如下 tasks = asyncio.wait([task1,task2,task3]) loop.run_until_complete(tasks) (2)返回的值不一樣 gather的定義如下,gather返回的是每一個任務(wù)運行的結(jié)果, results = await asyncio.gather(*tasks) wait的定義如下,返回dones是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),都是集合類型 done, pending = yield from asyncio.wait(fs) (3)后面還會講到他們的進一步使用 '''
簡單來說:async.wait會返回兩個值:done和pending,done為已完成的協(xié)程Task,pending為超時未完成的協(xié)程Task,需通過future.result調(diào)用Task的result。而async.gather返回的是已完成Task的result。 第四步:關(guān)閉事件循環(huán) loop.close()
''' 以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢? 簡單來說,loop 只要不關(guān)閉,就還可以再運行: loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close() 但是如果關(guān)閉了,就不能再運行了: loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常 建議調(diào)用 loop.close,以徹底清理 loop 對象防止誤用 '''
在最新的python3.7版本中,asyncio又引進了一些新的特性和API import asyncio import time
async def hello1(): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print('Hello again 01 end')
async def hello2(): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時任務(wù)2秒 print('Hello again 02 end')
async def hello3(): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時任務(wù)4秒 print('Hello again 03 end')
async def main(): results=await asyncio.gather(hello1(),hello2(),hello3()) for result in results: print(result) #因為沒返回值,故而返回None
asyncio.run(main())
'''運行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end None None None '''
import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時任務(wù)3秒 print('Hello again 01 end') return a+b
async def hello2(a,b): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時任務(wù)2秒 print('Hello again 02 end') return a-b
async def hello3(a,b): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時任務(wù)4秒 print('Hello again 03 end') return a*b
async def main(): results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result)
asyncio.run(main())
'''運行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
第一步:構(gòu)建一個入口函數(shù)main 他也是一個異步協(xié)程函數(shù),即通過async定義,并且要在main函數(shù)里面await一個或者是多個協(xié)程,同前面一樣,我可以通過gather或者是wait進行組合,對于有返回值的協(xié)程函數(shù),一般就在main里面進行結(jié)果的獲取。
第二步:啟動主函數(shù)main 這是python3.7新添加的函數(shù),就一句話,即 asyncio.run(main())
注意: 不再需要顯式的創(chuàng)建事件循環(huán),因為在啟動run函數(shù)的時候,就會自動創(chuàng)建一個新的事件循環(huán)。而且在main中也不需要通過事件循環(huán)去掉用被包裝的協(xié)程函數(shù),只需要向普通函數(shù)那樣調(diào)用即可 ,只不過使用了await關(guān)鍵字而已。
1、無cpu分時切換線程保存上下文問題(協(xié)程上下文怎么保存) 2、遇到io阻塞切換(怎么實現(xiàn)的) 3、無需共享數(shù)據(jù)的保護鎖(為什么) 4、系列文章下篇預(yù)告——介紹低層的API,事件循環(huán)到底是怎么實現(xiàn)的以及future類的實現(xiàn)。
|