-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathsimple_task_queue_logging_separate_files.py
63 lines (50 loc) · 1.64 KB
/
simple_task_queue_logging_separate_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# simple_task_queue_logging_separate_files.py
import logging
import multiprocessing
import os
import queue
import time

from tasks import get_word_counts
# Number of worker processes: one fewer than the CPU count, but never less
# than one. On a single-core host ``cpu_count() - 1`` is 0, which would start
# no workers at all and leave every queued task unprocessed.
PROCESSES = max(1, multiprocessing.cpu_count() - 1)
# How many times add_tasks() enqueues its set of four book files.
NUMBER_OF_TASKS = 10
def create_logger(pid):
    """Return the multiprocessing logger, writing to ``logs/process_<pid>.log``.

    The logger level is set to INFO and a file handler with the format
    ``"%(asctime)s - %(levelname)s - %(message)s"`` is attached.

    Args:
        pid: Identifier used to build the log file name (the caller passes
            the worker's process id).

    Returns:
        The logger returned by ``multiprocessing.get_logger()``.
    """
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    # FileHandler opens the file immediately and fails with FileNotFoundError
    # if the directory is missing, so make sure it exists first.
    os.makedirs("logs", exist_ok=True)
    # FileHandler stores its target as an absolute path in ``baseFilename``;
    # normalise the same way so the duplicate check below can compare them.
    log_path = os.path.abspath(f"logs/process_{pid}.log")

    # get_logger() always returns the same logger object within a process, so
    # calling this function twice would otherwise stack handlers and write
    # every record twice.
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == log_path
        for handler in logger.handlers
    )
    if not already_attached:
        fh = logging.FileHandler(log_path)
        fmt = "%(asctime)s - %(levelname)s - %(message)s"
        fh.setFormatter(logging.Formatter(fmt))
        logger.addHandler(fh)

    return logger
def process_tasks(task_queue, timeout=1.0):
    """Consume book names from *task_queue* and count their words.

    Runs in a worker process: it creates a per-process log file via
    ``create_logger`` and calls ``get_word_counts`` on every item it takes
    from the queue. A failure on one item is logged and does not stop the
    worker.

    Args:
        task_queue: Queue of items to pass to ``get_word_counts``.
        timeout: Seconds to wait for a new item before treating the queue as
            drained and returning.

    Returns:
        True, once no item arrived within *timeout*.
    """
    proc = os.getpid()
    logger = create_logger(proc)

    while True:
        # Do not use ``while not task_queue.empty(): task_queue.get()``:
        # another worker can take the last item between the check and the
        # blocking get(), leaving this process waiting forever. A bounded
        # get() is atomic and lets the worker exit cleanly.
        try:
            book = task_queue.get(timeout=timeout)
        except queue.Empty:
            break

        try:
            get_word_counts(book)
        except Exception:
            # Keep the worker alive for the remaining items, but record the
            # full traceback and which item failed.
            logger.exception("Process %s failed while processing %r", proc, book)

    logger.info("Process %s completed successfully", proc)
    return True
def add_tasks(task_queue, number_of_tasks):
    """Enqueue the four book files *number_of_tasks* times.

    Args:
        task_queue: Queue-like object exposing ``put``.
        number_of_tasks: How many rounds of the four books to enqueue.

    Returns:
        The same *task_queue* object that was passed in.
    """
    books = (
        "pride-and-prejudice.txt",
        "heart-of-darkness.txt",
        "frankenstein.txt",
        "dracula.txt",
    )
    for _ in range(number_of_tasks):
        for title in books:
            task_queue.put(title)
    return task_queue
def run():
    """Fill a task queue, process it with PROCESSES workers, and time it.

    Prints the number of worker processes before starting them and the
    elapsed wall-clock time after all of them have been joined.
    """
    task_queue = add_tasks(multiprocessing.Queue(), NUMBER_OF_TASKS)

    print(f"Running with {PROCESSES} processes!")
    started_at = time.time()

    workers = []
    for _ in range(PROCESSES):
        worker = multiprocessing.Process(
            target=process_tasks,
            args=(task_queue,),
        )
        workers.append(worker)
        worker.start()

    # Wait for every worker to finish before reporting the elapsed time.
    for worker in workers:
        worker.join()

    elapsed = time.time() - started_at
    print(f"Time taken = {elapsed:.10f}")
# Entry point: start the run only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run()