-
Notifications
You must be signed in to change notification settings - Fork 2
/
amqp.q
146 lines (118 loc) · 5.46 KB
/
amqp.q
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*******************************************************************************************
/ Author: Thomas Kunnumpurath
/ This module allows users to connect to a message broker via AMQP messaging protocol.
/ It provides a q wrapper, via embedPy, on top of Python code that uses a Python library called proton-amqp.
/ Dependencies include python, proton-amqp and embedPy
/ Examples:
/ To submit a message to a topic:
/ q)amqp_publish["<host>.messaging.solace.cloud:<port>";"topic://test";"TEST";"solace-cloud-client";"<password>"]
/ To receive messages:
/ q)amqp_subscribe["<host>.messaging.solace.cloud:<port>";"topic://test";"solace-cloud-client";"<password>"]
/*******************************************************************************************
p)from __future__ import print_function, unicode_literals
p)from proton.handlers import MessagingHandler
p)from proton.reactor import Container
p)from proton import Message
/ define the receiver handler class: it connects to the broker when the container starts and prints each message received
p)class Recv(MessagingHandler):
    # Proton messaging handler that connects to an AMQP broker, attaches a
    # receiver link to one address and prints the body of each message received.
    def __init__(self, url, address, count, username, password):
        # url: amqp broker host url
        # address: amqp node address to consume from
        # count: number of messages to print before closing; 0 means no limit
        # username, password: credentials; a falsy username selects anonymous login
        super(Recv, self).__init__()
        self.url = url
        self.address = address
        self.username = username
        self.password = password
        # messaging counters
        self.expected = count
        self.received = 0
    def on_start(self, event):
        # Open the connection with the authentication mode implied by username.
        if self.username:
            # basic username and password authentication;
            # allow_insecure_mechs=True permits mechanisms such as PLAIN on an
            # unencrypted socket (see the Proton Container.connect documentation)
            conn = event.container.connect(url=self.url,
                                           user=self.username,
                                           password=self.password,
                                           allow_insecure_mechs=True)
        else:
            # anonymous authentication
            conn = event.container.connect(url=self.url)
        if conn:
            # create receiver link to consume messages
            event.container.create_receiver(conn, source=self.address)
            # NOTE(review): connect() appears to only initiate the connection, so this
            # message may be printed before the broker has accepted it - confirm
            print("Connected to AMQP broker!")
    def on_message(self, event):
        # Print the message body and close the link and connection once the
        # expected number of messages has been received.
        msg_id = event.message.id
        try:
            duplicate = bool(msg_id) and msg_id < self.received
        except TypeError:
            # string or UUID ids cannot be ordered against the integer counter
            # on Python 3; such ids are not sequence numbers, so never treat
            # them as duplicates instead of letting the handler crash
            duplicate = False
        if duplicate:
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(event.message.body)
            self.received += 1
            # with expected == 0 this is never true, so the receiver runs until interrupted
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()
    def on_transport_error(self, event):
        # the on_transport_error event catches socket and authentication failures;
        # report the condition, then defer to the default handling
        print("Transport error:", event.transport.condition)
        MessagingHandler.on_transport_error(self, event)
    def on_disconnected(self, event):
        print("Disconnected")
p)class Send(MessagingHandler):
    # Proton messaging handler that connects to an AMQP broker, attaches a
    # sender link to one address, sends a single message and closes the
    # connection once the broker accepts or rejects it.
    def __init__(self, url, address, messageText, username, password, QoS=1):
        # url: amqp broker host url
        # address: target amqp node address
        # messageText: body of the message to send
        # username, password: credentials; a falsy username selects anonymous login
        # QoS: 2 marks the message durable (persistent); any other value sends it non-durable
        super(Send, self).__init__()
        self.url = url
        self.address = address
        self.username = username
        self.password = password
        self.messageText = messageText
        # the message durability flag must be set to True for persistent messages
        self.message_durability = (QoS == 2)
        # set once the message has been handed to the sender link
        self._sent = False
    def on_start(self, event):
        # Open the connection with the authentication mode implied by username.
        if self.username:
            # creates and establishes an amqp connection with the user credentials;
            # allow_insecure_mechs=True permits mechanisms such as PLAIN on an
            # unencrypted socket (see the Proton Container.connect documentation)
            conn = event.container.connect(url=self.url,
                                           user=self.username,
                                           password=self.password,
                                           allow_insecure_mechs=True)
        else:
            # creates and establishes an amqp connection with anonymous credentials
            conn = event.container.connect(url=self.url)
        if conn:
            # attaches sender link to transmit messages
            event.container.create_sender(conn, target=self.address)
    def on_sendable(self, event):
        # on_sendable can fire again whenever the broker grants more credit;
        # without this guard the same message would be published more than once
        if self._sent:
            return
        # creates message to send
        msg = Message(body=self.messageText, durable=self.message_durability)
        # sends message
        event.sender.send(msg)
        self._sent = True
    def on_accepted(self, event):
        # the broker accepted the delivery, so the work is done
        print("Message Sent!")
        event.connection.close()
    def on_rejected(self, event):
        print("Broker", self.url, "Reject message:", event.delivery.tag)
        event.connection.close()
    def on_transport_error(self, event):
        # catches event for socket and authentication failures;
        # report the condition, then defer to the default handling
        print("Transport error:", event.transport.condition)
        MessagingHandler.on_transport_error(self, event)
    def on_disconnected(self, event):
        # only react when the transport reports an error condition
        if event.transport and event.transport.condition:
            print('disconnected with error : ', event.transport.condition)
            event.connection.close()
p)def publish(url, address, messageText, username, password, QoS=1):
    # Send messageText to address on the broker at url; blocks until the
    # proton container finishes running the Send handler.
    try:
        handler = Send(url, address, messageText, username, password, QoS)
        reactor = Container(handler)
        # start proton event reactor
        reactor.run()
    except KeyboardInterrupt:
        # an interrupt ends the run and is deliberately not propagated
        pass
p)def subscribe(url, address, username, password):
    # Print every message arriving at address on the broker at url; the
    # count of 0 passed to Recv means there is no message limit.
    try:
        handler = Recv(url, address, 0, username, password)
        reactor = Container(handler)
        reactor.run()
    except KeyboardInterrupt:
        # an interrupt ends the run and is deliberately not propagated
        pass
/ link python functions to q functions
/ amqp_subscribe: q handle on the python function subscribe, called as amqp_subscribe[url;address;username;password]
/ NOTE(review): the < argument presumably requests conversion of the python result to q data - confirm against the embedPy docs
amqp_subscribe:.p.get[`subscribe;<]
/ amqp_publish: q handle on the python function publish, called as amqp_publish[url;address;messageText;username;password]
amqp_publish:.p.get[`publish;<]