Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366
ref #66
ref #65
  • Loading branch information
evrenesat committed Jun 29, 2016
1 parent 864d172 commit cebba90
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions zengine/tornado_server/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _send_message(self, sess_id, input_data):
body=json_encode(input_data))

def _store_user_id(self, sess_id, body):
sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id']
sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'].lower()

def _wait_for_reply(self, sess_id, input_data):
channel = self.create_channel()
Expand Down Expand Up @@ -194,8 +194,8 @@ def register_websocket(self, sess_id, ws):
ws:
"""
user_id = sys.sessid_to_userid[sess_id]
self.websockets[sess_id] = ws
channel = self.create_out_channel(sess_id)
self.websockets[user_id] = ws
self.create_out_channel(sess_id, user_id)

def inform_disconnection(self, sess_id):
self.in_channel.basic_publish(exchange='input_exc',
Expand All @@ -206,22 +206,25 @@ def inform_disconnection(self, sess_id):
_zops_remote_ip='')))

def unregister_websocket(self, sess_id):
user_id = sys.sessid_to_userid.get(sess_id, None)
try:
self.inform_disconnection(sess_id)
del self.websockets[sess_id]
del self.websockets[user_id]
except KeyError:
log.exception("Non-existent websocket")
log.exception("Non-existent websocket for %s" % user_id)
if sess_id in self.out_channels:
try:
self.out_channels[sess_id].close()
except ChannelClosed:
log.exception("Pika client (out) channel already closed")

def create_out_channel(self, sess_id):
def create_out_channel(self, sess_id, user_id):
def _on_output_channel_creation(channel):
def _on_output_queue_decleration(queue):
channel.basic_consume(self.on_message, queue=sess_id)
log.debug("BIND QUEUE TO WS Q.%s on Ch.%s" % (sess_id, channel.consumer_tags[0]))
log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id,
channel.consumer_tags[0],
user_id))
self.out_channels[sess_id] = channel

channel.queue_declare(callback=_on_output_queue_decleration,
Expand All @@ -243,10 +246,10 @@ def redirect_incoming_message(self, sess_id, message, request):
body=json_encode(message))

def on_message(self, channel, method, header, body):
sess_id = method.routing_key
log.debug("WS RPLY for %s: %s" % (sess_id, body))
if sess_id in self.websockets:
self.websockets[sess_id].write_message(body)
user_id = method.exchange[4:]
log.debug("WS RPLY for %s: %s" % (user_id, body))
if user_id in self.websockets:
self.websockets[user_id].write_message(body)

channel.basic_ack(delivery_tag=method.delivery_tag)
# else:
Expand Down

0 comments on commit cebba90

Please sign in to comment.