diff --git a/zengine/client_queue.py b/zengine/client_queue.py index fbbd4d8a..c8a8197b 100644 --- a/zengine/client_queue.py +++ b/zengine/client_queue.py @@ -13,7 +13,7 @@ import time from pika.exceptions import ConnectionClosed, ChannelClosed -from zengine.lib.cache import UserSessionID + BLOCKING_MQ_PARAMS = pika.ConnectionParameters( host=settings.MQ_HOST, @@ -47,11 +47,6 @@ def get_channel(self): self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) self.channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS) return self.channel - # - # def get_sess_id(self): - # if not self.sess_id: - # self.sess_id = UserSessionID(self.user_id).get() - # return self.sess_id def send_to_default_exchange(self, sess_id, message=None): msg = json.dumps(message) @@ -65,27 +60,3 @@ def send_to_prv_exchange(self, user_id, message=None): log.debug("Sending following users \"%s\" exchange:\n%s " % (exchange, msg)) self.get_channel().publish(exchange=exchange, routing_key='', body=msg) - # def old_to_new_queue(self, old_sess_id): - # """ - # Somehow if users old (obsolete) queue has - # undelivered messages, we should redirect them to - # current queue. - # """ - # old_input_channel = self.connection.channel() - # while True: - # try: - # method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id) - # if method_frame: - # self.send_to_queue(json_message=body) - # old_input_channel.basic_ack(method_frame.delivery_tag) - # else: - # old_input_channel.queue_delete(old_sess_id) - # old_input_channel.close() - # break - # except ChannelClosed as e: - # if e[0] == 404: - # break - # # e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'") - # else: - # raise - # # old_input_channel = self.connection.channel() diff --git a/zengine/management_commands.py b/zengine/management_commands.py index 78acd24b..746147e3 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -8,6 +8,7 @@ # (GPLv3). See LICENSE.txt for details. import six +from pyoko.db.adapter.db_riak import BlockSave from pyoko.exceptions import ObjectDoesNotExist from pyoko.lib.utils import get_object_from_path from pyoko.manage import * @@ -200,9 +201,10 @@ def run(self): def create_user_channels(self): from zengine.messaging.model import Channel user_model = get_object_from_path(settings.USER_MODEL) - for usr in user_model.objects.filter(): - ch, new = Channel.objects.get_or_create(owner=usr, is_private=True) - print("%s exchange: %s" % ('created' if new else 'existing', ch.code_name)) + with BlockSave(Channel): + for usr in user_model.objects.filter(): + ch, new = Channel.objects.get_or_create(owner=usr, is_private=True) + print("%s exchange: %s" % ('created' if new else 'existing', ch.code_name)) def create_channel_exchanges(self): from zengine.messaging.model import Channel diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index 3f397b60..a74009ff 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -108,7 +108,7 @@ def full_name(self): @property def prv_exchange(self): - return 'prv_%s' % self.key.lower() + return 'prv_%s' % str(self.key).lower() def bind_private_channel(self, sess_id): mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel() diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index b69c4cd9..cd3663b1 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -93,7 +93,7 @@ def get_or_create_direct_channel(cls, initiator, receiver): def add_message(self, channel_key, body, title=None, sender=None, url=None, typ=2, receiver=None): mq_channel = self._connect_mq() mq_msg = json.dumps(dict(sender=sender, body=body, msg_title=title, url=url, typ=typ)) - mq_channel.basic_publish(exchange=channel_key, body=mq_msg) + mq_channel.basic_publish(exchange=channel_key, routing_key='', body=mq_msg) return Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel_id=channel_key, receiver=receiver).save() diff --git a/zengine/settings.py b/zengine/settings.py index c3d0cf0f..ba94d4f3 100644 --- a/zengine/settings.py +++ b/zengine/settings.py @@ -114,6 +114,7 @@ #: ('URI template', 'python path to view method/class'), VIEW_URLS = { 'dashboard': 'zengine.views.menu.Menu', + 'sessid_to_userid': 'zengine.views.system.sessid_to_userid', 'ping': 'zengine.views.dev_utils.Ping', '_zops_create_message': 'zengine.messaging.views.create_message', 'mark_offline_user': 'zengine.messaging.views.mark_offline_user', diff --git a/zengine/tornado_server/get_logger.py b/zengine/tornado_server/get_logger.py index 06e8cca8..a0a84036 100644 --- a/zengine/tornado_server/get_logger.py +++ b/zengine/tornado_server/get_logger.py @@ -24,8 +24,8 @@ def get_logger(settings): # create formatter if settings.DEBUG: - # make log messages concise and readble for developemnt - format_str = '%(created)d - %(filename)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s' + # make log messages more readable at development + format_str = '%(asctime)s - %(filename)s:%(lineno)d %(module)s.%(funcName)s \n> %(message)s\n\n' else: format_str = '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s' diff --git a/zengine/tornado_server/server.py b/zengine/tornado_server/server.py index 671d535f..7c13e8b7 100644 --- a/zengine/tornado_server/server.py +++ b/zengine/tornado_server/server.py @@ -15,7 +15,7 @@ from tornado.httpclient import HTTPError sys.path.insert(0, os.path.realpath(os.path.dirname(__file__))) -from queue_manager import QueueManager, BlockingConnectionForHTTP, log +from ws_to_queue import QueueManager, BlockingConnectionForHTTP, log COOKIE_NAME = 'zopsess' DEBUG = os.getenv("DEBUG", False) diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/ws_to_queue.py similarity index 91% rename from zengine/tornado_server/queue_manager.py rename to zengine/tornado_server/ws_to_queue.py index 76ab68d1..4c004af3 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -186,6 +186,13 @@ def on_input_queue_declare(self, queue): exchange='input_exc', queue=self.INPUT_QUEUE_NAME, routing_key="#") + def ask_for_user_id(self, sess_id): + log.debug(sess_id) + # TODO: add remote ip + self.publish_incoming_message({'view': 'sessid_to_userid', + '_zops_remote_ip': '', + }, sess_id) + def register_websocket(self, sess_id, ws): """ @@ -195,9 +202,14 @@ def register_websocket(self, sess_id, ws): ws: """ log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid) - user_id = sys.sessid_to_userid[sess_id] + try: + user_id = sys.sessid_to_userid[sess_id] + except KeyError: + self.ask_for_user_id(sess_id) + self.websockets[sess_id] = ws self.websockets[user_id] = ws self.create_out_channel(sess_id, user_id) + return True def inform_disconnection(self, sess_id): self.in_channel.basic_publish(exchange='input_exc', @@ -243,6 +255,9 @@ def redirect_incoming_message(self, sess_id, message, request): message = json_decode(message) message['_zops_sess_id'] = sess_id message['_zops_remote_ip'] = request.remote_ip + self.publish_incoming_message(message, sess_id) + + def publish_incoming_message(self, message, sess_id): self.in_channel.basic_publish(exchange='input_exc', routing_key=sess_id, body=json_encode(message)) @@ -252,7 +267,11 @@ def on_message(self, channel, method, header, body): log.debug("WS RPLY for %s: %s" % (user_id, body)) if user_id in self.websockets: self.websockets[user_id].write_message(body) - + channel.basic_ack(delivery_tag=method.delivery_tag) + elif 'sessid_to_userid' in body: + reply = json_decode(body) + sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] + self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] channel.basic_ack(delivery_tag=method.delivery_tag) # else: # channel.basic_reject(delivery_tag=method.delivery_tag) diff --git a/zengine/views/system.py b/zengine/views/system.py new file mode 100644 index 00000000..8989045c --- /dev/null +++ b/zengine/views/system.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. + +from zengine.lib.cache import UserSessionID + +def sessid_to_userid(current): + current.output['user_id'] = current.user_id.lower() + current.output['sess_id'] = current.session.sess_id + current.output['sessid_to_userid'] = True