Python进阶实例 第50例:asyncio 实现异步生产者-消费者模型

本例将展示如何使用 asyncio.Queue 实现一个 异步的生产者-消费者系统
在不使用多线程或多进程的情况下,轻松实现任务并发与队列通信。


问题描述:

模拟一个“任务中心”系统:

  • 生产者(Producer)负责生成任务;
  • 消费者(Consumer)负责处理任务;
  • 使用 asyncio.Queue 实现两者之间的异步协作。

核心知识点:

概念

说明

asyncio.Queue

异步安全的队列,支持并发 put() / get()

asyncio.create_task()

创建可独立运行的异步任务

await queue.join()

等待所有队列中的任务处理完成

queue.task_done()

告知队列该任务已被处理


代码示例:

import asyncio
import random

# 生产者
async def producer(queue, name):
    for i in range(5):
        await asyncio.sleep(random.uniform(0.5, 1.5))  # 模拟生产耗时
        item = f"任务-{i}-{name}"
        await queue.put(item)
        print(f"[生产者 {name}] 生产了 {item}")
    print(f"[生产者 {name}] 完成所有任务。")

# 消费者
async def consumer(queue, name):
    while True:
        item = await queue.get()  # 等待任务
        print(f"→ [消费者 {name}] 开始处理 {item}")
        await asyncio.sleep(random.uniform(1, 2))  # 模拟处理耗时
        print(f"✓ [消费者 {name}] 完成处理 {item}")
        queue.task_done()  # 标记任务完成

# 主协程
async def main():
    queue = asyncio.Queue(maxsize=10)

    # 启动多个消费者
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(2)]

    # 启动多个生产者
    producers = [asyncio.create_task(producer(queue, f"P{i}")) for i in range(2)]

    # 等待所有生产者完成
    await asyncio.gather(*producers)

    # 等待所有任务被消费者处理完
    await queue.join()

    # 撤销消费者(由于它们是无限循环)
    for c in consumers:
        c.cancel()

    print("
所有任务已完成 ✅")

# 运行异步主程序
asyncio.run(main())

运行结果示例(输出顺序可能不同):

[生产者 P0] 生产了 任务-0-P0
[生产者 P1] 生产了 任务-0-P1[消费者 C0] 开始处理 任务-0-P0[消费者 C1] 开始处理 任务-0-P1[消费者 C1] 完成处理 任务-0-P1
[生产者 P1] 生产了 任务-1-P1[消费者 C1] 开始处理 任务-1-P1[消费者 C0] 完成处理 任务-0-P0
...
所有任务已完成 ✅

程序执行逻辑图:

┌────────────┐       ┌────────────┐
│ 生产者 P0  │──────▶│            │
├────────────┤       │            │
│ 生产者 P1  │──────▶│ asyncio.Queue │──────▶ 消费者 C0
└────────────┘       │            │──────▶ 消费者 C1
                     └────────────┘

数据流:

  1. 生产者不断往队列中 put() 任务;
  2. 消费者从队列 get() 任务并异步处理;
  3. 队列自动调度,实现高效异步通信。

关键机制讲解:

  • asyncio.Queue 内部通过事件循环协调生产和消费,不会阻塞线程。
  • queue.join() 会阻塞主协程,直到所有任务调用了 task_done()。
  • create_task() 能同时启动多个协程任务,实现真正的并发。

同步版本 vs 异步版本

项目

同步实现

异步实现

并发性

性能

低(阻塞)

高(协程调度)

实现复杂度

简单

稍复杂但更高效

应用场景

小任务批处理

网络IO / 并发爬虫 / 异步API服务


扩展示例:控制消费者数量

你可以轻松扩展消费者数量来提高吞吐量,例如:

consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(5)]

协程会自动分配任务,不会出现资源竞争问题。


总结:

  • ✅ asyncio.Queue 是异步任务通信的核心;
  • ✅ create_task() 用于并发运行多个协程;
  • ✅ await queue.join() 等待所有任务处理完毕;
  • ✅ 适合网络爬虫、异步日志、任务调度系统等应用。
© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 共1条

请登录后发表评论