Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366
ref #66
ref #65
  • Loading branch information
evrenesat committed Jun 28, 2016
1 parent 70b3279 commit 5b712ab
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 53 deletions.
4 changes: 2 additions & 2 deletions zengine/client_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def get_sess_id(self):
return self.sess_id

def send_to_queue(self, message=None, json_message=None):
self.get_channel().basic_publish(exchange='',
routing_key=self.get_sess_id(),
self.get_channel().basic_publish(exchange=self.user_id or '',
routing_key=self.sess_id,
body=json_message or json.dumps(message))

def old_to_new_queue(self, old_sess_id):
Expand Down
41 changes: 19 additions & 22 deletions zengine/messaging/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ def __init__(self, user_id):


class BaseUser(object):
connection = None
channel = None
mq_connection = None
mq_channel = None

def _connect_mq(self):
if not self.connection is None or self.connection.is_closed:
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.channel = self.connection.channel()
return self.channel
if not self.mq_connection is None or self.mq_connection.is_closed:
self.mq_connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.mq_channel = self.mq_connection.channel()
return self.mq_channel

def get_avatar_url(self):
"""
Expand Down Expand Up @@ -90,7 +90,7 @@ def check_password(self, raw_password):

def get_role(self, role_id):
"""
Kullanıcıya ait Role nesnesini getirir.
Retrieves user's roles.
Args:
role_id (int)
Expand All @@ -105,7 +105,11 @@ def get_role(self, role_id):
def full_name(self):
return self.username

def send_message(self, title, message, sender=None, url=None, typ=1):
def bind_private_channel(self, sess_id):
mq_channel = self._connect_mq()
mq_channel.queue_bind(exchange='prv_%s' % self.key, queue=sess_id)

def send_notification(self, title, message, typ=1, url=None):
"""
sends message to users private mq exchange
Args:
Expand All @@ -117,17 +121,10 @@ def send_message(self, title, message, sender=None, url=None, typ=1):
"""
mq_channel = self._connect_mq()
mq_msg = dict(body=message, msg_title=title, url=url, typ=typ)
if sender:
mq_msg['sender_name'] = sender.full_name
mq_msg['sender_key'] = sender.key

mq_channel.basic_publish(exchange=self.key, body=json.dumps(mq_msg))
self._write_message_to_db(sender, message, title, url, typ)

def _write_message_to_db(self, sender, body, title, url, typ):
from zengine.messaging.model import Channel, Message
channel = Channel.objects.get(owner=self, is_private=True)
Message(channel=channel, sender=sender, msg_title=title,
body=body, receiver=self, url=url, typ=typ).save()
self.channel_set.channel.__class__.add_message(
channel_key='prv_%s' % self.key,
body=message,
title=title,
typ=typ,
url=url
)
19 changes: 10 additions & 9 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class Channel(Model):
is_direct: Represents a user-to-user direct message exchange
"""
channel = None
connection = None
mq_channel = None
mq_connection = None

name = field.String("Name")
code_name = field.String("Internal name")
Expand All @@ -62,7 +62,8 @@ class Meta:
unique_together = (('is_private', 'owner'),)

class Managers(ListNode):
user = UserModel(reverse_name='managed_channels')
user = UserModel()


@classmethod
def get_or_create_direct_channel(cls, initiator, receiver):
Expand Down Expand Up @@ -102,17 +103,17 @@ def get_last_messages(self):

@classmethod
def _connect_mq(cls):
if cls.connection is None or cls.connection.is_closed:
cls.connection, cls.channel = get_mq_connection()
return cls.channel
if cls.mq_connection is None or cls.mq_connection.is_closed:
cls.mq_connection, cls.mq_channel = get_mq_connection()
return cls.mq_channel

def create_exchange(self):
"""
Creates MQ exchange for this channel
Needs to be defined only once.
"""
channel = self._connect_mq()
channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True)
mq_channel = self._connect_mq()
mq_channel.exchange_declare(exchange=self.code_name, exchange_type='fanout', durable=True)

def pre_creation(self):
if not self.code_name:
Expand Down Expand Up @@ -212,7 +213,7 @@ class Message(Model):
Notes:
Never use directly for creating new messages! Use these methods:
- Channel objects's **add_message()** method.
- User object's **set_message()** method. (which uses channel.add_message)
- User object's **set_message()** method. (which also uses channel.add_message)
"""
channel = Channel()
sender = UserModel(reverse_name='sent_messages')
Expand Down
3 changes: 2 additions & 1 deletion zengine/tornado_server/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@


class BlockingConnectionForHTTP(object):
REPLY_TIMEOUT = 10 # sec
REPLY_TIMEOUT = 100 # sec

def __init__(self):
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
Expand Down Expand Up @@ -216,6 +216,7 @@ def _on_output_queue_decleration(queue):
# exclusive=True
)


self.connection.channel(_on_output_channel_creation)

def redirect_incoming_message(self, sess_id, message, request):
Expand Down
18 changes: 5 additions & 13 deletions zengine/views/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@ class Login(SimpleView):
does the authentication at ``do`` stage.
"""

def _do_binding(self):
"""
Bind user's ephemeral session queue to user's durable private exchange
"""
from zengine.messaging.model import get_mq_connection
connection, channel = get_mq_connection()
channel.queue_bind(exchange=self.current.user_id,
queue=self.current.session.sess_id,
# routing_key="#"
)

def _user_is_online(self):
self.current.user.is_online(True)

Expand All @@ -72,6 +61,7 @@ def do_view(self):
Authenticate user with given credentials.
Connects user's queue and exchange
"""
self.current.output['login_process'] = True
self.current.task_data['login_successful'] = False
if self.current.is_auth:
self.current.output['cmd'] = 'upgrade'
Expand All @@ -82,8 +72,8 @@ def do_view(self):
self.current.input['password'])
self.current.task_data['login_successful'] = auth_result
if auth_result:
self._user_is_online()
self._do_binding()
self.current.user.is_online(True)
self.current.user.bind_private_channel(self.current.session.sess_id)
user_sess = UserSessionID(self.current.user_id)
old_sess_id = user_sess.get()
user_sess.set(self.current.session.sess_id)
Expand All @@ -104,7 +94,9 @@ def show_view(self):
"""
Show :attr:`LoginForm` form.
"""
self.current.output['login_process'] = True
if self.current.is_auth:
self.current.output['cmd'] = 'upgrade'
else:

self.current.output['forms'] = LoginForm(current=self.current).serialize()
14 changes: 8 additions & 6 deletions zengine/wf_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ def connect(self):
self.client_queue = ClientQueue()
self.input_channel = self.connection.channel()

self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic', durable=True)
self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE,
type='topic',
durable=True)
self.input_channel.queue_declare(queue=self.INPUT_QUEUE_NAME)
self.input_channel.queue_bind(exchange=self.INPUT_EXCHANGE, queue=self.INPUT_QUEUE_NAME)
log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, self.INPUT_EXCHANGE))
log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME,
self.INPUT_EXCHANGE))

def run(self):
"""
Expand Down Expand Up @@ -172,11 +175,10 @@ def handle_message(self, ch, method, properties, body):

def send_output(self, output, sessid):
self.client_queue.sess_id = sessid
# TODO: This is ugly
if 'login_process' not in output:
self.client_queue.user_id = self.current.user_id
self.client_queue.send_to_queue(output)
# self.output_channel.basic_publish(exchange='',
# routing_key=sessid,
# body=json.dumps(output))
# except ConnectionClosed:


def run_workers(no_subprocess):
Expand Down

0 comments on commit 5b712ab

Please sign in to comment.