From cebba90118af5b18e1b5c1db09df969b8d61efeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Thu, 30 Jun 2016 00:35:25 +0300 Subject: [PATCH] rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- zengine/tornado_server/queue_manager.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/queue_manager.py index 39b4bd05..c9465c16 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/queue_manager.py @@ -72,7 +72,7 @@ def _send_message(self, sess_id, input_data): body=json_encode(input_data)) def _store_user_id(self, sess_id, body): - sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'] + sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'].lower() def _wait_for_reply(self, sess_id, input_data): channel = self.create_channel() @@ -194,8 +194,8 @@ def register_websocket(self, sess_id, ws): ws: """ user_id = sys.sessid_to_userid[sess_id] - self.websockets[sess_id] = ws - channel = self.create_out_channel(sess_id) + self.websockets[user_id] = ws + self.create_out_channel(sess_id, user_id) def inform_disconnection(self, sess_id): self.in_channel.basic_publish(exchange='input_exc', @@ -206,22 +206,25 @@ def inform_disconnection(self, sess_id): _zops_remote_ip=''))) def unregister_websocket(self, sess_id): + user_id = sys.sessid_to_userid.get(sess_id, None) try: self.inform_disconnection(sess_id) - del self.websockets[sess_id] + del self.websockets[user_id] except KeyError: - log.exception("Non-existent websocket") + log.exception("Non-existent websocket for %s" % user_id) if sess_id in self.out_channels: try: self.out_channels[sess_id].close() except ChannelClosed: log.exception("Pika client (out) channel already closed") - def create_out_channel(self, sess_id): + def create_out_channel(self, sess_id, user_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): channel.basic_consume(self.on_message, queue=sess_id) - log.debug("BIND QUEUE TO WS Q.%s on Ch.%s" % (sess_id, channel.consumer_tags[0])) + log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id, + channel.consumer_tags[0], + user_id)) self.out_channels[sess_id] = channel channel.queue_declare(callback=_on_output_queue_decleration, @@ -243,10 +246,10 @@ def redirect_incoming_message(self, sess_id, message, request): body=json_encode(message)) def on_message(self, channel, method, header, body): - sess_id = method.routing_key - log.debug("WS RPLY for %s: %s" % (sess_id, body)) - if sess_id in self.websockets: - self.websockets[sess_id].write_message(body) + user_id = method.exchange[4:] + log.debug("WS RPLY for %s: %s" % (user_id, body)) + if user_id in self.websockets: + self.websockets[user_id].write_message(body) channel.basic_ack(delivery_tag=method.delivery_tag) # else: