From b3f8e53d2f005e2ec78e93985eb06e88df78d9c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Wed, 29 Jun 2016 02:32:01 +0300 Subject: [PATCH] rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- zengine/client_queue.py | 8 +++++--- zengine/messaging/lib.py | 3 ++- zengine/tornado_server/queue_manager.py | 4 ++-- zengine/wf_daemon.py | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/zengine/client_queue.py b/zengine/client_queue.py index 50046a1a..bf0d8536 100644 --- a/zengine/client_queue.py +++ b/zengine/client_queue.py @@ -54,9 +54,11 @@ def get_sess_id(self): return self.sess_id def send_to_queue(self, message=None, json_message=None): - log.debug("Sending following message to %s queue:\n%s " % ( - self.sess_id, json_message or json.dumps(message))) - self.get_channel().publish(exchange=self.user_id or '', + exchange = self.user_id or '' + log.debug("Sending following message to %s queue, \"%s\" exchange:\n%s " % ( + self.sess_id, exchange, json_message or json.dumps(message))) + + self.get_channel().publish(exchange=exchange, routing_key=self.sess_id, body=json_message or json.dumps(message)) diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index 4b85c975..8e3a97b6 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -106,7 +106,8 @@ def get_role(self, role_id): def full_name(self): return self.username - def bind_private_channel(self, sess_id): + @classmethod + def bind_private_channel(cls, sess_id): mq_channel = self._connect_mq() mq_channel.queue_bind(exchange='prv_%s' % self.key, queue=sess_id) diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/queue_manager.py index d490eeda..841a5736 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/queue_manager.py @@ -190,10 +190,10 @@ def register_websocket(self, sess_id, ws): def inform_disconnection(self, sess_id): self.in_channel.basic_publish(exchange='input_exc', routing_key=sess_id, - body=json_encode({ + body=json_encode(dict(data={ 'view': 'mark_offline_user', 'sess_id': sess_id - })) + }))) def unregister_websocket(self, sess_id): try: diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index d481f56a..4ddd5694 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -166,7 +166,7 @@ def handle_message(self, ch, method, properties, body): raise err = traceback.format_exc() output = {'error': self._prepare_error_msg(err), "code": 500} - log.exception("Worker error occurred") + log.exception("Worker error occurred with messsage body:\n%s" % body) if 'callbackID' in input: output['callbackID'] = input['callbackID'] log.info("OUTPUT for %s: %s" % (sessid, output))