蒙珣的博客

活好当下,做好今天该做的事情。

0%

Python多线程

Python并发编程简介

因为CPU和IO是可以同时并行执行的,IO的执行,比如读取内存、磁盘、网络他的过程中是不需要CPU的参与的。这样的话使用多线程CPU可以释放出来,来执行其他Task,实现并发加速

同理加入多进程就可以同时使用多个CPU来执行多个Task,来为我们的程序提速

  • 多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴的等待IO

  • 多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务

  • 异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行

  • 当多线程和进程同时访问同一个文件时,我们可以使用Lock对资源加锁,防止冲突访问,有序访问

  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式

  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果

  • 使用subprocess启动外部程序的进程,并进行输入输出交互(如:你写好了一个exe程序,那么使用这个模块就可以调起这个exe,并跟它进行输入输出的交互,来实现交互式的进程通信)

怎样选择多线程、多进程、多协程?

  1. 什么事CPU密集型(CPU-bound)计算、IO密集型(I/O-bound)计算

CPU密集型也叫计算密集型,是指I/O在很短时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高

例如:压缩解压缩、加密解密、正则表达式搜索

I/O密集型指的是系统运行大部分时间都是CPU在等I/O(硬盘/内存)的读/写操作,CPU占用率仍然较低

例如:文件处理程序、网络爬虫程序、读写数据库程序

  1. 多线程、多进程、多协程对比

多进程 Process(multiprocessing)

  • 优点:可以利用多核CPU并行计算
  • 缺点:占用资源最多、可启动数目比线程少
  • 适用于:CPU密集型计算

多线程 Thread(threading)

  • 优点:相比进程,更轻量级、占用资源少
  • 缺点:
    • 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
    • 相比协程:启动数目有限制,占用内存资源,有线程切换开销
  • 适用于:IO密集型计算、同时运行任务数目要求不多

多协程 Coroutine(asyncio)

一个线程可以启动N个协程

  • 优点:内存开销最少、启动协程数量最多
  • 缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂
  • 适用于:IO密集型计算、需要超多歌任务运行、但有现成库支持的场景

GIL锁

  • Python速度慢的两大原因
  • GIL是什么
  • 为什么有GIL这个东西
  • 怎样规避GIL带来的限制

Python速度慢的原因

  • 动态类型语言,边解释边执行
  • 有GIL锁存在,无法利用多核CPU并发执行

GIL是什么

全局解释器锁(Global Interpreter Lock)

是计算机程序设计语言解释器用于同步线程的一种机制,他使得任何时刻仅有一个线程在执行

即便在多核心处理器上,使用GIL的解释器也只允许同一时间执行一个线程

为什么有GIL这个东西

简而言之:Python设计初期,为了规避并发问题引入了GIL,现在无法去除

为了解决多线程之间数据完整性和状态同步问题

Python中对象的管理,是使用引用计数器进行的,引用数0则释放对象

开始:线程A和线程B都引用了对象obj,obj.ref_num = 2,线程A和B都想撤销对obj的引用

不过GIL简化了Python对共享资源的管理

怎样规避GIL带来的限制?

  1. 多线程threading机制依然是有用的,用于IO密集型计算

    因为在 I/O(read, write, send, recv, etc.)期间,线程会释放GIL,实现CPU和IO的并行

    因此多线程用于IO密集型计算依然可以大幅提升速度

    但多线程用于CPU密集型计算时,只会更加拖慢速度

  2. 使用multiprocessing的多进程机制实现并行计算、利用多核CPU优势

    为了对应GIL的问题,Python提供了multiprocessing

多线程使用

Python 创建多线程方法

  1. 准备一个函数

    1
    2
    def my_func(a,b):
    do_craw(a,b)
  2. 怎样创建一个线程

    1
    2
    3
    4
    import threading
    # target 传入一个函数
    # args传的是一个元组
    t = threading.Thread(target=my_func, args=(100,200,)
  3. 启动线程

    1
    t.start()
  4. 等待结束

    1
    t.join()

使用简单的多线程爬虫

blog_spider.py

1
2
3
4
5
6
7
8
9
10
11
12
import requests

urls = [
f"https://www.cnblogs.com/#p{page}"
for page in range(1, 50 + 1)
]

def craw(url):
r = requests.get(url)
print(url, len(r.text))

# craw(urls[0])

multi_thread_cray.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import threading
import time
import blog_spider


class BenchmarkThread:
def __enter__(self):
self.start = time.time()

def __exit__(self, *args):
self.end = time.time()
print(f"consume: {self.end - self.start}")


def single_threaded_crawler():
print("start single thread crawling...")
for url in blog_spider.urls:
blog_spider.craw(url)
print("end single thread crawling...")


def multi_threaded_crawler():
print("start multi thread crawling...")
threads = []
for url in blog_spider.urls:
threads.append(
threading.Thread(target=blog_spider.craw, args=(url, ))
)

for thread in threads:
thread.start()

for thread in threads:
thread.join()

print("end multi thread crawling...")


if __name__ == '__main__':
with BenchmarkThread():
single_threaded_crawler()

with BenchmarkThread():
multi_threaded_crawler()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
start single thread crawling...
...
https://www.cnblogs.com/#p49 71620
https://www.cnblogs.com/#p50 71620
end single thread crawling...
consume: 17.035787105560303

start multi thread crawling...
...
https://www.cnblogs.com/#p12 71620
https://www.cnblogs.com/#p4 71620
end multi thread crawling...
consume: 1.0366919040679932

可以看出多线程要快了17倍左右

生产者-消费者模式

  • 多组件的Pipeline技术架构
  • 生产者-消费者爬虫的架构
  • 多线程数据通信的queue.Queue
  • 代码编写实现生产者消费者爬虫

多组件的Pipline技术架构

生产者-消费值爬虫架构

多线程数据通信的queue.Queue

queue.Queue可以用于多线程之间的、线程安全的数据通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import queue

# 创建Queue
q = queue.Queue()

# 添加元素
q.put(item)

# 获取元素
item = q.get()

# 查询状态
q.qsize() # 查看元素的多少
q.empty() # 判断是否为空
q.full() # 判断是否已满

代码编写实现生产者消费者爬虫

modify blog_spider.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import queue
import blog_spider
import random
import time
import threading


def fout(file_path, data):
with open(file_path, 'a+', encoding='utf-8') as f:
f.write(str(data) + "\n")


# producer
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
while True:
url = url_queue.get()
html = blog_spider.craw(url)
html_queue.put(html)
print(threading.current_thread().name, f"craw {url}",
"url_queue.size =", url_queue.qsize())
time.sleep(random.randint(1, 2))


# consumer
def do_parse(html_queue: queue.Queue, file_path):
while True:
html = html_queue.get()
results = blog_spider.parse(html)
for result in results:
fout(file_path, result)
print(threading.current_thread().name, f"results.size",
len(results), "html_queue.size =", html_queue.qsize())
time.sleep(random.randint(1, 2))


if __name__ == '__main__':
url_queue = queue.Queue()
html_queue = queue.Queue()

file_path = './spider_data.txt'

for url in blog_spider.urls:
url_queue.put(url)

# producer thread
for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue, html_queue),
name=f"craw{idx}")

t.start()

# consumer thread
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, file_path),
name=f"parse{idx}")

t.start()

线程安全问题和解决

  • 线程安全概念介绍
  • Lock用于解决线程安全问题
  • 实例代码演示问题以及解决方案

线程安全概念介绍

线程安全指某个函数、函数库在多线程环境中被调用时,能够正确处理多个线程之间的共享变量,使程序功能正确完成

由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全

例如:两个线程先后进入 if 语句,第一个线程执行后,第二个线程也执行了(其实不应该执行)

1
2
3
def draw(account, amount):
if account.balance >= amount:
account.balance -= amount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading
import time

class Account:
def __init__(self, balance):
self.balance = balance

def draw(account, amount):
if account.balance >= amount:
time.sleep(0.1) # blocking can lead to thread switching
print(threading.current_thread().name,
"withdraw money successfully")
account.balance -= amount
print(threading.current_thread().name,
"balance", account.balance)

else:
print(threading.current_thread().name,
"withdrawal failure insuffcient balance")

if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(target=draw, args=(account, 800), name="ta")
tb = threading.Thread(target=draw, args=(account, 800), name="tb")

ta.start()
tb.start()

结果1

1
2
3
tatb withdraw money successfully
ta balance 200 withdraw money successfully
tb balance -600

Lock用于解决线程安全问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import time

lock = threading.Lock()

class Account:
def __init__(self, balance):
self.balance = balance

def draw(account, amount):
with lock: # adding a lock to a thread
if account.balance >= amount:
time.sleep(0.1) # blocking can lead to thread switching
print(threading.current_thread().name,
"withdraw money successfully")
account.balance -= amount
print(threading.current_thread().name,
"balance", account.balance)

else:
print(threading.current_thread().name,
"withdrawal failure insuffcient balance")

if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(target=draw, args=(account, 800), name="ta")
tb = threading.Thread(target=draw, args=(account, 800), name="tb")

ta.start()
tb.start()

结果

1
2
3
ta withdraw money successfully
ta balance 200
tb withdrawal failure insuffcient balance

线程池

  • 线程池的原理
  • 使用线程池的好吃
  • ThreadPoolExecutor的使用语法
  • 使用线程池改造爬虫程序

使用线程池本身不需要加锁,但在访问和修改共享数据时可能需要使用锁或其他同步机制来确保线程安全

线程池的原理

因为新建线程系统需要分配资源、终止线程系统需要回收资源

那么如果可以重用线程,则可以减去新建/终止的开销

线程池本身为提前预先建好的线程,这些线程会被重复的使用、

当有新任务时先放入任务队列,线程池里的线程会挨个取队列里的线程,依次执行

当没有任务时,回到线程池,并不会被销毁,等待下一个任务到来

使用线程池的好处

  1. 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
  2. 适用场景:适合处理突发性大量请求或需要大量线程完成的任务,但实际任务处理时间较短
  3. 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
  4. 代码优势:使用线程池的语法比自己新建线程执行线程更加简洁

ThreadPoolExecutor的使用语法

1
2
3
4
5
6
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
results = pool.map(craw, urls) # 提前把urls都准备好,进行全部的执行
for result im results:
print(result)

用法1:map 函数,很简单

注意 map 的结果和入参是顺序对应的

1
2
3
4
5
6
7
8
9
10
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor() as pool:
futures = [pool.submit(craw, url) for url in urls]

for future in futures: # 挨个等待每一个按顺序的结束,进行返回和打印
print(future.result())

for future in as_completed(futures): # 不管里面那个任务先执行完了,它就会先进行返回
print(future.result())

用法2:future模式,更强大

注意如果使用 as_completed 顺序是不定的

使用线程池改造爬虫程序

thread_pool.py

使用pool.mapfuturesas_completed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor
import blog_spider

with ThreadPoolExecutor() as pool:
htmls = pool.map(blog_spider.craw, blog_spider.urls)
htmls = list(zip(blog_spider.urls, htmls))
for url, html in htmls:
print(url, len(html))

print("craw over")

with ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
future = pool.submit(blog_spider.parse, html)
futures[future] = url

for future, url in futures.items():
print(url, future.result())

可以看出是顺序执行的,下面看看使用futures.as_completed

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor
import blog_spider

with ThreadPoolExecutor() as pool:
htmls = pool.map(blog_spider.craw, blog_spider.urls)
htmls = list(zip(blog_spider.urls, htmls))
for url, html in htmls:
print(url, len(html))

print("craw over")

with ThreadPoolExecutor() as pool:
futures = {}
for url, html in htmls:
future = pool.submit(blog_spider.parse, html)
futures[future] = url

for future, url in futures.items():
print(url, future.result())

那个任务先执行完成,就先返回那个任务

总结

  • pool.map方式更加简洁,但你需要把所有的任务提前准备好,才能一次性提交。并且返回是按顺序返回的
  • pool.submit方式是单个提交任务。for future in futures是按顺序返回的,而for future in as_completed那个任务先执行完成,就先返回那个
  • 更推荐使用for future in as_completed

线程池在web服务中实现加速

  • Web服务的架构以及特点
  • 使用线程池ThreadPoolExecutor加速
  • 代码使用Flask实现Web服务并实现加速

Web服务的架构以及特点

Web后台服务的特点

  • Web服务对响应时间要求非常高,比如要求200ms内返回
  • Web服务有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API
  • Web服务经常需要处理几万人、几百万人的同时请求

使用线程池ThreadPoolExecutor加速

使用线程池ThreadPoolExecutor的好处:

  • 方便的将磁盘文件、数据库、远程API的IO调用并发执行
  • 线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能

代码使用Flask实现Web服务并实现加速

flask_thread_pool.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import time
import flask

app = flask.Flask(__name__)


def read_file():
time.sleep(0.1)
return "read_file"


def read_db():
time.sleep(0.2)
return "read_db"


def read_api():
time.sleep(0.3)
return "read_api"


@app.route('/')
def index():
result_file = read_file()
result_db = read_db()
result_api = read_api()

return json.dumps({
"result_file": result_file,
"result_db": result_db,
"result_api": result_api
})

if __name__ == '__main__':
app.run()

time curl http://127.0.0.1

可以看出每一次请求大概消耗600ms左右(三个IO操作,以及系统时间),那么这个程序我们如何改造和加速呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json
import time
import flask
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)
pool = ThreadPoolExecutor() # 初始化全局线程池

def read_file():
time.sleep(0.1)
return "read_file"


def read_db():
time.sleep(0.2)
return "read_db"


def read_api():
time.sleep(0.3)
return "read_api"


@app.route('/')
def index():
result_file = pool.submit(read_file)
result_db = pool.submit(read_db)
result_api = pool.submit(read_api)

return json.dumps({
"result_file": result_file.result(),
"result_db": result_db.result(),
"result_api": result_api.result()
})

if __name__ == '__main__':
app.run()

结果

为什么是300ms左右呢?因为他们并发同时运行时,花费最多的就是read_api()300ms,其他两个都在300ms内完成,所以请求花费时间基本等于最长消耗时间的函数了

多进程使用

  • 有了多线程threading,为什么还要使用多进程multiprocessing
  • 多进程mutiprocessing知识梳理(对比多线程threading)
  • 单线程、多线程、多进程对CPU密集计算速度

有了多线程threading,为什么还要使用多进程multiprocessing

multiprocessing 模块就是python为了解决GIL缺陷引入的一个模块,原理是用多进程在多CPU上并行执行

多进程mutiprocessing知识梳理

单线程、多线程、多进程CPU密集计算速度

thread_process_cpu_bound.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import math, time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

PRIMES = [112272535095293] * 100


class Benchmark:
def __init__(self, description):
self.description = description

def __enter__(self):
self.start_time = time.time()
print(self.description)

def __exit__(self, *args):
self.end_time = time.time()
print(f"consume: {self.end_time - self.start_time}")


def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True


def single_thread():
for number in PRIMES:
is_prime(number)


def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, PRIMES)


def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, PRIMES)


if __name__ == '__main__':
with Benchmark("single thread"):
single_thread()

with Benchmark("multi thread"):
multi_thread()

with Benchmark("multi process"):
multi_process()

结果

1
2
3
4
5
6
single thread
consume: 22.90883493423462
multi thread
consume: 22.713069915771484
multi process
consume: 4.842185974121094

在Flask服务中使用进程池加速

flask_process_pool.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import flask, math, json
from concurrent.futures import ProcessPoolExecutor

app = flask.Flask(__name__)
process_pool = ProcessPoolExecutor()

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
number_list = [int(x) for x in numbers.split(",")]
results = process_pool.map(is_prime, number_list)
return json.dumps(dict(zip(number_list, results)))

if __name__ == '__main__':
app.run()

请求路径http://127.0.0.1:5000/is_prime/1,2,3,4

结果

1
{"1": false, "2": true, "3": true, "4": false}

异步IO:anyncio

在一个线程中如果遇到IO等待时间,线程不会傻傻等待,利用空闲的时候去干其他事情

语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

# 定义协程
async def myfunc(url):
await get_url(url)

# 创建task列表
tasks = [loop.create_task(myfunc(url)) for url in urls]

# 去生成或者获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务放到任务列表
loop.run_until_complete(asyncio.wait(tasks))

注意:

  1. 要用在异步IO编程中,依赖的库必须支持异步IO特性
  2. 爬虫引用中:requests 不支持异步,需要用 aiohttp

python3.4及以后版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动切换到tasks中的其他任务
print(2)

@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2)
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

python3.5及以后版本(推荐),使用 anync & await 关键字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO耗时操作,自动切换到tasks中的其他任务
print(2)

async def func2():
print(3)
await asyncio.sleep(2)
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

使用协程异步保存文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import aiohttp
import asyncio

async def fetch(session, url):
print("发送请求: ", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(content)
print("下载完成: ", url)

async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://dailybing.com/view/zh-cn/20240109.html',
'https://dailybing.com/view/zh-cn/20240112.html',
'https://dailybing.com/view/zh-cn/20240113.html'
]

tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

await asyncio.wait(tasks)

if __name__ == '__main__':
asyncio.run(main())

结果

1
2
3
4
5
6
发送请求:  https://dailybing.com/view/zh-cn/20240109.html
发送请求: https://dailybing.com/view/zh-cn/20240112.html
发送请求: https://dailybing.com/view/zh-cn/20240113.html
下载完成: https://dailybing.com/view/zh-cn/20240112.html
下载完成: https://dailybing.com/view/zh-cn/20240113.html
下载完成: https://dailybing.com/view/zh-cn/20240109.html

Python 异步IO库介绍:asyncio

1
2
3
4
async def func():
pass

result = func()

执行协程函数创建的写成对象,函数内部代码不会执行

如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理

1
2
3
4
5
6
7
8
9
10
import asyncio

async def func():
print("1")

result = func()

# loop = asyncio.get_event_loop()
# loop.run_until_complete(result) # 将函数添加到任务列表中
asyncio.run(result) # python 3.7及以后,代替上面两步

await

await + 可等待的对象(协程对象、Future、Task对象 -> IO等待)

示例1:

1
2
3
4
5
6
7
8
import asyncio

async def func():
print("1")
response = await asyncio.sleep(2) # IO等待
print("结束",response)

asyncio.run(func())

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def others():
print("start")
await asyncio.sleep(2)
print("end")
return "返回值"

async def func():
print("执行协程函数内部代码")

# 遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起任务时,事件循环可以去执行其他协程(任务)
response = await others()

print("IO请求结束, 结果为: ", response)

asyncio.run(func())

结果

1
2
3
4
执行协程函数内部代码
start
end
IO请求结束, 结果为: 返回值

await就是等待对象的值得到结果后再继续执行

Task对象

Tasks are used to schedule coroutines concurrently

When a coroutine is wrapped into a Task with functions like asyncio.create_ task() the coroutine is automatically scheduled to run soon.

在事件循环中添加多个任务

Tasks 用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件,循环中等待被调度执行。除了使用asyncio.create_task()函数以外(python3.7),还可以用低层级的loop.create_task()ensure_future()函数(python3.7之前)。不建议手动实例化Task对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def func():
print("1")
await asyncio.sleep(2)
print("2")
return "返回值"

async def main():
print("start")

# 创建Task对象,将当前执行func函数任务添加到事件循环中
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())

print("end")

# 当执行某协程遇到IO操作时,会自动化切换执行其他任务
# 此处的await是等待相对应的协程全都执行完毕并获取结果
ret1 = await task1
ret2 = await task2
print(ret1, ret2)

asyncio.run(main())

结果

1
2
3
4
5
6
7
start
end
1
1
2
2
返回值 返回值

简化写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio

async def func():
print("1")
await asyncio.sleep(2)
print("2")
return "返回值"

async def main():
print("start")

# 创建Task对象,将当前执行func函数任务添加到事件循环中
task_list = [
asyncio.create_task(func(), name='n1'),
asyncio.create_task(func(), name='n2')
]

print("end")

# 当执行某协程遇到IO操作时,会自动化切换执行其他任务
# 此处的await是等待相对应的协程全都执行完毕并获取结果

done, pending = await asyncio.wait(task_list, timeout=None) # timeout 最多等待时间。 如果等待后未完成,pending返回一个未完成的集合
print(done) # 返回一个完成的集合

asyncio.run(main())

结果

1
2
3
4
5
6
7
start
end
1
1
2
2
{<Task finished name='n1' coro=<func() done, defined at test.py:3> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at test.py:3> result='返回值'>}

asyncio.Future对象

A Futureis a special low-level awaitable object that represents an eventual result of an asynchronous operation.

asyncio中的Future对象是一个相对更偏向底层的可对象,通常我们不会直接用到这个对象,而是直接使用Task对象来完成任务的并和状态的追踪。( Task 是 Futrue的子类 )

Future为我们提供了异步编程中的 最终结果 的处理(Task类也具备状态处理的功能)。

示例1:

1
2
3
4
5
6
7
8
9
10
11
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()

# # 创建一个任务(Future对象),这个任务什么都不干。
fut = loop.create_future()

# 等待任务最终结果(Future对象),没有结果则会一直等下去。
await fut

asyncio.run(main())

示例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio


async def set_after(fut):
await asyncio.sleep(2)
fut.set_result("666")


async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()

# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。
fut = loop.create_future()

# 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。
# 即手动设置future任务的最终结果,那么fut就可以结束了。
await loop.create_task(set_after(fut))

# 等待 Future对象获取 最终结果,否则一直等下去
data = await fut
print(data)

asyncio.run(main())

Future对象本身函数进行绑定,所以想要让事件循环获取Future的结果,则需要手动设置。而Task对象继承了Future对象,其实就对Future进行扩展,他可以实现在对应绑定的函数执行完成之后,自动执行set_result,从而实现自动结束。

虽然,平时使用的是Task对象,但对于结果的处理本质是基于Future对象来实现的。

扩展:支持 await 对象语 法的对象课成为可等待对象,所以 协程对象Task对象Future对象 都可以被成为可等待对象。

futures.Future对象

concurrent.futures.Future对象使用线程池、进程池实现异步操作时用到的对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
from concurrent.futures import Future
from concurrent.futrues.thread import ThreadPoolExecutor
from concurrent.futrues.process import ProcessPoolExecutor

def func(value):
time.sleep(1)
print(value)
return 123

# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
fut = pool.submit(func, i)
print(fut)

两个Future对象是不同的,他们是为不同的应用场景而设计,例如:concurrent.futures.Future不支持await语法等

官方提示两对象之间不同:

  • unlike asyncio Futures, concurrent.futures.Future instances cannot be awaited.
  • asyncio.Future.result() and asyncio.Future.exception() do not accept the timeout argument.
  • asyncio.Future.result() and asyncio.Future.exception() raise an InvalidStateError exception when the Future is not done.
  • Callbacks registered with asyncio.Future.add_done_callback() are not called immediately. They are scheduled with loop.call_soon() instead.
  • asyncio Future is not compatible with the concurrent.futures.wait() and concurrent.futures.as_completed() functions.

在Python提供了一个将futures.Future 对象包装成asyncio.Future对象的函数 asynic.wrap_future

一般在程序开发中我们要么统一使用 asycio 的协程实现异步操作、要么都使用进程池和线程池实现异步操作。但如果 协程的异步进程池/线程池的异步 混搭时,那么就会用到此功能了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time
import asyncio
import concurrent.futures

def func1():
# 某个耗时操作
time.sleep(2)
return "SB"

async def main():
loop = asyncio.get_running_loop()

# 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
# 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
# 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
# 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
fut = loop.run_in_executor(None, func1)
result = await fut
print('default thread pool', result)

# 2. Run in a custom thread pool:
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom thread pool', result)

# 3. Run in a custom process pool:
# with concurrent.futures.ProcessPoolExecutor() as pool:
# result = await loop.run_in_executor(
# pool, func1)
# print('custom process pool', result)

asyncio.run(main())

asyncio + 不支持异步模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
import requests

async def download_file(url):
# 发送网络请求,下载文件(遇到网络下载文件IO请求,自动切换到其他任务)
print("开始下载:", url)

loop = asyncio.get_event_loop()
# requests模块默认不支持异步操作,所以使用线程池来配合实现
future = loop.run_in_executor(None, requests.get, url)

response = await future
print("下载完成:", url)
# 文件保存到本地
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(response.content)

if __name__ == "__main__":

url_list = [
"https://dailybing.com/view/zh-cn/20240109.html",
"https://dailybing.com/view/zh-cn/20240110.html",
"https://dailybing.com/view/zh-cn/20240111.html"
]

tasks = [download_file(url) for url in url_list]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

https://www.bilibili.com/video/BV1bK411A7tV?p=12&vd_source=7edae3cd790e850cc7836ab5c5d9ac4b

Understanding GIL

https://zhuanlan.zhihu.com/p/354358309

https://docs.python.org/zh-cn/3.9/library/asyncio-task.html

https://zhuanlan.zhihu.com/p/137057192