From 38598b66e13b3b25aaa6a397418215e22a185c9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Thu, 28 Jul 2016 22:51:36 +0300 Subject: [PATCH] everything updated (TM) rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- zengine/messaging/lib.py | 23 ++++++++++++ zengine/messaging/model.py | 38 ++++++++++++++++--- zengine/messaging/views.py | 31 +++++++--------- zengine/tornado_server/ws_to_queue.py | 53 +++++++++++++-------------- zengine/views/auth.py | 17 ++++++--- 5 files changed, 106 insertions(+), 56 deletions(-) diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index fc0d3469..f475e866 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -160,3 +160,26 @@ def send_notification(self, title, message, typ=1, url=None): typ=typ, url=url ) + + def send_client_cmd(self, data, cmd=None, via_queue=None): + """ + Send arbitrary cmd and data to client + + if queue name passed by "via_queue" parameter, + that queue will be used instead of users private exchange. + Args: + data: dict + cmd: string + via_queue: queue name, + """ + mq_channel = self._connect_mq() + if cmd: + data['cmd'] = cmd + if via_queue: + mq_channel.basic_publish(exchange='', + routing_key=via_queue, + body=json.dumps(data)) + else: + mq_channel.basic_publish(exchange=self.prv_exchange, + routing_key='', + body=json.dumps(data)) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index 765d9e9b..bc1c22e9 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -22,9 +22,6 @@ UserModel = get_object_from_path(settings.USER_MODEL) - - - CHANNEL_TYPES = ( # users private message hub (5, "Private"), @@ -142,6 +139,13 @@ def create_exchange(self): exchange_type='fanout', durable=True) + def delete_exchange(self): + """ + Deletes MQ exchange for this channel + Needs to be defined only once. + """ + mq_channel = self._connect_mq() + mq_channel.exchange_delete(exchange=self.code_name) def pre_creation(self): if not self.code_name: @@ -157,6 +161,9 @@ def pre_creation(self): else: self.key = self.code_name + def post_delete(self): + self.delete_exchange() + def post_save(self): self.create_exchange() # self.subscribe_owner() @@ -166,6 +173,7 @@ class Subscriber(Model): """ Permission model """ + class Meta: verbose_name = "Abonelik" verbose_name_plural = "Abonelikler" @@ -195,13 +203,25 @@ class Meta: def __unicode__(self): return "%s subscription of %s" % (self.name, self.user) - @classmethod def _connect_mq(cls): if cls.mq_connection is None or cls.mq_connection.is_closed: cls.mq_connection, cls.mq_channel = get_mq_connection() return cls.mq_channel + def get_channel_listing(self): + """ + serialized form for channel listing + + """ + return {'name': self.name, + 'key': self.channel.key, + 'type': self.channel.typ, + 'read_only': self.read_only, + 'is_online': self.is_online(), + 'actions': self.get_actions(), + 'unread': self.unread_count()} + def get_actions(self): actions = [ ('Yukarı Sabitle', '_zops_pin_channel'), @@ -263,11 +283,19 @@ def bind_to_channel(self): channel = self._connect_mq() channel.exchange_bind(source=self.channel.code_name, destination=self.user.prv_exchange) + def inform_subscriber(self): + if self.channel.typ != 5: + self.user.send_client_cmd(self.get_channel_listing(), 'channel_subscription') + def post_creation(self): self.create_exchange() self.bind_to_channel() + self.inform_subscriber() def pre_creation(self): + if (self.channel.key == self.user.prv_exchange and + Subscriber.objects.filter(channel_id=self.user.prv_exchange).count()): + raise Exception("Duplicate private channel subscription for %s" % self.user) if not self.name: self.name = self.channel.name @@ -300,7 +328,6 @@ class Message(Model): - User object's **set_message()** method. (which also uses channel.add_message) """ - class Meta: verbose_name = "Mesaj" verbose_name_plural = "Mesajlar" @@ -323,7 +350,6 @@ def get_actions_for(self, user): else: actions.append(('Favorilere ekle', '_zops_favorite_message')) - if user: if FlaggedMessage.objects.filter(user=user, message=self).count(): actions.append(('İşareti Kaldır', '_zops_unflag_message')) diff --git a/zengine/messaging/views.py b/zengine/messaging/views.py index 927f01af..c0b28eed 100644 --- a/zengine/messaging/views.py +++ b/zengine/messaging/views.py @@ -52,6 +52,14 @@ 'avatar_url': string, 'is_online': boolean, } + + CHANNEL_SUBSCRIPTION = { + 'cmd': 'channel_subscription', + 'channel_key': key, + 'channel_name': string, + 'avatar_url': string, + 'is_online': boolean, + } """ @@ -282,13 +290,7 @@ def list_channels(current): 'channels': []} for sbs in current.user.subscriptions.objects.filter(is_visible=True): try: - current.output['channels'].append({'name': sbs.name, - 'key': sbs.channel.key, - 'type': sbs.channel.typ, - 'read_only': sbs.read_only, - 'is_online': sbs.is_online(), - 'actions': sbs.get_actions(), - 'unread': sbs.unread_count()}) + current.output['channels'].append(sbs.get_channel_listing()) except ObjectDoesNotExist: # FIXME: This should not happen, log.exception("UNPAIRED DIRECT EXCHANGES!!!!") @@ -690,16 +692,11 @@ def delete_channel(current): 'code': 200 } """ - ch = Channel(current).objects.get(owner_id=current.user_id, - key=current.input['channel_key']) - for sbs in ch.subscriber_set.objects.filter(): - sbs.delete() - for msg in ch.message_set.objects.filter(): - msg.delete() - try: - ch.delete() - except: - log.exception("fix this!!!!!") + ch_key = current.input['channel_key'] + ch = Channel(current).objects.get(owner_id=current.user_id, key=ch_key) + ch.delete() + Subscriber.objects.filter(channel_id=ch_key).delete() + Message.objects.filter(channel_id=ch_key).delete() current.output = {'status': 'Deleted', 'code': 200} diff --git a/zengine/tornado_server/ws_to_queue.py b/zengine/tornado_server/ws_to_queue.py index ec7d9aa3..b0e85342 100644 --- a/zengine/tornado_server/ws_to_queue.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -52,7 +52,7 @@ class BlockingConnectionForHTTP(object): - REPLY_TIMEOUT = 5 # sec + REPLY_TIMEOUT = 105 # sec def __init__(self): self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) @@ -201,15 +201,16 @@ def register_websocket(self, sess_id, ws): sess_id: ws: """ - log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid) - try: - user_id = sys.sessid_to_userid[sess_id] - self.websockets[user_id] = ws - except KeyError: - self.ask_for_user_id(sess_id) - self.websockets[sess_id] = ws - user_id = sess_id - self.create_out_channel(sess_id, user_id) + # log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid) + # try: + # user_id = sys.sessid_to_userid[sess_id] + # self.websockets[user_id] = ws + # except KeyError: + # self.ask_for_user_id(sess_id) + # self.websockets[sess_id] = ws + # user_id = sess_id + self.websockets[sess_id] = ws + self.create_out_channel(sess_id) def inform_disconnection(self, sess_id): self.in_channel.basic_publish(exchange='input_exc', @@ -220,25 +221,23 @@ def inform_disconnection(self, sess_id): _zops_remote_ip=''))) def unregister_websocket(self, sess_id): - user_id = sys.sessid_to_userid.get(sess_id, None) + # user_id = sys.sessid_to_userid.get(sess_id, None) try: self.inform_disconnection(sess_id) - del self.websockets[user_id] + del self.websockets[sess_id] except KeyError: - log.exception("Non-existent websocket for %s" % user_id) + log.exception("Non-existent websocket for %s" % sess_id) if sess_id in self.out_channels: try: self.out_channels[sess_id].close() except ChannelClosed: log.exception("Pika client (out) channel already closed") - def create_out_channel(self, sess_id, user_id): + def create_out_channel(self, sess_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): - channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=user_id) - log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id, - channel.consumer_tags[0], - user_id)) + channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=sess_id) + log.debug("BIND QUEUE TO WS Q.%s" % sess_id) self.out_channels[sess_id] = channel channel.queue_declare(callback=_on_output_queue_decleration, @@ -263,17 +262,17 @@ def publish_incoming_message(self, message, sess_id): body=json_encode(message)) def on_message(self, channel, method, header, body): - user_id = method.consumer_tag - log.debug("WS RPLY for %s: %s" % (user_id, body)) - if user_id in self.websockets: + sess_id = method.consumer_tag + log.debug("WS RPLY for %s: %s" % (sess_id, body)) + if sess_id in self.websockets: log.info("write msg to client") - self.websockets[user_id].write_message(body) + self.websockets[sess_id].write_message(body) # channel.basic_ack(delivery_tag=method.delivery_tag) - elif 'sessid_to_userid' in body: - reply = json_decode(body) - sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] - self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] - del self.websockets[reply['sess_id']] + # elif 'sessid_to_userid' in body: + # reply = json_decode(body) + # sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] + # self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] + # del self.websockets[reply['sess_id']] channel.basic_ack(delivery_tag=method.delivery_tag) # else: diff --git a/zengine/views/auth.py b/zengine/views/auth.py index c9aed142..793dd884 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -5,6 +5,8 @@ # # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. +from time import sleep + import falcon from pyoko import fields @@ -59,19 +61,22 @@ def _do_upgrade(self): """ open websocket connection """ self.current.output['cmd'] = 'upgrade' self.current.output['user_id'] = self.current.user_id - self.current.user.is_online(True) self.terminate_existing_login() + self.current.user.is_online(True) self.current.user.bind_private_channel(self.current.session.sess_id) user_sess = UserSessionID(self.current.user_id) user_sess.set(self.current.session.sess_id) def terminate_existing_login(self): existing_sess_id = UserSessionID(self.current.user_id).get() - if self.current.session.sess_id == existing_sess_id: - log.info("TERMINATE: this should not happen!") - if existing_sess_id: - self.current.user.unbind_private_channel(existing_sess_id) - Session(existing_sess_id).delete() + if existing_sess_id and self.current.session.sess_id != existing_sess_id: + if Session(existing_sess_id).delete(): + log.info("EXISTING LOGIN DEDECTED, WE SHOULD LOGUT IT FIRST") + self.current.user.send_client_cmd({'error': "Login required", "code": 401}, + via_queue=existing_sess_id) + self.current.user.unbind_private_channel(existing_sess_id) + + def do_view(self): """