Skip to content

Commit

Permalink
everything updated (TM)
Browse files Browse the repository at this point in the history
rref #5367
rref #5366

ref #66
ref #65
  • Loading branch information
evrenesat committed Jul 28, 2016
1 parent 34be41d commit 38598b6
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 56 deletions.
23 changes: 23 additions & 0 deletions zengine/messaging/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,26 @@ def send_notification(self, title, message, typ=1, url=None):
typ=typ,
url=url
)

def send_client_cmd(self, data, cmd=None, via_queue=None):
"""
Send arbitrary cmd and data to client
if queue name passed by "via_queue" parameter,
that queue will be used instead of users private exchange.
Args:
data: dict
cmd: string
via_queue: queue name,
"""
mq_channel = self._connect_mq()
if cmd:
data['cmd'] = cmd
if via_queue:
mq_channel.basic_publish(exchange='',
routing_key=via_queue,
body=json.dumps(data))
else:
mq_channel.basic_publish(exchange=self.prv_exchange,
routing_key='',
body=json.dumps(data))
38 changes: 32 additions & 6 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

UserModel = get_object_from_path(settings.USER_MODEL)




CHANNEL_TYPES = (
# users private message hub
(5, "Private"),
Expand Down Expand Up @@ -142,6 +139,13 @@ def create_exchange(self):
exchange_type='fanout',
durable=True)

def delete_exchange(self):
"""
Deletes MQ exchange for this channel
Needs to be defined only once.
"""
mq_channel = self._connect_mq()
mq_channel.exchange_delete(exchange=self.code_name)

def pre_creation(self):
if not self.code_name:
Expand All @@ -157,6 +161,9 @@ def pre_creation(self):
else:
self.key = self.code_name

def post_delete(self):
self.delete_exchange()

def post_save(self):
self.create_exchange()
# self.subscribe_owner()
Expand All @@ -166,6 +173,7 @@ class Subscriber(Model):
"""
Permission model
"""

class Meta:
verbose_name = "Abonelik"
verbose_name_plural = "Abonelikler"
Expand Down Expand Up @@ -195,13 +203,25 @@ class Meta:
def __unicode__(self):
return "%s subscription of %s" % (self.name, self.user)


@classmethod
def _connect_mq(cls):
if cls.mq_connection is None or cls.mq_connection.is_closed:
cls.mq_connection, cls.mq_channel = get_mq_connection()
return cls.mq_channel

def get_channel_listing(self):
"""
serialized form for channel listing
"""
return {'name': self.name,
'key': self.channel.key,
'type': self.channel.typ,
'read_only': self.read_only,
'is_online': self.is_online(),
'actions': self.get_actions(),
'unread': self.unread_count()}

def get_actions(self):
actions = [
('Yukarı Sabitle', '_zops_pin_channel'),
Expand Down Expand Up @@ -263,11 +283,19 @@ def bind_to_channel(self):
channel = self._connect_mq()
channel.exchange_bind(source=self.channel.code_name, destination=self.user.prv_exchange)

def inform_subscriber(self):
if self.channel.typ != 5:
self.user.send_client_cmd(self.get_channel_listing(), 'channel_subscription')

def post_creation(self):
self.create_exchange()
self.bind_to_channel()
self.inform_subscriber()

def pre_creation(self):
if (self.channel.key == self.user.prv_exchange and
Subscriber.objects.filter(channel_id=self.user.prv_exchange).count()):
raise Exception("Duplicate private channel subscription for %s" % self.user)
if not self.name:
self.name = self.channel.name

Expand Down Expand Up @@ -300,7 +328,6 @@ class Message(Model):
- User object's **set_message()** method. (which also uses channel.add_message)
"""


class Meta:
verbose_name = "Mesaj"
verbose_name_plural = "Mesajlar"
Expand All @@ -323,7 +350,6 @@ def get_actions_for(self, user):
else:
actions.append(('Favorilere ekle', '_zops_favorite_message'))


if user:
if FlaggedMessage.objects.filter(user=user, message=self).count():
actions.append(('İşareti Kaldır', '_zops_unflag_message'))
Expand Down
31 changes: 14 additions & 17 deletions zengine/messaging/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@
'avatar_url': string,
'is_online': boolean,
}
CHANNEL_SUBSCRIPTION = {
'cmd': 'channel_subscription',
'channel_key': key,
'channel_name': string,
'avatar_url': string,
'is_online': boolean,
}
"""


Expand Down Expand Up @@ -282,13 +290,7 @@ def list_channels(current):
'channels': []}
for sbs in current.user.subscriptions.objects.filter(is_visible=True):
try:
current.output['channels'].append({'name': sbs.name,
'key': sbs.channel.key,
'type': sbs.channel.typ,
'read_only': sbs.read_only,
'is_online': sbs.is_online(),
'actions': sbs.get_actions(),
'unread': sbs.unread_count()})
current.output['channels'].append(sbs.get_channel_listing())
except ObjectDoesNotExist:
# FIXME: This should not happen,
log.exception("UNPAIRED DIRECT EXCHANGES!!!!")
Expand Down Expand Up @@ -690,16 +692,11 @@ def delete_channel(current):
'code': 200
}
"""
ch = Channel(current).objects.get(owner_id=current.user_id,
key=current.input['channel_key'])
for sbs in ch.subscriber_set.objects.filter():
sbs.delete()
for msg in ch.message_set.objects.filter():
msg.delete()
try:
ch.delete()
except:
log.exception("fix this!!!!!")
ch_key = current.input['channel_key']
ch = Channel(current).objects.get(owner_id=current.user_id, key=ch_key)
ch.delete()
Subscriber.objects.filter(channel_id=ch_key).delete()
Message.objects.filter(channel_id=ch_key).delete()
current.output = {'status': 'Deleted', 'code': 200}


Expand Down
53 changes: 26 additions & 27 deletions zengine/tornado_server/ws_to_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@


class BlockingConnectionForHTTP(object):
REPLY_TIMEOUT = 5 # sec
REPLY_TIMEOUT = 105 # sec

def __init__(self):
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
Expand Down Expand Up @@ -201,15 +201,16 @@ def register_websocket(self, sess_id, ws):
sess_id:
ws:
"""
log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid)
try:
user_id = sys.sessid_to_userid[sess_id]
self.websockets[user_id] = ws
except KeyError:
self.ask_for_user_id(sess_id)
self.websockets[sess_id] = ws
user_id = sess_id
self.create_out_channel(sess_id, user_id)
# log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid)
# try:
# user_id = sys.sessid_to_userid[sess_id]
# self.websockets[user_id] = ws
# except KeyError:
# self.ask_for_user_id(sess_id)
# self.websockets[sess_id] = ws
# user_id = sess_id
self.websockets[sess_id] = ws
self.create_out_channel(sess_id)

def inform_disconnection(self, sess_id):
self.in_channel.basic_publish(exchange='input_exc',
Expand All @@ -220,25 +221,23 @@ def inform_disconnection(self, sess_id):
_zops_remote_ip='')))

def unregister_websocket(self, sess_id):
user_id = sys.sessid_to_userid.get(sess_id, None)
# user_id = sys.sessid_to_userid.get(sess_id, None)
try:
self.inform_disconnection(sess_id)
del self.websockets[user_id]
del self.websockets[sess_id]
except KeyError:
log.exception("Non-existent websocket for %s" % user_id)
log.exception("Non-existent websocket for %s" % sess_id)
if sess_id in self.out_channels:
try:
self.out_channels[sess_id].close()
except ChannelClosed:
log.exception("Pika client (out) channel already closed")

def create_out_channel(self, sess_id, user_id):
def create_out_channel(self, sess_id):
def _on_output_channel_creation(channel):
def _on_output_queue_decleration(queue):
channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=user_id)
log.debug("BIND QUEUE TO WS Q.%s on Ch.%s WS.%s" % (sess_id,
channel.consumer_tags[0],
user_id))
channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=sess_id)
log.debug("BIND QUEUE TO WS Q.%s" % sess_id)
self.out_channels[sess_id] = channel

channel.queue_declare(callback=_on_output_queue_decleration,
Expand All @@ -263,17 +262,17 @@ def publish_incoming_message(self, message, sess_id):
body=json_encode(message))

def on_message(self, channel, method, header, body):
user_id = method.consumer_tag
log.debug("WS RPLY for %s: %s" % (user_id, body))
if user_id in self.websockets:
sess_id = method.consumer_tag
log.debug("WS RPLY for %s: %s" % (sess_id, body))
if sess_id in self.websockets:
log.info("write msg to client")
self.websockets[user_id].write_message(body)
self.websockets[sess_id].write_message(body)
# channel.basic_ack(delivery_tag=method.delivery_tag)
elif 'sessid_to_userid' in body:
reply = json_decode(body)
sys.sessid_to_userid[reply['sess_id']] = reply['user_id']
self.websockets[reply['user_id']] = self.websockets[reply['sess_id']]
del self.websockets[reply['sess_id']]
# elif 'sessid_to_userid' in body:
# reply = json_decode(body)
# sys.sessid_to_userid[reply['sess_id']] = reply['user_id']
# self.websockets[reply['user_id']] = self.websockets[reply['sess_id']]
# del self.websockets[reply['sess_id']]
channel.basic_ack(delivery_tag=method.delivery_tag)

# else:
Expand Down
17 changes: 11 additions & 6 deletions zengine/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#
# This file is licensed under the GNU General Public License v3
# (GPLv3). See LICENSE.txt for details.
from time import sleep

import falcon

from pyoko import fields
Expand Down Expand Up @@ -59,19 +61,22 @@ def _do_upgrade(self):
""" open websocket connection """
self.current.output['cmd'] = 'upgrade'
self.current.output['user_id'] = self.current.user_id
self.current.user.is_online(True)
self.terminate_existing_login()
self.current.user.is_online(True)
self.current.user.bind_private_channel(self.current.session.sess_id)
user_sess = UserSessionID(self.current.user_id)
user_sess.set(self.current.session.sess_id)

def terminate_existing_login(self):
existing_sess_id = UserSessionID(self.current.user_id).get()
if self.current.session.sess_id == existing_sess_id:
log.info("TERMINATE: this should not happen!")
if existing_sess_id:
self.current.user.unbind_private_channel(existing_sess_id)
Session(existing_sess_id).delete()
if existing_sess_id and self.current.session.sess_id != existing_sess_id:
if Session(existing_sess_id).delete():
log.info("EXISTING LOGIN DEDECTED, WE SHOULD LOGUT IT FIRST")
self.current.user.send_client_cmd({'error': "Login required", "code": 401},
via_queue=existing_sess_id)
self.current.user.unbind_private_channel(existing_sess_id)



def do_view(self):
"""
Expand Down

0 comments on commit 38598b6

Please sign in to comment.