-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExtractAndStreamMultipleThreads.py
108 lines (74 loc) · 2.55 KB
/
ExtractAndStreamMultipleThreads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import threading
import time
import SqsProducerKombu
import codecs
# State shared by the producer and consumer threads: a fixed-size ring buffer
# plus the positions of the next write (in_index) and next read (out_index).
capacity = 5
buffer = [-1] * capacity
in_index = out_index = 0

# Semaphores coordinating access to the ring buffer.
# Binary semaphore: only one thread may touch the buffer and its indices at a time.
mutex = threading.Semaphore(1)
# Counts free cells; starts at capacity, so the producer blocks once the buffer is full.
empty_cells = threading.Semaphore(capacity)
# Counts occupied cells; starts at 0, so the consumer blocks while the buffer is empty.
filled_cells = threading.Semaphore(0)
# Marker the producer puts in the shared buffer after the last line, so the
# consumer can tell "input exhausted" apart from "nothing produced yet".
_END_OF_STREAM = object()
# Set by the consumer once it stops reading, so a producer waiting for a free
# cell gives up instead of blocking forever on a buffer nobody drains.
_consumer_done = threading.Event()


# Producer Thread Class
class Producer(threading.Thread):
    """Thread that copies a text file, line by line, into the shared ring buffer.

    After the last line (or if reading fails) an end-of-stream marker is
    published so the consumer can terminate. The hand-off protocol assumes a
    single producer paired with a single consumer.

    Args:
        *args, **kwargs: forwarded unchanged to ``threading.Thread``.
        path: keyword-only; file to stream. Defaults to the location that was
            previously hard-coded in ``run``.
    """

    DEFAULT_PATH = r"C:\Users\sunit\Desktop\CapstoneProject\data\nearby-all-public-posts\posts_1"

    def __init__(self, *args, path=DEFAULT_PATH, **kwargs):
        super().__init__(*args, **kwargs)
        self.path = path

    def run(self):
        """Publish every line of ``self.path``, then the end-of-stream marker."""
        # No KeyboardInterrupt handler here: Python delivers it to the main
        # thread only, so catching it in a worker thread is dead code.
        try:
            with codecs.open(self.path, 'r', encoding='utf-8') as csv_file:
                for line in csv_file:
                    if not self._publish(line):
                        break
        finally:
            # Runs on failure too (missing file, decode error), so the
            # consumer is never left waiting for data that will not arrive.
            self._publish(_END_OF_STREAM)

    def _publish(self, item):
        """Store ``item`` in the next free cell; return False if the consumer has quit."""
        global in_index
        empty_cells.acquire()
        if _consumer_done.is_set():
            # Hand the permit back so any later call cannot block either.
            empty_cells.release()
            return False
        with mutex:
            buffer[in_index] = item
            in_index = (in_index + 1) % capacity
            if item is not _END_OF_STREAM:
                print("Producer produced : ", item)
        filled_cells.release()
        # time.sleep(1)
        return True


class Consumer(threading.Thread):
    """Thread that drains the shared ring buffer and forwards lines to SQS.

    Lines are concatenated in groups of ``combined_message_count`` and each
    group is sent with ``SqsProducerKombu.send_message_SQS``. Consumption ends
    when the producer's end-of-stream marker is read or ``max_messages`` lines
    have been taken, whichever comes first. A final partial group is sent
    rather than dropped.

    Args:
        combined_message_count: number of lines to merge into one message.
        max_messages: upper bound on lines consumed (default 16, the limit
            that used to be hard-coded); ``None`` means read until the
            producer signals end of stream.
    """

    def __init__(self, combined_message_count, max_messages=16):
        super().__init__()
        self.combined_message_count = combined_message_count
        self.max_messages = max_messages

    def run(self):
        """Consume lines until end of stream or the configured limit."""
        global out_index
        print("consumer started")
        pending = []
        msg_consumed = 0
        try:
            while self.max_messages is None or msg_consumed < self.max_messages:
                filled_cells.acquire()
                with mutex:
                    item = buffer[out_index]
                    out_index = (out_index + 1) % capacity
                empty_cells.release()
                if item is _END_OF_STREAM:
                    break
                pending.append(item)
                # The send happens outside the mutex so the producer can keep
                # filling the buffer during the network call, and a failing
                # send cannot leave the lock held.
                if len(pending) >= self.combined_message_count:
                    self._send(pending, msg_consumed)
                # time.sleep(2)
                msg_consumed += 1
            if pending:
                # Flush the lines that did not fill a complete group.
                self._send(pending, msg_consumed)
        finally:
            # Tell the producer nobody is reading any more, then free one
            # permit so it wakes up if it is blocked on a full buffer.
            _consumer_done.set()
            empty_cells.release()

    def _send(self, pending, msg_consumed):
        """Send the pending lines as one message, log it, and empty the batch."""
        combined_message = "".join(pending)
        # send_message to the messaging service
        SqsProducerKombu.send_message_SQS(combined_message)
        print(f'consumed message:{combined_message},{msg_consumed}')
        pending.clear()
# Creating Threads
# perf_counter() is monotonic, so the elapsed time printed below cannot be
# skewed by system clock adjustments the way a time.time() difference can.
start_time = time.perf_counter()
producer = Producer()
consumer = Consumer(1)
# Starting Threads
producer.start()
consumer.start()
# Waiting for threads to complete
producer.join()
consumer.join()
# Elapsed time for the whole run, in seconds.
print(time.perf_counter() - start_time)