diff --git a/zengine/client_queue.py b/zengine/client_queue.py index 5e2a5bc1..9c9dceb9 100644 --- a/zengine/client_queue.py +++ b/zengine/client_queue.py @@ -24,6 +24,14 @@ ) from zengine.log import log +def get_mq_connection(): + connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) + channel = connection.channel() + if not channel.is_open: + channel.open() + return connection, channel + + class ClientQueue(object): """ User AMQP queue manager diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index 7a58a753..69a02bbd 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -12,7 +12,7 @@ from passlib.handlers.pbkdf2 import pbkdf2_sha512 from pyoko.conf import settings -from zengine.client_queue import BLOCKING_MQ_PARAMS +from zengine.client_queue import BLOCKING_MQ_PARAMS, get_mq_connection from zengine.lib.cache import Cache from zengine.log import log @@ -37,8 +37,7 @@ class BaseUser(object): @classmethod def _connect_mq(cls): if cls.mq_connection is None or cls.mq_connection.is_closed: - cls.mq_connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) - cls.mq_channel = cls.mq_connection.channel() + cls.mq_connection, cls.mq_channel = get_mq_connection() return cls.mq_channel def get_avatar_url(self): @@ -66,7 +65,19 @@ def set_password(self, raw_password): def is_online(self, status=None): if status is None: return ConnectionStatus(self.key).get() or False - ConnectionStatus(self.key).set(status) + else: + mq_channel = self._connect_mq() + for sbs in self.subscriptions.objects.filter(): + mq_channel.basic_publish(exchange=sbs.channel.key, + routing_key='', + body=json.dumps({ + 'cmd': 'user_status', + 'channel_key': sbs.channel.key, + 'channel_name': sbs.name, + 'avatar_url': self.get_avatar_url(), + 'is_online': status, + })) + ConnectionStatus(self.key).set(status) def encrypt_password(self): @@ -118,7 +129,7 @@ def prv_exchange(self): return 'prv_%s' % str(self.key).lower() def bind_private_channel(self, sess_id): - mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel() + mq_channel = self._connect_mq() mq_channel.queue_declare(queue=sess_id, arguments={'x-expires': 40000}) log.debug("Binding private exchange to client queue: Q:%s --> E:%s" % (sess_id, self.prv_exchange)) diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index c2f276af..9b1328e6 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -17,18 +17,12 @@ from pyoko.exceptions import IntegrityError from pyoko.fields import DATE_TIME_FORMAT from pyoko.lib.utils import get_object_from_path -from zengine.client_queue import BLOCKING_MQ_PARAMS +from zengine.client_queue import BLOCKING_MQ_PARAMS, get_mq_connection from zengine.lib.utils import to_safe_str UserModel = get_object_from_path(settings.USER_MODEL) -def get_mq_connection(): - connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) - channel = connection.channel() - if not channel.is_open: - channel.open() - return connection, channel CHANNEL_TYPES = ( @@ -304,8 +298,13 @@ def get_actions_for(self, user): actions.append(('Remove from favorites', '_zops_remove_from_favorites')) else: actions.append(('Add to favorites', '_zops_favorite_message')) + + if user: - actions.extend([('Flag', '_zops_flag_message')]) + if FlaggedMessage.objects.filter(user=user, message=self).count(): + actions.append(('Remove Flag', '_zops_unflag_message')) + else: + actions.append(('Flag Message', '_zops_flag_message')) if self.sender == user: actions.extend([ ('Delete', '_zops_delete_message'), diff --git a/zengine/messaging/views.py b/zengine/messaging/views.py index 1faa4b9e..f2a793ff 100644 --- a/zengine/messaging/views.py +++ b/zengine/messaging/views.py @@ -22,25 +22,35 @@ .. code-block:: python - MSG_DICT = {'content': string, - 'title': string, - 'timestamp': datetime, - 'updated_at': datetime, - 'is_update': boolean, # false for new messages - # true if this is an updated message - 'channel_key': key, - 'sender_name': string, - 'sender_key': key, - 'type': int, - 'avatar_url': string, - 'key': key, - 'cmd': 'message', - 'attachments': [{ - 'description': string, - 'file_name': string, - 'url': string, - },] - } + MSG_DICT = { + 'content': string, + 'title': string, + 'timestamp': datetime, + 'updated_at': datetime, + 'is_update': boolean, # false for new messages + # true if this is an updated message + 'channel_key': key, + 'sender_name': string, + 'sender_key': key, + 'type': int, + 'avatar_url': string, + 'key': key, + 'cmd': 'message', + 'attachments': [{ + 'description': string, + 'file_name': string, + 'url': string, + },] +} + + + USER_STATUS_UPDATE = { + 'cmd': 'user_status', + 'channel_key': key, + 'channel_name': string, + 'avatar_url': string, + 'is_online': boolean, + } """ @@ -589,7 +599,6 @@ def delete_channel(current): current.output = {'status': 'Deleted', 'code': 200} - def edit_channel(current): """ Update channel name or description @@ -714,28 +723,44 @@ def flag_message(current): # request: { 'view':'_zops_flag_message', - 'message': { - 'key': key - 'flag': boolean, # true for flagging - # false for unflagging - } + 'message_key': key, } # response: { ' - 'status': string, # 'OK' for success - 'code': int, # 200 for success + 'status': 'Created', + 'code': 201, + } + + """ + current.output = {'status': 'Created', 'code': 201} + FlaggedMessage.objects.get_or_create(user_id=current.user_id, + message_id=current.input['key']) + + +def unflag_message(current): + """ + remove flag of a message + + .. code-block:: python + + # request: + { + 'view':'_zops_flag_message', + 'key': key, + } + # response: + { + ' + 'status': 'OK', + 'code': 200, } """ current.output = {'status': 'OK', 'code': 200} - if current.input['flag']: - FlaggedMessage.objects.get_or_create(current, - user_id=current.user_id, - message_id=current.input['key']) - else: - FlaggedMessage(current).objects.filter(user_id=current.user_id, - message_id=current.input['key']).delete() + + FlaggedMessage(current).objects.filter(user_id=current.user_id, + message_id=current.input['key']).delete() def get_message_actions(current): @@ -772,7 +797,7 @@ def add_to_favorites(current): # request: { 'view':'_zops_add_to_favorites, - 'message_key': key, + 'key': key, } # response: @@ -785,7 +810,7 @@ def add_to_favorites(current): """ msg = Message.objects.get(current.input['message_key']) current.output = {'status': 'Created', 'code': 201} - fav, new = Favorite.objects.get_or_create(user_id=current.user_id, message=msg['key']) + fav, new = Favorite.objects.get_or_create(user_id=current.user_id, message=msg) current.output['favorite_key'] = fav.key @@ -798,20 +823,20 @@ def remove_from_favorites(current): # request: { 'view':'_zops_remove_from_favorites, - 'message_key': key, + 'key': key, } # response: { - 'status': 'Deleted', + 'status': 'OK', 'code': 200 } """ try: - current.output = {'status': 'Deleted', 'code': 200} + current.output = {'status': 'OK', 'code': 200} Favorite(current).objects.get(user_id=current.user_id, - key=current.input['message_key']).delete() + key=current.input['key']).delete() except ObjectDoesNotExist: raise HTTPError(404, "") diff --git a/zengine/settings.py b/zengine/settings.py index d6ab49cc..ef0c0811 100644 --- a/zengine/settings.py +++ b/zengine/settings.py @@ -142,6 +142,7 @@ '_zops_delete_channel': 'zengine.messaging.views.delete_channel', '_zops_pin_channel': 'zengine.messaging.views.pin_channel', '_zops_flag_message': 'zengine.messaging.views.flag_message', + '_zops_unflag_message': 'zengine.messaging.views.unflag_message', # '_zops_': 'zengine.messaging.views.', } diff --git a/zengine/views/auth.py b/zengine/views/auth.py index d8571fdf..8f2bb95e 100644 --- a/zengine/views/auth.py +++ b/zengine/views/auth.py @@ -30,9 +30,7 @@ def logout(current): Args: current: :attr:`~zengine.engine.WFCurrent` object. """ - user_id = current.session.get('user_id') - if user_id: - KeepAlive(user_id).delete() + current.user.is_online(False) current.session.delete()