asyncio 是 Python 中实现异步编程的强大库,通过事件循环机制来处理并发任务,避免阻塞操作,提高程序在 I/O 密集型场景中的效率。asyncio 主要依赖协程和任务(tasks)来实现异步操作,使开发者可以在网络、文件操作等场景中进行非阻塞的操作。

核心概念和组件

1. 协程(Coroutines)

协程是 asyncio 中的基本执行单元。可以通过 async def 定义协程函数(coroutine),执行协程函数时不会立即运行代码,而是返回一个协程对象,需要在事件循环中调度后才能运行。协程支持在某些操作(如网络 I/O)等待时暂停执行,之后再恢复,以实现非阻塞操作。

示例:

1
2
3
4
5
6
import asyncio

async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(2) # 模拟 I/O 操作
print("数据获取完成")

2. await 关键字

await 是在协程中使用的关键字,用于暂停当前协程的执行,等待某个异步操作完成后再恢复。只有在协程函数内部才可以使用 awaitawait 常用于等待异步函数的返回值,例如 await asyncio.sleep(1) 会暂停协程 1 秒。

示例:

1
2
3
4
5
6
async def main():
print("任务开始")
await fetch_data()
print("任务完成")

asyncio.run(main())

3. 事件循环(Event Loop)

事件循环是 asyncio 的核心机制,负责调度和执行所有的协程任务。事件循环会不断运行,寻找需要执行的任务和事件,并在需要时恢复暂停的协程,确保并发任务的顺序和时机。通常使用 asyncio.run() 函数启动事件循环并执行协程。

示例:

1
2
3
4
5
async def main():
await fetch_data()

# 启动事件循环并运行协程 main
asyncio.run(main())

4. 任务(Tasks)

任务是事件循环调度协程执行的对象。可以通过 asyncio.create_task() 创建任务,将协程对象包装成任务,使得多个协程可以并发执行。例如,如果一个协程需要执行多个 I/O 操作,创建任务可以实现异步并发。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def task1():
await asyncio.sleep(1)
print("任务 1 完成")

async def task2():
await asyncio.sleep(2)
print("任务 2 完成")

async def main():
# 创建并发任务
task1_ = asyncio.create_task(task1())
task2_ = asyncio.create_task(task2())

# 等待所有任务完成
await task1_
await task2_

asyncio.run(main())

5. asyncio.gather()asyncio.wait()

  • asyncio.gather():同时运行多个协程,收集所有协程的结果,并行处理多个异步任务。适合同时运行并等待多个任务完成。
  • asyncio.wait():提供更多控制,可以等待所有任务或部分任务完成,允许设置超时(timeout)。
1
2
3
4
5
6
7
8
9
10
11
12
13
async def task1():
await asyncio.sleep(1)
return "任务 1 完成"

async def task2():
await asyncio.sleep(2)
return "任务 2 完成"

async def main():
results = await asyncio.gather(task1(), task2())
print(results)

asyncio.run(main())

6. 异步 I/O 操作

asyncio 提供了多种内置的异步 I/O 函数,用于处理文件、网络等 I/O 操作,以下是一些常用的异步 I/O 操作:

  • asyncio.sleep():模拟非阻塞的延迟。
  • asyncio.open_connection():用于建立 TCP 连接。
  • asyncio.wait_for():设置超时,等待异步操作完成。
  • asyncio.Queue():异步队列,用于跨任务之间传递数据,适合生产者-消费者模式。

示例:使用 asyncio.sleep() 模拟一个网络请求

1
2
3
4
async def fetch_data():
print("Fetching data...")
await asyncio.sleep(2) # 模拟网络 I/O
print("Data fetched!")

7. 异步上下文管理器和异步迭代器

Python 支持在 asyncio 中使用异步上下文管理器(async with)和异步迭代器(async for)来实现资源管理。它们在异步场景中常用于数据库、文件、网络连接的打开和关闭操作。

  • 异步上下文管理器示例:

    1
    2
    3
    4
    5
    import aiofiles

    async def write_data():
    async with aiofiles.open('data.txt', 'w') as f:
    await f.write("Hello, asyncio!")
  • 异步迭代器示例:

    1
    2
    3
    async def main():
    async for item in async_generator():
    print(item)

实际应用场景

I/O 密集型任务

asyncio 非常适合需要频繁进行 I/O 操作的应用,比如:

  • 网络爬虫:同时爬取多个网站。
  • API 调用:批量请求 API,例如 RESTful 或 WebSocket API。
  • 文件读写:处理文件时不阻塞主线程,可以提高处理效率。

生产者-消费者模式

在处理需要排队的任务时,异步队列(asyncio.Queue)可以用于实现生产者-消费者模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def producer(queue):
for i in range(5):
await queue.put(f"消息 {i}")
print(f"生产消息 {i}")
await asyncio.sleep(1)

async def consumer(queue):
while True:
message = await queue.get()
print(f"消费消息 {message}")
queue.task_done()

async def main():
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())

这个代码是一个生产者-消费者模式的示例,通过 asyncio.Queue 实现异步任务的通信与协作。

代码分解与执行流程:

  1. producer(queue) 函数

    • 这是一个生产者协程函数,用于生成消息并将其放入 queue 队列中。
    • 它在一个循环中生成 5 条消息,并依次将消息放入队列 (queue.put())。
    • 每生成一条消息,都会打印消息编号,例如 “生产消息 0”,然后 await asyncio.sleep(1) 暂停 1 秒。
    • 总体上,producer 相当于一个每秒生产一条消息的任务。
  2. consumer(queue) 函数

    • 这是一个消费者协程函数,用于从 queue 队列中获取消息并进行处理。
    • 它使用一个 while True 循环不断从队列中获取消息 (queue.get()),处理后打印消费的消息内容,例如 “消费消息 0”。
    • 调用 queue.task_done() 表示消息已被处理完成。
    • 因为 while True 是无限循环,consumer 会一直等待并消费消息,直到手动结束或异常终止。
  3. main() 函数

    • main() 是主协程函数,负责创建队列和启动生产者和消费者任务。
    • 首先创建一个 asyncio.Queue() 实例 queue,用于存储生产者生成的消息。
    • 使用 await asyncio.gather(producer(queue), consumer(queue)) 并发地运行 producerconsumer,确保生产者生成的消息可以被消费者处理。
  4. asyncio.run(main())

    • asyncio.run() 启动事件循环,执行 main() 协程。
    • 事件循环会调度 producerconsumer 并发执行,控制他们在每次 await 时切换,实现非阻塞地生成和消费消息。

执行过程中的输出解释:

  1. 生产者启动并生成消息:

    • 生产者每次生成一条消息后,打印消息内容并暂停 1 秒。
    • 消息放入队列后,消费者协程会立即获取消息,执行消费。
  2. 消费者会实时处理消息:

    • 消费者无限循环等待队列中的消息。
    • 每次从队列获取一条消息后,会打印消费消息的内容,然后标记任务完成 (queue.task_done())。

代码执行时,输出可能如下:

1
2
3
4
5
6
7
8
9
10
生产消息 0
消费消息 0
生产消息 1
消费消息 1
生产消息 2
消费消息 2
生产消息 3
消费消息 3
生产消息 4
消费消息 4

异步编程的优势与局限

  • 优势

    1. 高效并发:在 I/O 密集型任务中,asyncio 可以显著提高程序的响应速度和效率,适合网络、文件读写等高并发任务。
    2. 资源节省:异步操作避免了多线程的开销和线程切换问题,不需要额外的锁或信号量。
    3. 更简洁的代码:使用协程可以更自然地组织代码,相比回调函数结构更清晰。
  • 局限

    1. 不适合 CPU 密集任务asyncio 本质上是单线程的,不能在多核 CPU 上并行执行 CPU 密集型任务。
    2. 兼容性:并非所有库都支持异步操作,如果需要混合使用同步和异步库,可能会增加代码复杂度。
    3. 调试难度:异步代码的执行顺序可能不如同步代码直观,调试时容易产生时序错误。

官方文档: