下面是一个使用asyncio实现的多生产者(同步)单消费者的示例代码:
import asyncio
import random
async def producer(queue):
while True:
# 模拟一些生产的任务
await asyncio.sleep(random.random())
item = random.randint(1, 100)
await queue.put(item)
print(f'生产者放入队列: {item}')
async def consumer(queue):
while True:
# 模拟一些消费的任务
await asyncio.sleep(random.random())
item = await queue.get()
print(f'消费者从队列取出: {item}')
queue.task_done()
async def main():
queue = asyncio.Queue()
producers = [asyncio.create_task(producer(queue)) for _ in range(3)]
consumer_task = asyncio.create_task(consumer(queue))
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待队列中的任务全部完成
await queue.join()
consumer_task.cancel()
await consumer_task
asyncio.run(main())
在这个示例中,我们创建了一个asyncio队列,并创建了多个生产者任务和一个消费者任务。生产者任务循环执行,每次生产一个随机数并放入队列中。消费者任务循环执行,每次从队列中取出一个元素并进行处理。在生产者任务完成后,我们使用await queue.join()
等待队列中的所有任务都完成,然后取消消费者任务。最后,我们使用asyncio.run(main())
来运行主任务。