September 15, 2019
当我们希望从网上爬取一些有用的信息时,就可能会写出类似以下这样的代码,不过这里简化了任务的逻辑。
import requests
import time
from functools import wraps
def func_time(func):
@wraps(func)
def new_func(*args, **kwargs):
print(f'start {func.__name__}')
start = time.time()
result = func(*args, **kwargs)
stop = time.time()
print(f'finish {func.__name__}, and taking {format(stop - start, "0.2f")} s')
return result
return new_func
def do_work(url):
requests.get(url)
@func_time
def do_works_serial(tasks):
for task in tasks:
do_work(task)
def main():
tasks = [
"https://movie.douban.com/subject/5912992/",
"https://movie.douban.com/subject/30170448/",
"https://movie.douban.com/subject/30334073/",
"https://movie.douban.com/subject/1292064/",
"https://movie.douban.com/subject/21937445/",
]
do_works_serial(tasks)
# output:
# start do_works_serial
# finish do_works_serial, and taking 3.59 s
上面的代码处理方式是依次进行网络请求,上一个请求返回之后再进行下一个请求。
这种方式在需要处理的小任务不多的时候,倒也是可行的。但是处理方式非常笨拙,会有大量的时间耗费在等待请求的返回,没有充分地利用到计算机的资源,所以通常对于这样的需求,最容易想到的就是多线程和多进程,就像下面的代码一样。
@func_time
def do_works_muti_threads(tasks):
threads = set()
for task in tasks:
t = threading.Thread(target=do_work, args=(task,))
t.start()
threads.add(t)
# 等待所有线程的结束
for t in threads:
t.join()
@func_time
def do_works_muti_processes(tasks):
processes = set()
for task in tasks:
t = Process(target=do_work, args=(task,))
t.start()
processes.add(t)
# 等待所有进程的结束
for t in processes:
t.join()
# output
# start do_works_muti_threads
# finish do_works_muti_threads, and taking 1.38 s
# start do_works_muti_processes
# finish do_works_muti_processes, and taking 1.61 s
可以发现,不论是多线程或者多进程的方案,都会提升程序的效率(当然这里的计时并不严谨,因为网络波动会影响耗时),实际上多线程或者多进程的也是软件工程解决并发问题常用的手段。
不过多线程或者多进程的方案并不是万能的,这两种方案存在一些问题,第一,会占用额外的系统资源,而且进程会占用更多的资源,互联网领域有名的 C10K 问题,就印证了这点,频繁的上下文切换带来了大量的资源消耗。第二,多线程会有同步共享资源的问题,使用锁无疑又会增加资源的消耗,而多进程的方案会有进程间通信问题。
随着技术的进步,人们又推出了协程的概念,可以轻松应对上述问题。而 Python 也在语言层面提供了支持,使得协程即提高了程序运行效率,又保证了代码的可读性,本文的示例代码都是基于 Python 3.7 的(另一种基于生成器的协程,使用 @asyncio.coroutine
装饰,不过会在 Python 3.10 中移除,这里不做讨论了)。
说了那么多,Python 3.7 中使用协程是如何解决上面提到案例呢?
import asyncio
import aiohttp
import threading
async def do_work_async(url):
print(f'{url}:{threading.currentThread().ident}')
async with aiohttp.ClientSession() as session:
async with session.get(url) as _:
print(f'{url} done')
async def coro_func(tasks):
tasks = (asyncio.create_task(do_work_async(task)) for task in tasks)
print('before tasks')
await asyncio.gather(*tasks)
print('finish tasks')
@func_time
def do_works_coroutine(tasks):
asyncio.run(coro_func(tasks))
# output
# start do_works_coroutine
# before tasks
# https://movie.douban.com/subject/5912992/:3856
# https://movie.douban.com/subject/30170448/:3856
# https://movie.douban.com/subject/30334073/:3856
# https://movie.douban.com/subject/1292064/:3856
# https://movie.douban.com/subject/21937445/:3856
# https://movie.douban.com/subject/1292064/ done
# https://movie.douban.com/subject/21937445/ done
# https://movie.douban.com/subject/5912992/ done
# https://movie.douban.com/subject/30334073/ done
# https://movie.douban.com/subject/30170448/ done
# finish tasks
# finish do_works_coroutine, and taking 1.02 s
可以发现代码只有短短的 10 来行,不过对于没有接触过协程的同学来说,不太好理解,所以针对上面代码,这里稍作梳理,更多关于 async
的介绍和用法应该参考官方文档。
async def
的函数coroutine 'xxx' was never awaited
,所以说协程函数与 await
通常是配套使用的。Task
对象,而 Task 对象被用来在事件循环中运行协程,可以通过 Task
对象取消正在运行的任务,或者是获取任务是否已经完成等信息。Task
对象了解完上面这些内容之后,我们再回过头来分析下代码。
aiohttp.ClientSession()
或者 session.get(url)
时并不会阻塞,而是切出当前任务,由事件调度器开始调度下一个任务。asyncio.run
返回,事件循环结束。以上说了这么多之后,应该了解了什么是协程,协程的工作机理,但是如果要一句话描述出来,还真不太好说。所以在 Stack Overflow
看到一句话描述,感觉挺合适的,就迁移过来了。
协程是一种通用控制结构,其中流的控制权在不同任务之间协同传递而不返回。
线程会比协程更重一些,需要操作系统知道线程的信息,操作系统会在合适的时机进行线程切换,这样做的优势就是代码简单,程序员友好,无需关系任务切换逻辑。
相比进程和线程,协程应该是最轻量的一种并发方案,任务切换完全由程序员自由控制,只有当上一个任务交出控制权下一个任务才能开始执行。对比线程,协程会更高效,但同时也存在一个问题,需要库提供者的支持,比如上面的例程中用到的 aiohttp
,如果开源社区没有的话,那么就需要自行实现了。
而多进程则是一种真正的并行方案,在多核 CPU 的机器上能同时运行多个进程。
总的来说,协程和线程更适合处理 I/O 密集的场景,特别是 Python 中的多线程实际上也只是单线程中执行;而对于 CPU 密集的场景来说,多进程、多机器、多处理器才能提高程序的运行速度。
https://github.com/Jacksonlike/blog_code/tree/master/coroutine