From 5f34aa7b0e78628955b2acec5346ed9b4d2c035c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Thu, 21 Jul 2016 16:43:07 +0300 Subject: [PATCH] rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- zengine/management_commands.py | 5 ++- zengine/messaging/model.py | 48 ++++++++++++++++----------- zengine/messaging/views.py | 6 +++- zengine/tornado_server/ws_to_queue.py | 4 +-- zengine/wf_daemon.py | 2 +- 5 files changed, 41 insertions(+), 24 deletions(-) diff --git a/zengine/management_commands.py b/zengine/management_commands.py index 9229987f..fca3a793 100644 --- a/zengine/management_commands.py +++ b/zengine/management_commands.py @@ -210,7 +210,10 @@ def create_user_channels(self): sb, new = Subscriber.objects.get_or_create(channel=ch, user=usr, read_only=True, - name='Notifications') + name='Notifications', + can_manage=True, + can_leave=False + ) print("%s notify sub: %s" % ('created' if new else 'existing', ch.code_name)) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index 394f12aa..c3ef6844 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -60,7 +60,9 @@ class Channel(Model): owner = UserModel(reverse_name='created_channels', null=True) def __unicode__(self): - return "%s (%s's %s channel)" % (self.name or '', self.owner.__unicode__(), self.get_typ_display()) + return "%s (%s's %s channel)" % (self.name or '', + self.owner.full_name, + self.get_typ_display()) # # class Managers(ListNode): @@ -86,17 +88,17 @@ def get_or_create_direct_channel(cls, initiator_key, receiver_key): code_name='%s_%s' % (receiver_key, initiator_key)) receiver_name = UserModel.objects.get(receiver_key).full_name if existing: - return existing[0], receiver_name + channel = existing[0] else: channel_name = '%s_%s' % (initiator_key, receiver_key) channel = cls(is_direct=True, code_name=channel_name, typ=10).save() - Subscriber(channel=channel, - user_id=initiator_key, - name=receiver_name).save() - Subscriber(channel=channel, - user_id=receiver_key, - name=UserModel.objects.get(initiator_key).full_name).save() - return channel, receiver_name + Subscriber.objects.get_or_create(channel=channel, + user_id=initiator_key, + name=receiver_name) + Subscriber.objects.get_or_create(channel=channel, + user_id=receiver_key, + name=UserModel.objects.get(initiator_key).full_name) + return channel, receiver_name @classmethod def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2, @@ -104,6 +106,7 @@ def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2 mq_channel = cls._connect_mq() msg_object = Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel_id=channel_key, receiver=receiver, key=uuid4().hex) + msg_object.setattr('unsaved', True) mq_channel.basic_publish(exchange=channel_key, routing_key='', body=json.dumps(msg_object.serialize())) @@ -111,7 +114,7 @@ def add_message(cls, channel_key, body, title=None, sender=None, url=None, typ=2 def get_last_messages(self): # TODO: Try to refactor this with https://github.com/rabbitmq/rabbitmq-recent-history-exchange - return self.message_set.objects.filter()[:20] + return self.message_set.objects.filter().set_params(sort="timestamp asc")[:20] @classmethod def _connect_mq(cls): @@ -129,6 +132,12 @@ def create_exchange(self): exchange_type='fanout', durable=True) + def subscribe_owner(self): + sbs, new = Subscriber.objects.get_or_create(user=self.owner, + channel=self, + can_manage=True, + can_leave=False) + def pre_creation(self): if not self.code_name: if self.name: @@ -145,6 +154,7 @@ def pre_creation(self): def post_save(self): self.create_exchange() + # self.subscribe_owner() class Subscriber(Model): @@ -171,7 +181,7 @@ class Subscriber(Model): # status = field.Integer("Status", choices=SUBSCRIPTION_STATUS) def __unicode__(self): - return "%s >> %s" % (self.user.full_name, self.channel.__unicode__()) + return "%s subscription of %s" % (self.name, self.user) @classmethod def _connect_mq(cls): @@ -195,13 +205,14 @@ def get_actions(self): def is_online(self): # TODO: Cache this method if self.channel.typ == 10: - return self.channel.subscriber_set.objects.exclude(user=self.user).get().user.is_online() - + return self.channel.subscriber_set.objects.exclude( + user=self.user).get().user.is_online() def unread_count(self): # FIXME: track and return actual unread message count if self.last_seen_msg_time: - return self.channel.message_set.objects.filter(timestamp__lt=self.last_seen_msg_time).count() + return self.channel.message_set.objects.filter( + timestamp__lt=self.last_seen_msg_time).count() else: self.channel.message_set.objects.filter().count() @@ -215,7 +226,7 @@ def create_exchange(self): to be safe we always call it before binding to the channel we currently subscribe """ channel = self._connect_mq() - channel.exchange_declare(exchange='prv_%s' % self.user.key.lower(), + channel.exchange_declare(exchange=self.user.prv_exchange, exchange_type='fanout', durable=True) @@ -236,11 +247,11 @@ def post_creation(self): self.create_exchange() self.bind_to_channel() + def pre_creation(self): if not self.name: self.name = self.channel.name - MSG_TYPES = ( (1, "Info Notification"), (11, "Error Notification"), @@ -287,7 +298,6 @@ def get_actions_for(self, user): ('Edit', '_zops_edit_message') ]) - def serialize(self, user=None): """ Serializes message for given user. @@ -306,7 +316,7 @@ def serialize(self, user=None): 'type': self.typ, 'updated_at': self.updated_at, 'timestamp': self.timestamp.strftime(DATE_TIME_FORMAT), - 'is_update': self.exist, + 'is_update': hasattr(self, 'unsaved'), 'attachments': [attachment.serialize() for attachment in self.attachment_set], 'title': self.msg_title, 'sender_name': self.sender.full_name, @@ -330,7 +340,7 @@ def _republish(self): body=json.dumps(self.serialize())) def pre_save(self): - if self.exist: + if not hasattr(self, 'unsaved'): self._republish() diff --git a/zengine/messaging/views.py b/zengine/messaging/views.py index 26f14d44..fca75f07 100644 --- a/zengine/messaging/views.py +++ b/zengine/messaging/views.py @@ -295,6 +295,10 @@ def create_channel(current): description=current.input['description'], owner=current.user, typ=15).save() + sbs, new = Subscriber.objects.get_or_create(user=channel.owner, + channel=channel, + can_manage=True, + can_leave=False) current.output = { 'channel_key': channel.key, 'status': 'OK', @@ -600,7 +604,7 @@ def pin_channel(current): } """ try: - Subscriber(current).objects.get(user_id=current.user_id, + Subscriber(current).objects.filter(user_id=current.user_id, channel_id=current.input['channel_key']).update(pinned=True) current.output = {'status': 'OK', 'code': 200} except ObjectDoesNotExist: diff --git a/zengine/tornado_server/ws_to_queue.py b/zengine/tornado_server/ws_to_queue.py index 037333e3..ec7d9aa3 100644 --- a/zengine/tornado_server/ws_to_queue.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -235,7 +235,7 @@ def unregister_websocket(self, sess_id): def create_out_channel(self, sess_id, user_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): - channel.basic_consume(self.on_message, queue=sess_id) + channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=user_id) log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id, channel.consumer_tags[0], user_id)) @@ -263,7 +263,7 @@ def publish_incoming_message(self, message, sess_id): body=json_encode(message)) def on_message(self, channel, method, header, body): - user_id = method.exchange[4:] + user_id = method.consumer_tag log.debug("WS RPLY for %s: %s" % (user_id, body)) if user_id in self.websockets: log.info("write msg to client") diff --git a/zengine/wf_daemon.py b/zengine/wf_daemon.py index 4649ac15..5ad8efd7 100755 --- a/zengine/wf_daemon.py +++ b/zengine/wf_daemon.py @@ -175,7 +175,7 @@ def handle_message(self, ch, method, properties, body): def send_output(self, output): # TODO: This is ugly, we should separate login process - log.debug("SEND_OUTPUT: %s" % output) + # log.debug("SEND_OUTPUT: %s" % output) if self.current.user_id is None or 'login_process' in output: self.client_queue.send_to_default_exchange(self.sessid, output) else: