-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomm_via_queue.py
90 lines (76 loc) · 2.54 KB
/
comm_via_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Simple demo for coroutines to communicate via a queue.
"""
__author__ = 'Ziang Lu'
import asyncio
import random
import time
from asyncio import Queue
from typing import Coroutine
async def worker_coro(name: str, q: Queue) -> None:
    """
    Worker coroutine.

    Loops forever: takes a sleep duration (in seconds) off the queue, sleeps
    for that long, marks the item as done and prints a progress line. It never
    returns on its own, so the task running it has to be cancelled by the
    caller once the queue has been drained.
    :param name: str, label used in the progress message
    :param q: Queue, holds the sleep durations to process
    :return: None
    """
    while True:
        sleep_for = await q.get()
        try:
            await asyncio.sleep(sleep_for)
        finally:
            # Always balance the get() above, even when the sleep fails or is
            # cancelled; otherwise q.join() would wait on this item forever.
            q.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    """
    Demo driver.

    Fills a queue with 20 random sleep durations, lets 3 worker tasks drain it
    concurrently, then prints the summed durations next to the time the
    workers actually took.
    """
    queue = Queue()

    # Act as a one-shot "producer": enqueue every job up front while keeping a
    # running sum of the durations for the final report.
    total_sleep_time = 0
    for _ in range(20):
        duration = random.uniform(0.05, 1.0)
        queue.put_nowait(duration)
        total_sleep_time += duration

    # Spawn 3 consumers that all pull from the same queue.
    workers = [
        asyncio.create_task(worker_coro(f'Worker-{i}', queue))
        for i in range(3)
    ]

    # Block until every queued item has been marked done, timing the wait.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # The workers loop forever, so they have to be stopped explicitly...
    for worker in workers:
        worker.cancel()
    # ...and then awaited so the cancellations have taken effect.
    await asyncio.gather(*workers, return_exceptions=True)

    print('==========')
    print(f'Total expected sleep time: {total_sleep_time:.2f} seconds')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
# Only start the demo when this file is executed as a script, so that merely
# importing the module does not run the event loop as a side effect.
if __name__ == '__main__':
    asyncio.run(main())
# Output:
# Worker-0 has slept for 0.20 seconds
# Worker-1 has slept for 0.53 seconds
# Worker-2 has slept for 0.96 seconds
# Worker-1 has slept for 0.45 seconds
# Worker-0 has slept for 1.00 seconds
# Worker-2 has slept for 0.35 seconds
# Worker-2 has slept for 0.15 seconds
# Worker-2 has slept for 0.09 seconds
# Worker-1 has slept for 0.77 seconds
# Worker-0 has slept for 0.66 seconds
# Worker-2 has slept for 0.53 seconds
# Worker-1 has slept for 0.36 seconds
# Worker-1 has slept for 0.30 seconds
# Worker-0 has slept for 0.63 seconds
# Worker-2 has slept for 0.50 seconds
# Worker-1 has slept for 0.46 seconds
# Worker-0 has slept for 0.44 seconds
# Worker-2 has slept for 0.35 seconds
# Worker-1 has slept for 0.54 seconds
# Worker-0 has slept for 0.71 seconds
# ==========
# Total expected sleep time: 9.97 seconds
# 3 workers slept in parallel for 3.66 seconds