Python3 高级特性学习——协程

最近想着重写一下自己很久前写的CMS指纹识别器,当时采用了Python的多线程来提高效率,由于GIL锁的存在,提升并不大。在查找资料的时候发现了Python协程这个高级特性,花了几天时间学习,在此整理一下。

1. 可迭代、迭代器、生成器

可迭代 !== 迭代器 !== 生成器

1.1 可迭代

Python3中,有很多可迭代的对象,比如字符串、列表、字典等,但它们并不是迭代器。
可以借助collections这个模块的isinstance()方法,来判断这个对象是否可迭代(Iterable),是否是迭代器(Iterator),是否是生成器(Generator)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import collections
from collections.abc import Iterable, Iterator, Generator

# 字符串
tstr = "MengSec"
print("字符串:{}".format(tstr))
print(isinstance(tstr, Iterable))
print(isinstance(tstr, Iterator))
print(isinstance(tstr, Generator))

# 列表
tlist = [1, 2, 3]
print("列表:{}".format(tlist))
print(isinstance(tlist, Iterable))
print(isinstance(tlist, Iterator))
print(isinstance(tlist, Generator))

# 字典
tdict = {"name": "MengChen"}
print("字典:{}".format(tdict))
print(isinstance(tdict, Iterable))
print(isinstance(tdict, Iterator))
print(isinstance(tdict, Generator))

输出

1
2
3
4
5
6
7
8
9
10
11
12
字符串:MengSec
True
False
False
列表:[1, 2, 3]
True
False
False
字典:{'name': 'MengChen'}
True
False
False

由输出可知,这些对象都是可迭代的,但不是迭代器,也不是生成器。但是它们都可以使用for来循环。

可迭代对象,是其内部实现了__iter__()或者__getitem__()魔术方法。

1.2 迭代器

与可迭代的对象相比,迭代器其实是在内部多实现了一个__next__()方法,在迭代器中,不仅可以使用for循环来获取元素,也可以使用next()来获取元素。

迭代器遵循Python的迭代器协议,也就是对象要实现__iter()__next()方法,在Python3中要实现__next__()__iter()__ 方法返回迭代器对象本身,next()方法返回容器的下一个元素,在没有后续元素时抛出StopIteration异常

1
2
3
4
5
6
7
8
9
10
from collections.abc import Iterator

List = [1, 2, 3, 4] # 定义一个可迭代的列表
print(isinstance(List, Iterator)) # 判断是否是迭代器
List_Iterator = iter(List) # 使用iter()方法将其转换为迭代器
print(isinstance(List_Iterator, Iterator)) # 判断是否为迭代器
print(next(List_Iterator)) # 使用next()方法来获取元素
print(next(List_Iterator))
print(next(List_Iterator))
print(next(List_Iterator))

输出

1
2
3
4
5
6
False
True
1
2
3
4

Pythonfor循环实质上是先通过内置函数iter()获得一个迭代器,然后再不断调用next()函数实现的;

1.3 生成器

生成器,可以理解为,当需要它其中的元素时,才会经过计算生成该元素。从而在内存中节省大量的空间。
迭代器可以使用next()for来遍历,而生成器是在它的基础上,又实现了一个yield,也可以说生成器是一种特殊的迭代器。
yield可以理解为一种中断,在每次调用next()时,函数都会中断在yield处,并且返回当前的值。当没有遇到yield时,程序会抛出异常StopIteration

创建一个生成器主要有两种办法

  1. 使用列表生成式

需要注意与列表推导式的区别

1
2
3
4
5
6
7
8
9
10
from collections.abc import Iterable, Iterator, Generator

List1 = [x for x in range(5)] # 列表推导式
print(isinstance(List1, Iterable))
print(isinstance(List1, Iterator))
print(isinstance(List1, Generator))
List2 = (x for x in range(5)) # 列表生成式
print(isinstance(List2, Iterable))
print(isinstance(List2, Iterator))
print(isinstance(List2, Generator))

输出

1
2
3
4
5
6
True
False
False
True
True
True

  1. 实现yield的函数
1
2
3
4
5
6
7
8
def test(n):
now = 0
while now < n:
yield now
now += 1

gen = test(10)
print(isinstance(gen, Generator)) # 输出True

生成器在其生命周期中,拥有四个状态

GEN_CREATED # 等待开始执行
GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)
GEN_SUSPENDED # 在yield表达式处暂停
GEN_CLOSED # 执行结束

1.4 生成器->协程

在生成器中,使用yield来实现了暂停函数执行的功能,那么在函数暂停执行的时候,是否可以给它发送某些信息呢。这在编程模型上,类似于实现了子程序,经过后面的发展,诞生了协程。

在维基百科中,协程是这么定义的:

协程是计算机程序的一类组件,推广了非抢先多任务的子程序,允许执行被挂起与被恢复

给暂停中的函数传递信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from collections.abc import Iterable, Iterator, Generator

def test(N):
index = 0
while index < N:
# 通过send()发送的信息将赋值给jump
jump = yield index
if jump is None:
jump = 1
index += jump

if __name__ == '__main__':
a = test(5)
print(next(a)) # 0 < 5 在yield处阻塞
print(a.send(2)) # 继续往下执行 此时jump == 2。则index = 0 + 2 = 2,然后阻塞
print(next(a)) # 没有给jump传值,此时jump == 1。index = 2 + 1 = 3,然后阻塞
print(a.send(-3)) # 给jump传值为-3,继续往下执行,index = 3 - 3 = 0,此时继续阻塞在yield处,程序结束。

2. 协程

Python3.4中,引入了asyncio这个标准库,它直接内置了对异步IO的支持。

2.1 如何创建协程

借助asyncio这个标准库,我们使用async关键字可以很方便的定义或者创建协程(Coroutine)。

直接上代码

1
2
3
4
5
6
7
8
9
from collections.abc import Generator, Coroutine

async def test(a):
print("hello, " + a)
if __name__ == '__main__':
coroutine = test("MengSec") # 生成一个协程对象
print(coroutine)
print(isinstance(coroutine, Generator)) # 判断协程是不是生成器
print(isinstance(coroutine, Coroutine)) # 判断是不是协程

输出

1
2
3
4
<coroutine object test at 0x106449848>
False
True
sys:1: RuntimeWarning: coroutine 'test' was never awaited

可以看到,协程对象coroutine中的函数并没有执行,使用async关键词创建的协程对象不是生成器。而且程序还返回了一个Warning

在前面我们知道了,协程是在生成器的基础上实现的。通过@asyncio.coroutine装饰器,可以将一个生成器函数标记为协程,可以将其直接当做协程使用,但是它本质上还是一个生成器Generator

拿前面的一个例子修改一下,上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from collections.abc import Generator, Coroutine
import asyncio

@asyncio.coroutine
def test(n):
now = 0
while now < n:
yield now
now += 1

generator = test(10)
print(generator)
print(isinstance(generator, Generator)) # 判断是不是生成器
print(isinstance(generator, Coroutine)) # 判断是不是协程

输出

1
2
3
<generator object test at 0x1051ff840>
True
False

2.2 协程是如何工作的

首先列出在asyncio中贯穿始终的概念

  • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数,事件循环是asyncio的核心。事件循环运行异步任务和回调,执行网络IO操作以及运行子进程。
  • coroutine协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • future对象: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
  • task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task对象是 Future的子类,它将coroutineFuture联系在一起,将coroutine封装成一个Future对象。
  • async/await关键字:python3.5中用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

协程的工作流程

  1. 创建一个协程对象
  2. 将协程转换为task任务
  3. 定义event_loop容器
  4. task任务放入event_loop容器中触发

上代码

1
2
3
4
5
6
7
8
9
import asyncio

async def test(a):
print("hello, " + a)
if __name__ == '__main__':
coroutine = test("MengSec") # 生成一个协程对象
loop = asyncio.get_event_loop() # 定义事件循环对象
task = loop.create_task(coroutine) # 将协程转换为task任务
loop.run_until_complete(task) # 将task任务扔进事件循环对象中触发

输出

1
hello, MengSec

2.3 如何获取协程函数的返回值

Task对象是Future对象的子类,它保存了协程函数运行的状态。我们可以通过它来获取协程函数的返回值,具体来说有两种方式

2.3.1 直接获取Task的结果

当协程函数运行结束后,我们需要得到其返回值,第一种方式就是等到task状态为finish时,调用taskresult方法获取返回值。

修改一下前面的例子

1
2
3
4
5
6
7
8
9
10
11
import asyncio

async def test(a):
print("hello, " + a)
return "Returned value"
if __name__ == '__main__':
coroutine = test("MengSec") # 生成一个协程对象
loop = asyncio.get_event_loop() # 定义事件循环对象
task = loop.create_task(coroutine) # 将协程转换为task任务
loop.run_until_complete(task) # 将task任务扔进事件循环对象中触发
print(task.result()) # 直接获得Task执行结束的结果

输出

1
2
hello, MengSec
Returned value

2.3.2 绑定回调函数

回调的实现有两种。

一种是利用同步编程实现的回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import time

async def test(a):
print("执行一个非常耗时的IO操作")
time.sleep(2)
return "暂停了{}秒".format(a)

if __name__ == '__main__':
coroutine = test("2") # 生成一个协程对象
loop = asyncio.get_event_loop() # 定义事件循环对象
task = loop.create_task(coroutine) # 将协程转换为task任务
loop.run_until_complete(task) # 将task任务扔进事件循环对象中触发
print(task.result()) # 直接获得Task执行结束的结果

输出

1
2
执行一个非常耗时的IO操作
暂停了2秒

另一种是通过asyncio自带的添加回调函数的功能来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import time

async def test(a):
print("执行一个非常耗时的IO操作")
time.sleep(2)
return "暂停了{}秒".format(a)

def callback(future):
print("正在执行回调函数,获取返回的结果是:", future.result())
if __name__ == '__main__':
coroutine = test("2") # 生成一个协程对象
loop = asyncio.get_event_loop() # 定义事件循环对象
task = loop.create_task(coroutine) # 将协程转换为task任务
task.add_done_callback(callback) # 添加回调函数
loop.run_until_complete(task) # 将task任务扔进事件循环对象中触发

输出

1
2
执行一个非常耗时的IO操作
正在执行回调函数,获取返回的结果是: 暂停了2秒

2.4 协程中的并发

首先理解一下并发和并行

并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。用多线程、多进程、协程来说,协程实现并发,多线程与多进程实现并行

asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。

第一步,创建包含多个协程的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
async def test(): # 定义一个协程函数
print("Before IO...")
await asyncio.sleep(1) # 假定有一个IO任务,阻塞
print("After IO...")
return "任务执行完毕"
# 创建三个协程对象
a = test()
b = test()
c = test()
# 将三个协程对象转换为Task对象,并存到list中。
tasks = [
asyncio.ensure_future(a),
asyncio.ensure_future(b),
asyncio.ensure_future(c),
]

第二步,将这些协程添加到事件循环中。

同样有两种办法

  • 使用 asyncio.wait()

    1
    2
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
  • 使用 asyncio.gather()

    1
    2
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))

最后的结果,使用task.result来查看

1
2
for task in tasks:
print("任务执行结果为: ", task.result())

完整代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
async def test(): # 定义一个协程函数
print("Before IO...")
await asyncio.sleep(1) # 假定有一个IO任务,阻塞
print("After IO...")
return "任务执行完毕"
# 创建三个协程对象
a = test()
b = test()
c = test()
# 将三个协程对象转换为Task对象,并存到list中。
tasks = [
asyncio.ensure_future(a),
asyncio.ensure_future(b),
asyncio.ensure_future(c),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.gather(*tasks))
for task in tasks:
print("任务执行结果为: ", task.result())

输出

1
2
3
4
5
6
7
8
9
Before IO...
Before IO...
Before IO...
After IO...
After IO...
After IO...
任务执行结果为: 任务执行完毕
任务执行结果为: 任务执行完毕
任务执行结果为: 任务执行完毕

3. 协程爬虫

我们日常写爬虫使用的requests库,并不支持异步,协程在本质上运行的时候,整个事件循环还是跑在单线程上的,requests等库在进行网络请求的时候,会将整个线程阻塞,导致事件循环不能继续,也就不能达到协程异步的效果了。

3.1 使用aiohttp

不过python中有aiohttp这个库来支持异步爬虫的编写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import aiohttp

async def run(url):
print("开始执行爬虫...",url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.url)

url_list = ["https://www.baidu.com","https://mengsec.com"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

输出

1
2
3
4
开始执行爬虫... https://www.baidu.com
开始执行爬虫... https://mengsec.com
https://mengsec.com
https://www.baidu.com

3.2 使用requests

函数(例如io读写,requests网络请求)阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结。这个问题的解决方法是,使用事件循环对象的 run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行。

先看一下官方demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import concurrent.futures

def blocking_io():
# File operations (such as logging) can block the
# event loop: run them in a thread pool.
with open('/dev/urandom', 'rb') as f:
return f.read(100)

def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))

async def main():
loop = asyncio.get_running_loop()

## Options:

# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)

# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)

# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)

asyncio.run(main())

输出

1
2
3
default thread pool b'\xe7\xdb+\x18\xfaB\xfc-\xc3\x9c\x0b1\xb8\xbdMw\x8fK\x93\x15Q$\xe8\xd8\x90\xc1\xce\x8fB\xb8\x1b\x05\xa2 \x90\xa1\x0cv\xc9\x15E\xa3\x94\xa6\xc8J\xc9\x1bP\xed\xc47\x7f\x98,\x8bc\xb7\xf4p1\xd6\xb4p\x03\xbaQ\xb0\xbeC\xe6\x08\xbb]0n\xfe9\xe96\xaa%\xc8\x8f1\xae\x92\xde\x0bv\x83\x86\x0c\xa1\x8b\x1c\xbf\x16\xdb\xa7'
custom thread pool b'\xd8\x91\x83N\x88\xce>\xb4sPA\xae\xcf\xdd/\x94j\xdc1Uj\xf1P\xfcv\x84\xf1\x90z\x1a~8S\x9e\x146\xb7o\xc0\x03b\x1ee\x18oZ\xeeG\xb5*S\r\xadu\xc3\x8f\\\xb2q\x1b\xe9S<!\xde\xff\x02\xba\x03>\xd4\x04\x80\xfa5\xc3\x1a\x11\xef\xd4\xef\xd6^]\x1dv\x8eC\x96t\xcb\xcb:r\x84\x1d\xb5\xecC\\'
custom process pool 333333283333335000000

如果我们想在异步爬虫中使用最常用的requests库,可以使用run_in_executor()方法.新建一个线程来执行网络请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import requests
import time

async def run(url):
print("开始执行爬虫...",url)
print("开始执行时间:", time.time())
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, requests.get, url)
print(response.url)
print("结束执行时间:", time.time())

url_list = ["https://www.baidu.com","https://mengsec.com"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

输出

1
2
3
4
5
6
7
8
开始执行爬虫... https://www.baidu.com
开始执行时间: 1551690906.5337
开始执行爬虫... https://mengsec.com
开始执行时间: 1551690906.534755
https://www.baidu.com/
结束执行时间: 1551690906.618516
https://mengsec.com/
结束执行时间: 1551690908.009247

从输出来看,requests库的网络请求成功异步执行了。

在后面,我们就可以借助run_in_executor()来使用一些常用的但不支持异步的库来实现自己的目的了。

4. 参考