From 849249af224372270854902ed939bab8a1d7fe34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Tue, 21 Jun 2016 10:56:05 +0300 Subject: [PATCH] rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- zengine/messaging/model.py | 109 +++++++++++++++++++++++-------------- zengine/views/auth.py | 13 +++++ 2 files changed, 82 insertions(+), 40 deletions(-) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index 3d7b5f5d..e3182c34 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -6,6 +6,8 @@ # # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. +import json + import pika from pyoko import Model, field, ListNode @@ -15,46 +17,23 @@ UserModel = get_object_from_path(settings.USER_MODEL) -MSG_TYPES = ( - (1, "Info"), - (11, "Error"), - (111, "Success"), - (2, "Direct Message"), - (3, "Broadcast Message") - (4, "Channel Message") -) + +def get_mq_connection(): + connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) + channel = connection.channel() + return connection, channel + CHANNEL_TYPES = ( + # (1, "Notification"), (10, "System Broadcast"), - (10, "User Broadcast"), - (15, "Direct"), + (15, "User Broadcast"), (20, "Chat"), + (25, "Direct"), ) -MESSAGE_STATUS = ( - (1, "Created"), - (11, "Transmitted"), - (22, "Seen"), - (33, "Read"), - (44, "Archived"), - -) -ATTACHMENT_TYPES = ( - (1, "Document"), - (11, "Spreadsheet"), - (22, "Image"), - (33, "PDF"), - -) - - -def get_mq_connection(): - connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) - channel = connection.channel() - return connection, channel - -class Channel(Model): +class Channel(Model): name = field.String("Name") code_name = field.String("Internal name") description = field.String("Description") @@ -64,6 +43,11 @@ class Channel(Model): class Managers(ListNode): user = UserModel(reverse_name='managed_channels') + def add_message(self, body, title, sender=None, url=None, typ=2): + channel = self._connect_mq() + mq_msg = json.dumps(dict(sender=sender, body=body, msg_title=title, url=url, typ=typ)) + channel.basic_publish(exchange=self.code_name, body=mq_msg) + Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel=self).save() def _connect_mq(self): self.connection, self.channel = get_mq_connection() @@ -71,11 +55,15 @@ def _connect_mq(self): def create_exchange(self): """ - This method creates MQ exch - which actually needed to be defined only once. + Creates MQ exchange for this channel + Needs to be defined only once. """ channel = self._connect_mq() - channel.exchange_declare(exchange=self.code_name) + channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True) + + def post_creation(self): + self.create_exchange() + class Subscription(Model): """ @@ -87,6 +75,7 @@ class Subscription(Model): is_muted = field.Boolean("Mute the channel") inform_me = field.Boolean("Inform when I'm mentioned") can_leave = field.Boolean("Membership is not obligatory", default=True) + # status = field.Integer("Status", choices=SUBSCRIPTION_STATUS) def _connect_mq(self): @@ -95,18 +84,48 @@ def _connect_mq(self): def create_exchange(self): """ - This method creates user's private exchange - which actually needed to be defined only once. + Creates user's private exchange + Actually needed to be defined only once. + but since we don't know if it's exists or not + we always call it before """ channel = self._connect_mq() - channel.exchange_declare(exchange=self.user.key) - + channel.exchange_declare(exchange=self.user.key, exchange_type='direct', durable=True) + def bind_to_channel(self): + """ + Binds (subscribes) users private exchange to channel exchange + Automatically called at creation of subscription record. + """ + channel = self._connect_mq() + channel.exchange_bind(source=self.channel.code_name, destination=self.user.key) + def post_creation(self): + self.create_exchange() + self.bind_to_channel() def __unicode__(self): return "%s in %s" % (self.user, self.channel.name) + +MSG_TYPES = ( + (1, "Info"), + (11, "Error"), + (111, "Success"), + (2, "Direct Message"), + (3, "Broadcast Message") + (4, "Channel Message") +) +MESSAGE_STATUS = ( + (1, "Created"), + (11, "Transmitted"), + (22, "Seen"), + (33, "Read"), + (44, "Archived"), + +) + + class Message(Model): """ Permission model @@ -119,6 +138,7 @@ class Message(Model): url = field.String("URL") channel = Channel() sender = UserModel(reverse_name='sent_messages') + # FIXME: receiver should be removed after all of it's usages refactored to channels receiver = UserModel(reverse_name='received_messages') def __unicode__(self): @@ -126,6 +146,15 @@ def __unicode__(self): return "%s%s" % (content[:30], '...' if len(content) > 30 else '') +ATTACHMENT_TYPES = ( + (1, "Document"), + (11, "Spreadsheet"), + (22, "Image"), + (33, "PDF"), + +) + + class Attachment(Model): """ A model to store message attachments diff --git a/zengine/views/auth.py b/zengine/views/auth.py index 66a340c0..8afa7ce1 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -53,9 +53,21 @@ class Login(SimpleView): does the authentication at ``do`` stage. """ + def _do_binding(self): + """ + Bind user's ephemeral session queue to user's durable private exchange + """ + from zengine.messaging.model import get_mq_connection + connection, channel = get_mq_connection() + channel.queue_bind(exchange=self.current.user_id, + queue=self.current.session.sess_id, + # routing_key="#" + ) + def do_view(self): """ Authenticate user with given credentials. + Connects user's queue and exchange """ self.current.task_data['login_successful'] = False if self.current.is_auth: @@ -67,6 +79,7 @@ def do_view(self): self.current.input['password']) self.current.task_data['login_successful'] = auth_result if auth_result: + self._do_binding() user_sess = UserSessionID(self.current.user_id) old_sess_id = user_sess.get() user_sess.set(self.current.session.sess_id)