forked from irods/irods_rule_engine_plugin_audit_amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_broker_consumer.py
59 lines (45 loc) · 1.84 KB
/
message_broker_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from __future__ import print_function
import time
import json
from proton.handlers import MessagingHandler
class MessageBrokerConsumer(MessagingHandler):
def __init__(self, url, queue_name, pid_queue):
super(MessageBrokerConsumer, self).__init__()
self.url = url
self.queue_name = queue_name
self.pid_dictionary = {}
self.pid_queue = pid_queue
self.last_msg_received_at = time.time()
def on_start(self, event):
self.listener_connection = event.container.connect(self.url)
self.msg_receiver = event.container.create_receiver(self.listener_connection, self.queue_name)
def on_transport_error(self, event):
print('received an error "%s"' % event)
def on_message(self, event):
self.last_msg_received_at = time.time()
message = event.message.body
pid = 0
if "__BEGIN_JSON__" in message:
message = message.split("__BEGIN_JSON__")[1]
if "__END_JSON__" in message:
message = message.split("__END_JSON__")[0]
json_data = json.loads(message)
if 'pid' in json_data:
pid = json_data['pid']
if 'action' in json_data:
action = json_data['action']
if action == 'START':
self.pid_dictionary[pid] = []
print(pid)
#print(message)
self.pid_dictionary[pid].append(message)
elif action == 'END' and pid in self.pid_dictionary:
#print(message)
#print(pid)
self.pid_dictionary[pid].append(message)
self.pid_queue.put(self.pid_dictionary[pid])
del self.pid_dictionary[pid]
else:
if pid in self.pid_dictionary:
print(message)
self.pid_dictionary[pid].append(message)