最近想着重写一下自己很久前写的CMS
指纹识别器,当时采用了Python
的多线程来提高效率,由于GIL锁
的存在,提升并不大。在查找资料的时候发现了Python
协程这个高级特性,花了几天时间学习,在此整理一下。
1. 可迭代、迭代器、生成器 可迭代 !==
迭代器 !==
生成器
1.1 可迭代 在Python3
中,有很多可迭代的对象,比如字符串、列表、字典等,但它们并不是迭代器。 可以借助collections
这个模块的isinstance()
方法,来判断这个对象是否可迭代(Iterable
),是否是迭代器(Iterator
),是否是生成器(Generator
)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import collectionsfrom collections.abc import Iterable, Iterator, Generator tstr = "MengSec" print("字符串:{}" .format (tstr)) print(isinstance (tstr, Iterable)) print(isinstance (tstr, Iterator)) print(isinstance (tstr, Generator)) tlist = [1 , 2 , 3 ] print("列表:{}" .format (tlist)) print(isinstance (tlist, Iterable)) print(isinstance (tlist, Iterator)) print(isinstance (tlist, Generator)) tdict = {"name" : "MengChen" } print("字典:{}" .format (tdict)) print(isinstance (tdict, Iterable)) print(isinstance (tdict, Iterator)) print(isinstance (tdict, Generator))
输出
1 2 3 4 5 6 7 8 9 10 11 12 字符串:MengSecTrue False False 列表:[1 , 2 , 3 ]True False False 字典:{'name ': 'MengChen '}True False False
由输出可知,这些对象都是可迭代的,但不是迭代器,也不是生成器。但是它们都可以使用for来循环。
可迭代对象,是其内部实现了__iter__()
或者__getitem__()
魔术方法。
1.2 迭代器 与可迭代的对象相比,迭代器其实是在内部多实现了一个__next__()
方法,在迭代器中,不仅可以使用for
循环来获取元素,也可以使用next()
来获取元素。
迭代器遵循Python的迭代器协议,也就是对象要实现__iter()__
和next()
方法,在Python3中要实现__next__()
, __iter()__
方法返回迭代器对象本身,next()
方法返回容器的下一个元素,在没有后续元素时抛出StopIteration
异常
1 2 3 4 5 6 7 8 9 10 from collections.abc import Iterator List = [1 , 2 , 3 , 4 ] print(isinstance (List, Iterator)) List_Iterator = iter (List) print(isinstance (List_Iterator, Iterator)) print(next (List_Iterator)) print(next (List_Iterator)) print(next (List_Iterator)) print(next (List_Iterator))
输出
Python
的for
循环实质上是先通过内置函数iter()
获得一个迭代器,然后再不断调用next()
函数实现的;
1.3 生成器 生成器,可以理解为,当需要它其中的元素时,才会经过计算生成该元素。从而在内存中节省大量的空间。 迭代器可以使用next()
和for
来遍历,而生成器是在它的基础上,又实现了一个yield
,也可以说生成器是一种特殊的迭代器。yield
可以理解为一种中断,在每次调用next()
时,函数都会中断在yield
处,并且返回当前的值。当没有遇到yield
时,程序会抛出异常StopIteration
。
创建一个生成器主要有两种办法
使用列表生成式
需要注意与列表推导式的区别
1 2 3 4 5 6 7 8 9 10 from collections.abc import Iterable, Iterator, Generator List1 = [x for x in range (5 )] print(isinstance (List1, Iterable)) print(isinstance (List1, Iterator)) print(isinstance (List1, Generator)) List2 = (x for x in range (5 )) print(isinstance (List2, Iterable)) print(isinstance (List2, Iterator)) print(isinstance (List2, Generator))
输出
1 2 3 4 5 6 True False False True True True
实现yield
的函数
1 2 3 4 5 6 7 8 def test (n ): now = 0 while now < n: yield now now += 1 gen = test(10 ) print(isinstance (gen, Generator))
生成器在其生命周期中,拥有四个状态
GEN_CREATED
# 等待开始执行GEN_RUNNING
# 解释器正在执行(只有在多线程应用中才能看到这个状态)GEN_SUSPENDED
# 在yield表达式处暂停GEN_CLOSED
# 执行结束
1.4 生成器->协程 在生成器中,使用yield
来实现了暂停函数执行的功能,那么在函数暂停执行的时候,是否可以给它发送某些信息呢。这在编程模型上,类似于实现了子程序,经过后面的发展,诞生了协程。
在维基百科中,协程是这么定义的:
协程是计算机程序的一类组件,推广了非抢先多任务的子程序,允许执行被挂起与被恢复
给暂停中的函数传递信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from collections.abc import Iterable, Iterator, Generatordef test (N ): index = 0 while index < N: jump = yield index if jump is None : jump = 1 index += jumpif __name__ == '__main__' : a = test(5 ) print(next (a)) print(a.send(2 )) print(next (a)) print(a.send(-3 ))
2. 协程 在Python3.4
中,引入了asyncio
这个标准库,它直接内置了对异步IO
的支持。
2.1 如何创建协程 借助asyncio
这个标准库,我们使用async
关键字可以很方便的定义或者创建协程(Coroutine
)。
直接上代码
1 2 3 4 5 6 7 8 9 from collections.abc import Generator, Coroutineasync def test (a ): print("hello, " + a)if __name__ == '__main__' : coroutine = test("MengSec" ) print(coroutine) print(isinstance (coroutine, Generator)) print(isinstance (coroutine, Coroutine))
输出
1 2 3 4 <coroutine object test at 0 x106449848>False True sys: 1 : RuntimeWarning: coroutine
可以看到,协程对象coroutine
中的函数并没有执行,使用async
关键词创建的协程对象不是生成器。而且程序还返回了一个Warning
。
在前面我们知道了,协程是在生成器的基础上实现的。通过 @asyncio.coroutine
装饰器,可以将一个生成器函数标记 为协程,可以将其直接当做协程使用,但是它本质上还是一个生成器Generator
。
拿前面的一个例子修改一下,上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from collections.abc import Generator, Coroutineimport asyncio@asyncio.coroutine def test (n ): now = 0 while now < n: yield now now += 1 generator = test(10 ) print(generator) print(isinstance (generator, Generator)) print(isinstance (generator, Coroutine))
输出
1 2 3 <generator object test at 0x1051ff840 >True False
2.2 协程是如何工作的 首先列出在asyncio
中贯穿始终的概念
event_loop
事件循环:程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数,事件循环是asyncio的核心。事件循环运行异步任务和回调,执行网络IO操作以及运行子进程。
coroutine
协程:协程对象,指一个使用async
关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
future
对象: 代表将来执行或没有执行的任务的结果。它和task
上没有本质的区别
task
任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task
对象是 Future
的子类,它将coroutine
和Future
联系在一起,将coroutine
封装成一个Future
对象。
async/await
关键字:python3.5
中用于定义协程的关键字,async
定义一个协程,await
用于挂起阻塞的异步调用接口。
协程的工作流程
创建一个协程对象
将协程转换为task
任务
定义event_loop
容器
将task
任务放入event_loop
容器中触发
上代码
1 2 3 4 5 6 7 8 9 import asyncioasync def test (a ): print("hello, " + a)if __name__ == '__main__' : coroutine = test("MengSec" ) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) loop.run_until_complete(task)
输出
2.3 如何获取协程函数的返回值 Task
对象是Future
对象的子类,它保存了协程函数运行的状态。我们可以通过它来获取协程函数的返回值,具体来说有两种方式
2.3.1 直接获取Task的结果 当协程函数运行结束后,我们需要得到其返回值,第一种方式就是等到task
状态为finish
时,调用task
的result
方法获取返回值。
修改一下前面的例子
1 2 3 4 5 6 7 8 9 10 11 import asyncioasync def test (a ): print("hello, " + a) return "Returned value" if __name__ == '__main__' : coroutine = test("MengSec" ) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) loop.run_until_complete(task) print(task.result())
输出
1 2 hello, MengSec Returned value
2.3.2 绑定回调函数 回调的实现有两种。
一种是利用同步编程实现的回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioimport timeasync def test (a ): print("执行一个非常耗时的IO操作" ) time.sleep(2 ) return "暂停了{}秒" .format (a)if __name__ == '__main__' : coroutine = test("2" ) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) loop.run_until_complete(task) print(task.result())
输出
另一种是通过asyncio
自带的添加回调函数的功能来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncioimport timeasync def test (a ): print("执行一个非常耗时的IO操作" ) time.sleep(2 ) return "暂停了{}秒" .format (a)def callback (future ): print("正在执行回调函数,获取返回的结果是:" , future.result())if __name__ == '__main__' : coroutine = test("2" ) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) task.add_done_callback(callback) loop.run_until_complete(task)
输出
1 2 执行一个非常耗时的IO操作 正在执行回调函数,获取返回的结果是: 暂停了2秒
2.4 协程中的并发 首先理解一下并发和并行
并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。用多线程、多进程、协程来说,协程实现并发,多线程与多进程实现并行
asyncio
实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await
,然后其他协程继续工作。
第一步,创建包含多个协程的列表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncioasync def test (): print("Before IO..." ) await asyncio.sleep(1 ) print("After IO..." ) return "任务执行完毕" a = test() b = test() c = test() tasks = [ asyncio.ensure_future(a), asyncio.ensure_future(b), asyncio.ensure_future(c), ]
第二步,将这些协程添加到事件循环中。
同样有两种办法
使用 asyncio.wait()
1 2 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
使用 asyncio.gather()
1 2 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks))
最后的结果,使用task.result
来查看
1 2 for task in tasks: print("任务执行结果为: " , task.result())
完整代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioasync def test (): print("Before IO..." ) await asyncio.sleep(1 ) print("After IO..." ) return "任务执行完毕" a = test() b = test() c = test() tasks = [ asyncio.ensure_future(a), asyncio.ensure_future(b), asyncio.ensure_future(c), ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))for task in tasks: print("任务执行结果为: " , task.result())
输出
1 2 3 4 5 6 7 8 9 Before IO...Before IO...Before IO...After IO...After IO...After IO... 任务执行结果为: 任务执行完毕 任务执行结果为: 任务执行完毕 任务执行结果为: 任务执行完毕
3. 协程爬虫 我们日常写爬虫使用的requests
库,并不支持异步,协程在本质上运行的时候,整个事件循环还是跑在单线程上的,requests
等库在进行网络请求的时候,会将整个线程阻塞,导致事件循环不能继续,也就不能达到协程异步的效果了。
3.1 使用aiohttp 不过python
中有aiohttp
这个库来支持异步爬虫的编写。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioimport aiohttpasync def run (url ): print("开始执行爬虫..." ,url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: print(resp.url) url_list = ["https://www.baidu.com" ,"https://mengsec.com" ] tasks = [asyncio.ensure_future(run(url)) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
输出
1 2 3 4 开始执行爬虫... https:// www.baidu.com 开始执行爬虫... https://m engsec.com https://m engsec.com https:// www.baidu.com
3.2 使用requests
函数(例如io读写,requests网络请求)阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结。这个问题的解决方法是,使用事件循环对象的 run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行。
先看一下官方demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import asyncioimport concurrent.futuresdef blocking_io (): with open ('/dev/urandom' , 'rb' ) as f: return f.read(100 )def cpu_bound (): return sum (i * i for i in range (10 ** 7 ))async def main (): loop = asyncio.get_running_loop() result = await loop.run_in_executor( None , blocking_io) print('default thread pool' , result) with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool' , result) with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool' , result) asyncio.run(main())
输出
1 2 3 default thread pool b'\xe7 \xdb+\x18 \xfaB\xfc-\xc3 \x9 c\x0 b1 \xb8 \xbdMw\x8 fK\x93 \x15 Q$\xe8 \xd8 \x90 \xc1 \xce\x8 fB\xb8 \x1 b\x05 \xa2 \x90 \xa1 \x0 cv\xc9 \x15 E\xa3 \x94 \xa6 \xc8 J\xc9 \x1 bP\xed\xc47 \x7 f\x98 ,\x8 bc\xb7 \xf4 p1 \xd6 \xb4 p\x03 \xbaQ\xb0 \xbeC\xe6 \x08 \xbb]0 n\xfe9 \xe96 \xaa%\xc8 \x8 f1 \xae\x92 \xde\x0 bv\x83 \x86 \x0 c\xa1 \x8 b\x1 c\xbf\x16 \xdb\xa7 'custom thread pool b'\xd8 \x91 \x83 N\x88 \xce>\xb4 sPA\xae\xcf\xdd/\x94 j\xdc1 Uj\xf1 P\xfcv\x84 \xf1 \x90 z\x1 a~8 S\x9 e\x146 \xb7 o\xc0 \x03 b\x1 ee\x18 oZ\xeeG\xb5 *S\r\xadu\xc3 \x8 f\\\xb2 q\x1 b\xe9 S<!\xde\xff\x02 \xba\x03 >\xd4 \x04 \x80 \xfa5 \xc3 \x1 a\x11 \xef\xd4 \xef\xd6 ^]\x1 dv\x8 eC\x96 t\xcb\xcb:r\x84 \x1 d\xb5 \xecC\\'custom process pool 333333283333335000000
如果我们想在异步爬虫中使用最常用的requests
库,可以使用run_in_executor()
方法.新建一个线程来执行网络请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncioimport requestsimport timeasync def run (url ): print("开始执行爬虫..." ,url) print("开始执行时间:" , time.time()) loop = asyncio.get_event_loop() response = await loop.run_in_executor(None , requests.get, url) print(response.url) print("结束执行时间:" , time.time()) url_list = ["https://www.baidu.com" ,"https://mengsec.com" ] tasks = [asyncio.ensure_future(run(url)) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
输出
1 2 3 4 5 6 7 8 开始执行爬虫... https:// www.baidu.com 开始执行时间: 1551690906.5337 开始执行爬虫... https://m engsec.com 开始执行时间: 1551690906.534755 https:// www.baidu.com/ 结束执行时间: 1551690906.618516 https://m engsec.com/ 结束执行时间: 1551690908.009247
从输出来看,requests
库的网络请求成功异步执行了。
在后面,我们就可以借助run_in_executor()
来使用一些常用的但不支持异步的库来实现自己的目的了。
4. 参考