forked from PacktPublishing/AdvancedPythonProgramming
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample1.py
More file actions
68 lines (48 loc) · 1.8 KB
/
example1.py
File metadata and controls
68 lines (48 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# ch7/example1.py
import multiprocessing
class ReductionConsumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
pname = self.name
print('Using process %s...' % pname)
while True:
num1 = self.task_queue.get()
if num1 is None:
print('Exiting process %s.' % pname)
self.task_queue.task_done()
break
self.task_queue.task_done()
num2 = self.task_queue.get()
if num2 is None:
print('Reaching the end with process %s and number %i.' % (pname, num1))
self.task_queue.task_done()
self.result_queue.put(num1)
break
print('Running process %s on numbers %i and %i.' % (pname, num1, num2))
self.task_queue.task_done()
self.result_queue.put(num1 + num2)
def reduce_sum(array):
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.JoinableQueue()
result_size = len(array)
n_consumers = multiprocessing.cpu_count()
for item in array:
results.put(item)
while result_size > 1:
tasks = results
results = multiprocessing.JoinableQueue()
consumers = [ReductionConsumer(tasks, results) for i in range(n_consumers)]
for consumer in consumers:
consumer.start()
for i in range(n_consumers):
tasks.put(None)
tasks.join()
result_size = result_size // 2 + (result_size % 2)
#print('-' * 40)
return results.get()
my_array = [i for i in range(20)]
result = reduce_sum(my_array)
print('Final result: %i.' % result)