Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366
ref #66
ref #65
  • Loading branch information
evrenesat committed Jun 29, 2016
1 parent b3f8e53 commit 864d172
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 88 deletions.
5 changes: 2 additions & 3 deletions requirements/local_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,5 @@ lazy_object_proxy
enum34
werkzeug
pytest
celery
werkzeug
pytest
pika
tornado
79 changes: 41 additions & 38 deletions zengine/client_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ class ClientQueue(object):
"""
def __init__(self, user_id=None, sess_id=None):

self.user_id = user_id
# self.user_id = user_id
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.channel = self.connection.channel()
self.sess_id = sess_id
# self.sess_id = sess_id

def close(self):
self.channel.close()
Expand All @@ -47,42 +47,45 @@ def get_channel(self):
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
return self.channel
#
# def get_sess_id(self):
# if not self.sess_id:
# self.sess_id = UserSessionID(self.user_id).get()
# return self.sess_id

def get_sess_id(self):
if not self.sess_id:
self.sess_id = UserSessionID(self.user_id).get()
return self.sess_id
def send_to_default_exchange(self, sess_id, message=None):
msg = json.dumps(message)
log.debug("Sending following message to %s queue through default exchange:\n%s" % (
sess_id, msg))
self.get_channel().publish(exchange='', routing_key=sess_id, body=msg)

def send_to_queue(self, message=None, json_message=None):
exchange = self.user_id or ''
log.debug("Sending following message to %s queue, \"%s\" exchange:\n%s " % (
self.sess_id, exchange, json_message or json.dumps(message)))
def send_to_prv_exchange(self, user_id, message=None):
exchange = 'prv_%s' % user_id.lower()
msg = json.dumps(message)
log.debug("Sending following users \"%s\" exchange:\n%s " % (exchange, msg))
self.get_channel().publish(exchange=exchange, routing_key='', body=msg)

self.get_channel().publish(exchange=exchange,
routing_key=self.sess_id,
body=json_message or json.dumps(message))

def old_to_new_queue(self, old_sess_id):
"""
Somehow if users old (obsolete) queue has
undelivered messages, we should redirect them to
current queue.
"""
old_input_channel = self.connection.channel()
while True:
try:
method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id)
if method_frame:
self.send_to_queue(json_message=body)
old_input_channel.basic_ack(method_frame.delivery_tag)
else:
old_input_channel.queue_delete(old_sess_id)
old_input_channel.close()
break
except ChannelClosed as e:
if e[0] == 404:
break
# e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'")
else:
raise
# old_input_channel = self.connection.channel()
# def old_to_new_queue(self, old_sess_id):
# """
# Somehow if users old (obsolete) queue has
# undelivered messages, we should redirect them to
# current queue.
# """
# old_input_channel = self.connection.channel()
# while True:
# try:
# method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id)
# if method_frame:
# self.send_to_queue(json_message=body)
# old_input_channel.basic_ack(method_frame.delivery_tag)
# else:
# old_input_channel.queue_delete(old_sess_id)
# old_input_channel.close()
# break
# except ChannelClosed as e:
# if e[0] == 404:
# break
# # e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'")
# else:
# raise
# # old_input_channel = self.connection.channel()
3 changes: 2 additions & 1 deletion zengine/lib/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def post(self, **data):
self.token = self.response_wrapper.token
return self.response_wrapper

def send_output(self, output, sessid):
def send_output(self, output):
self.response_wrapper = ResponseWrapper(output)


Expand Down Expand Up @@ -219,6 +219,7 @@ def _do_login(self):
assert all([(field in req_fields) for field in ('username', 'password')])
resp = self.client.post(username=self.client.username or self.client.user.username,
password="123", cmd="do")
log.debug("login result :\n%s" % resp.json)
assert resp.json['cmd'] == 'upgrade'

@staticmethod
Expand Down
18 changes: 12 additions & 6 deletions zengine/messaging/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pyoko.conf import settings
from zengine.client_queue import BLOCKING_MQ_PARAMS
from zengine.lib.cache import Cache

from zengine.log import log

class ConnectionStatus(Cache):
"""
Expand Down Expand Up @@ -106,10 +106,16 @@ def get_role(self, role_id):
def full_name(self):
return self.username

@classmethod
def bind_private_channel(cls, sess_id):
mq_channel = self._connect_mq()
mq_channel.queue_bind(exchange='prv_%s' % self.key, queue=sess_id)
@property
def prv_exchange(self):
return 'prv_%s' % self.key.lower()

def bind_private_channel(self, sess_id):
mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel()
mq_channel.queue_declare(queue=sess_id, arguments={'x-expires': 40000})
log.debug("Binding private exchange to client queue: Q:%s --> E:%s" % (sess_id,
self.prv_exchange))
mq_channel.queue_bind(exchange=self.prv_exchange, queue=sess_id)

def send_notification(self, title, message, typ=1, url=None):
"""
Expand All @@ -124,7 +130,7 @@ def send_notification(self, title, message, typ=1, url=None):
"""
self.channel_set.channel.__class__.add_message(
channel_key='prv_%s' % self.key,
channel_key=self.prv_exchange,
body=message,
title=title,
typ=typ,
Expand Down
18 changes: 12 additions & 6 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ def create_exchange(self):
Needs to be defined only once.
"""
mq_channel = self._connect_mq()
mq_channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True)
mq_channel.exchange_declare(exchange=self.code_name,
exchange_type='fanout',
durable=True)

def pre_creation(self):
if not self.code_name:
Expand All @@ -122,7 +124,7 @@ def pre_creation(self):
self.key = self.code_name
return
if self.owner and self.is_private:
self.code_name = "prv_%s" % to_safe_str(self.owner.key)
self.code_name = self.owner.prv_exchange
self.key = self.code_name
return
raise IntegrityError('Non-private and non-direct channels should have a "name".')
Expand Down Expand Up @@ -161,12 +163,16 @@ def unread_count(self):
def create_exchange(self):
"""
Creates user's private exchange
Actually needed to be defined only once.
but since we don't know if it's exists or not
we always call it before binding it to related channel
Actually user's private channel needed to be defined only once,
and this should be happened when user first created.
But since this has a little performance cost,
to be safe we always call it before binding to the channel we currently subscribe
"""
channel = self._connect_mq()
channel.exchange_declare(exchange=self.user.key, exchange_type='fanout', durable=True)
channel.exchange_declare(exchange='prv_%s' % self.user.key.lower(),
exchange_type='fanout',
durable=True)

@classmethod
def mark_seen(cls, key, datetime_str):
Expand Down
9 changes: 7 additions & 2 deletions zengine/tornado_server/get_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ def get_logger(settings):
# ch.setLevel(logging.DEBUG)

# create formatter
formatter = logging.Formatter(
'%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s')
if settings.DEBUG:
# make log messages concise and readble for developemnt
format_str = '%(created)d - %(filename)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s'
else:
format_str = '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s'

formatter = logging.Formatter(format_str)

# add formatter to ch
ch.setFormatter(formatter)
Expand Down
28 changes: 21 additions & 7 deletions zengine/tornado_server/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import json
from uuid import uuid4

import os

import os, sys
sys.sessid_to_userid = {}
import pika
import time
from pika.adapters import TornadoConnection, BaseConnection
Expand All @@ -30,6 +30,7 @@
'MQ_PORT': int(os.environ.get('MQ_PORT', '5672')),
'MQ_USER': os.environ.get('MQ_USER', 'guest'),
'MQ_PASS': os.environ.get('MQ_PASS', 'guest'),
'DEBUG': os.environ.get('DEBUG', False),
'MQ_VHOST': os.environ.get('MQ_VHOST', '/'),
})
log = get_logger(settings)
Expand Down Expand Up @@ -70,9 +71,15 @@ def _send_message(self, sess_id, input_data):
routing_key=sess_id,
body=json_encode(input_data))

def _store_user_id(self, sess_id, body):
sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id']

def _wait_for_reply(self, sess_id, input_data):
channel = self.create_channel()
channel.queue_declare(queue=sess_id, auto_delete=True)
channel.queue_declare(queue=sess_id,
arguments={'x-expires': 4000}
# auto_delete=True
)
timeout_start = time.time()
while 1:
method_frame, header_frame, body = channel.basic_get(sess_id)
Expand All @@ -83,6 +90,8 @@ def _wait_for_reply(self, sess_id, input_data):
channel.basic_ack(method_frame.delivery_tag)
channel.close()
log.info('Returned view message for %s: %s' % (sess_id, body))
if 'upgrade' in body:
self._store_user_id(sess_id, body)
return body
else:
if time.time() - json_decode(body)['reply_timestamp'] > self.REPLY_TIMEOUT:
Expand Down Expand Up @@ -184,6 +193,7 @@ def register_websocket(self, sess_id, ws):
sess_id:
ws:
"""
user_id = sys.sessid_to_userid[sess_id]
self.websockets[sess_id] = ws
channel = self.create_out_channel(sess_id)

Expand All @@ -192,8 +202,8 @@ def inform_disconnection(self, sess_id):
routing_key=sess_id,
body=json_encode(dict(data={
'view': 'mark_offline_user',
'sess_id': sess_id
})))
'sess_id': sess_id,},
_zops_remote_ip='')))

def unregister_websocket(self, sess_id):
try:
Expand All @@ -211,16 +221,19 @@ def create_out_channel(self, sess_id):
def _on_output_channel_creation(channel):
def _on_output_queue_decleration(queue):
channel.basic_consume(self.on_message, queue=sess_id)

log.debug("BIND QUEUE TO WS Q.%s on Ch.%s" % (sess_id, channel.consumer_tags[0]))
self.out_channels[sess_id] = channel

channel.queue_declare(callback=_on_output_queue_decleration,
queue=sess_id,
arguments={'x-expires': 40000},
# auto_delete=True,
# exclusive=True
)

self.connection.channel(_on_output_channel_creation)


def redirect_incoming_message(self, sess_id, message, request):
message = json_decode(message)
message['_zops_sess_id'] = sess_id
Expand All @@ -231,9 +244,10 @@ def redirect_incoming_message(self, sess_id, message, request):

def on_message(self, channel, method, header, body):
sess_id = method.routing_key
log.debug("WS RPLY for %s: %s" % (sess_id, body))
if sess_id in self.websockets:
self.websockets[sess_id].write_message(body)
log.debug("WS RPLY for %s: %s" % (sess_id, body))

channel.basic_ack(delivery_tag=method.delivery_tag)
# else:
# channel.basic_reject(delivery_tag=method.delivery_tag)
31 changes: 18 additions & 13 deletions zengine/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class Login(SimpleView):
def _user_is_online(self):
self.current.user.is_online(True)

def _do_upgrade(self):
""" open websocket connection """
self.current.output['cmd'] = 'upgrade'
self.current.output['user_id'] = self.current.user_id
self.current.user.is_online(True)
self.current.user.bind_private_channel(self.current.session.sess_id)
user_sess = UserSessionID(self.current.user_id)
user_sess.set(self.current.session.sess_id)

def do_view(self):
"""
Authenticate user with given credentials.
Expand All @@ -64,24 +73,21 @@ def do_view(self):
self.current.output['login_process'] = True
self.current.task_data['login_successful'] = False
if self.current.is_auth:
self.current.output['cmd'] = 'upgrade'
self._do_upgrade()
else:
try:
auth_result = self.current.auth.authenticate(
self.current.input['username'],
self.current.input['password'])
self.current.task_data['login_successful'] = auth_result
if auth_result:
self.current.user.is_online(True)
self.current.user.bind_private_channel(self.current.session.sess_id)
user_sess = UserSessionID(self.current.user_id)
old_sess_id = user_sess.get()
user_sess.set(self.current.session.sess_id)
notify = Notify(self.current.user_id)
notify.cache_to_queue()
if old_sess_id:
notify.old_to_new_queue(old_sess_id)
self.current.output['cmd'] = 'upgrade'
self._do_upgrade()

# old_sess_id = user_sess.get()
# notify = Notify(self.current.user_id)
# notify.cache_to_queue()
# if old_sess_id:
# notify.old_to_new_queue(old_sess_id)
except:
raise
self.current.log.exception("Wrong username or another error occurred")
Expand All @@ -96,7 +102,6 @@ def show_view(self):
"""
self.current.output['login_process'] = True
if self.current.is_auth:
self.current.output['cmd'] = 'upgrade'
self._do_upgrade()
else:

self.current.output['forms'] = LoginForm(current=self.current).serialize()
Loading

0 comments on commit 864d172

Please sign in to comment.