From 5b712ab7abff0e732e8b86d9bbcb64bd245985af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Wed, 29 Jun 2016 00:27:54 +0300 Subject: [PATCH] rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- zengine/client_queue.py | 4 +-- zengine/messaging/lib.py | 41 ++++++++++++------------- zengine/messaging/model.py | 19 ++++++------ zengine/tornado_server/queue_manager.py | 3 +- zengine/views/auth.py | 18 +++-------- zengine/wf_daemon.py | 14 +++++---- 6 files changed, 46 insertions(+), 53 deletions(-) diff --git a/zengine/client_queue.py b/zengine/client_queue.py index 6f0f0ea6..e4907805 100644 --- a/zengine/client_queue.py +++ b/zengine/client_queue.py @@ -54,8 +54,8 @@ def get_sess_id(self): return self.sess_id def send_to_queue(self, message=None, json_message=None): - self.get_channel().basic_publish(exchange='', - routing_key=self.get_sess_id(), + self.get_channel().basic_publish(exchange=self.user_id or '', + routing_key=self.sess_id, body=json_message or json.dumps(message)) def old_to_new_queue(self, old_sess_id): diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index 597ff0e7..3349b7e9 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -30,14 +30,14 @@ def __init__(self, user_id): class BaseUser(object): - connection = None - channel = None + mq_connection = None + mq_channel = None def _connect_mq(self): - if not self.connection is None or self.connection.is_closed: - self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) - self.channel = self.connection.channel() - return self.channel + if not self.mq_connection is None or self.mq_connection.is_closed: + self.mq_connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) + self.mq_channel = self.mq_connection.channel() + return self.mq_channel def get_avatar_url(self): """ @@ -90,7 +90,7 @@ def check_password(self, raw_password): def get_role(self, role_id): """ - Kullanıcıya ait Role nesnesini getirir. + Retrieves user's roles. Args: role_id (int) @@ -105,7 +105,11 @@ def get_role(self, role_id): def full_name(self): return self.username - def send_message(self, title, message, sender=None, url=None, typ=1): + def bind_private_channel(self, sess_id): + mq_channel = self._connect_mq() + mq_channel.queue_bind(exchange='prv_%s' % self.key, queue=sess_id) + + def send_notification(self, title, message, typ=1, url=None): """ sends message to users private mq exchange Args: @@ -117,17 +121,10 @@ def send_message(self, title, message, sender=None, url=None, typ=1): """ - mq_channel = self._connect_mq() - mq_msg = dict(body=message, msg_title=title, url=url, typ=typ) - if sender: - mq_msg['sender_name'] = sender.full_name - mq_msg['sender_key'] = sender.key - - mq_channel.basic_publish(exchange=self.key, body=json.dumps(mq_msg)) - self._write_message_to_db(sender, message, title, url, typ) - - def _write_message_to_db(self, sender, body, title, url, typ): - from zengine.messaging.model import Channel, Message - channel = Channel.objects.get(owner=self, is_private=True) - Message(channel=channel, sender=sender, msg_title=title, - body=body, receiver=self, url=url, typ=typ).save() + self.channel_set.channel.__class__.add_message( + channel_key='prv_%s' % self.key, + body=message, + title=title, + typ=typ, + url=url + ) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index dd750293..f2e734af 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -44,8 +44,8 @@ class Channel(Model): is_direct: Represents a user-to-user direct message exchange """ - channel = None - connection = None + mq_channel = None + mq_connection = None name = field.String("Name") code_name = field.String("Internal name") @@ -62,7 +62,8 @@ class Meta: unique_together = (('is_private', 'owner'),) class Managers(ListNode): - user = UserModel(reverse_name='managed_channels') + user = UserModel() + @classmethod def get_or_create_direct_channel(cls, initiator, receiver): @@ -102,17 +103,17 @@ def get_last_messages(self): @classmethod def _connect_mq(cls): - if cls.connection is None or cls.connection.is_closed: - cls.connection, cls.channel = get_mq_connection() - return cls.channel + if cls.mq_connection is None or cls.mq_connection.is_closed: + cls.mq_connection, cls.mq_channel = get_mq_connection() + return cls.mq_channel def create_exchange(self): """ Creates MQ exchange for this channel Needs to be defined only once. """ - channel = self._connect_mq() - channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True) + mq_channel = self._connect_mq() + mq_channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True) def pre_creation(self): if not self.code_name: @@ -212,7 +213,7 @@ class Message(Model): Notes: Never use directly for creating new messages! Use these methods: - Channel objects's **add_message()** method. - - User object's **set_message()** method. (which uses channel.add_message) + - User object's **set_message()** method. (which also uses channel.add_message) """ channel = Channel() sender = UserModel(reverse_name='sent_messages') diff --git a/zengine/tornado_server/queue_manager.py b/zengine/tornado_server/queue_manager.py index d06b8878..7df79801 100644 --- a/zengine/tornado_server/queue_manager.py +++ b/zengine/tornado_server/queue_manager.py @@ -51,7 +51,7 @@ class BlockingConnectionForHTTP(object): - REPLY_TIMEOUT = 10 # sec + REPLY_TIMEOUT = 100 # sec def __init__(self): self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) @@ -216,6 +216,7 @@ def _on_output_queue_decleration(queue): # exclusive=True ) + self.connection.channel(_on_output_channel_creation) def redirect_incoming_message(self, sess_id, message, request): diff --git a/zengine/views/auth.py b/zengine/views/auth.py index cc3dfc0f..f84c9f8d 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -53,17 +53,6 @@ class Login(SimpleView): does the authentication at ``do`` stage. """ - def _do_binding(self): - """ - Bind user's ephemeral session queue to user's durable private exchange - """ - from zengine.messaging.model import get_mq_connection - connection, channel = get_mq_connection() - channel.queue_bind(exchange=self.current.user_id, - queue=self.current.session.sess_id, - # routing_key="#" - ) - def _user_is_online(self): self.current.user.is_online(True) @@ -72,6 +61,7 @@ def do_view(self): Authenticate user with given credentials. Connects user's queue and exchange """ + self.current.output['login_process'] = True self.current.task_data['login_successful'] = False if self.current.is_auth: self.current.output['cmd'] = 'upgrade' @@ -82,8 +72,8 @@ def do_view(self): self.current.input['password']) self.current.task_data['login_successful'] = auth_result if auth_result: - self._user_is_online() - self._do_binding() + self.current.user.is_online(True) + self.current.user.bind_private_channel(self.current.session.sess_id) user_sess = UserSessionID(self.current.user_id) old_sess_id = user_sess.get() user_sess.set(self.current.session.sess_id) @@ -104,7 +94,9 @@ def show_view(self): """ Show :attr:`LoginForm` form. """ + self.current.output['login_process'] = True if self.current.is_auth: self.current.output['cmd'] = 'upgrade' else: + self.current.output['forms'] = LoginForm(current=self.current).serialize() diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 4002bda3..d481f56a 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -62,10 +62,13 @@ def connect(self): self.client_queue = ClientQueue() self.input_channel = self.connection.channel() - self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic', durable=True) + self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, + type='topic', + durable=True) self.input_channel.queue_declare(queue=self.INPUT_QUEUE_NAME) self.input_channel.queue_bind(exchange=self.INPUT_EXCHANGE, queue=self.INPUT_QUEUE_NAME) - log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, self.INPUT_EXCHANGE)) + log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, + self.INPUT_EXCHANGE)) def run(self): """ @@ -172,11 +175,10 @@ def handle_message(self, ch, method, properties, body): def send_output(self, output, sessid): self.client_queue.sess_id = sessid + # TODO: This is ugly + if 'login_process' not in output: + self.client_queue.user_id = self.current.user_id self.client_queue.send_to_queue(output) - # self.output_channel.basic_publish(exchange='', - # routing_key=sessid, - # body=json.dumps(output)) - # except ConnectionClosed: def run_workers(no_subprocess):