本例将展示如何使用 asyncio.Queue 实现一个 异步的生产者-消费者系统,
在不使用多线程或多进程的情况下,轻松实现任务并发与队列通信。
问题描述:
模拟一个“任务中心”系统:
- 生产者(Producer)负责生成任务;
- 消费者(Consumer)负责处理任务;
- 使用 asyncio.Queue 实现两者之间的异步协作。
核心知识点:
|
概念 |
说明 |
|
asyncio.Queue |
异步安全的队列,支持并发 put() / get() |
|
asyncio.create_task() |
创建可独立运行的异步任务 |
|
await queue.join() |
等待所有队列中的任务处理完成 |
|
queue.task_done() |
告知队列该任务已被处理 |
代码示例:
import asyncio
import random
# 生产者
async def producer(queue, name):
for i in range(5):
await asyncio.sleep(random.uniform(0.5, 1.5)) # 模拟生产耗时
item = f"任务-{i}-{name}"
await queue.put(item)
print(f"[生产者 {name}] 生产了 {item}")
print(f"[生产者 {name}] 完成所有任务。")
# 消费者
async def consumer(queue, name):
while True:
item = await queue.get() # 等待任务
print(f"→ [消费者 {name}] 开始处理 {item}")
await asyncio.sleep(random.uniform(1, 2)) # 模拟处理耗时
print(f"✓ [消费者 {name}] 完成处理 {item}")
queue.task_done() # 标记任务完成
# 主协程
async def main():
queue = asyncio.Queue(maxsize=10)
# 启动多个消费者
consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(2)]
# 启动多个生产者
producers = [asyncio.create_task(producer(queue, f"P{i}")) for i in range(2)]
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待所有任务被消费者处理完
await queue.join()
# 撤销消费者(由于它们是无限循环)
for c in consumers:
c.cancel()
print("
所有任务已完成 ✅")
# 运行异步主程序
asyncio.run(main())
运行结果示例(输出顺序可能不同):
[生产者 P0] 生产了 任务-0-P0
[生产者 P1] 生产了 任务-0-P1
→ [消费者 C0] 开始处理 任务-0-P0
→ [消费者 C1] 开始处理 任务-0-P1
✓ [消费者 C1] 完成处理 任务-0-P1
[生产者 P1] 生产了 任务-1-P1
→ [消费者 C1] 开始处理 任务-1-P1
✓ [消费者 C0] 完成处理 任务-0-P0
...
所有任务已完成 ✅
程序执行逻辑图:
┌────────────┐ ┌────────────┐
│ 生产者 P0 │──────▶│ │
├────────────┤ │ │
│ 生产者 P1 │──────▶│ asyncio.Queue │──────▶ 消费者 C0
└────────────┘ │ │──────▶ 消费者 C1
└────────────┘
数据流:
- 生产者不断往队列中 put() 任务;
- 消费者从队列 get() 任务并异步处理;
- 队列自动调度,实现高效异步通信。
关键机制讲解:
- asyncio.Queue 内部通过事件循环协调生产和消费,不会阻塞线程。
- queue.join() 会阻塞主协程,直到所有任务调用了 task_done()。
- create_task() 能同时启动多个协程任务,实现真正的并发。
同步版本 vs 异步版本
|
项目 |
同步实现 |
异步实现 |
|
并发性 |
无 |
有 |
|
性能 |
低(阻塞) |
高(协程调度) |
|
实现复杂度 |
简单 |
稍复杂但更高效 |
|
应用场景 |
小任务批处理 |
网络IO / 并发爬虫 / 异步API服务 |
扩展示例:控制消费者数量
你可以轻松扩展消费者数量来提高吞吐量,例如:
consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(5)]
协程会自动分配任务,不会出现资源竞争问题。
总结:
- ✅ asyncio.Queue 是异步任务通信的核心;
- ✅ create_task() 用于并发运行多个协程;
- ✅ await queue.join() 等待所有任务处理完毕;
- ✅ 适合网络爬虫、异步日志、任务调度系统等应用。
© 版权声明
文章版权归作者所有,未经允许请勿转载。如内容涉嫌侵权,请在本页底部进入<联系我们>进行举报投诉!
THE END





![在苹果iPhone手机上编写ios越狱插件deb[超简单] - 鹿快](https://img.lukuai.com/blogimg/20251123/23f740f048644a198a64e73eeaa43e60.jpg)













- 最新
- 最热
只看作者