Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366
ref #66
ref #65
  • Loading branch information
evrenesat committed Jun 30, 2016
1 parent bf87064 commit f13faf9
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 40 deletions.
31 changes: 1 addition & 30 deletions zengine/client_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import time
from pika.exceptions import ConnectionClosed, ChannelClosed

from zengine.lib.cache import UserSessionID


BLOCKING_MQ_PARAMS = pika.ConnectionParameters(
host=settings.MQ_HOST,
Expand Down Expand Up @@ -47,11 +47,6 @@ def get_channel(self):
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
return self.channel
#
# def get_sess_id(self):
# if not self.sess_id:
# self.sess_id = UserSessionID(self.user_id).get()
# return self.sess_id

def send_to_default_exchange(self, sess_id, message=None):
msg = json.dumps(message)
Expand All @@ -65,27 +60,3 @@ def send_to_prv_exchange(self, user_id, message=None):
log.debug("Sending following users \"%s\" exchange:\n%s " % (exchange, msg))
self.get_channel().publish(exchange=exchange, routing_key='', body=msg)

# def old_to_new_queue(self, old_sess_id):
# """
# Somehow if users old (obsolete) queue has
# undelivered messages, we should redirect them to
# current queue.
# """
# old_input_channel = self.connection.channel()
# while True:
# try:
# method_frame, header_frame, body = old_input_channel.basic_get(old_sess_id)
# if method_frame:
# self.send_to_queue(json_message=body)
# old_input_channel.basic_ack(method_frame.delivery_tag)
# else:
# old_input_channel.queue_delete(old_sess_id)
# old_input_channel.close()
# break
# except ChannelClosed as e:
# if e[0] == 404:
# break
# # e => (404, "NOT_FOUND - no queue 'sess_id' in vhost '/'")
# else:
# raise
# # old_input_channel = self.connection.channel()
8 changes: 5 additions & 3 deletions zengine/management_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# (GPLv3). See LICENSE.txt for details.
import six

from pyoko.db.adapter.db_riak import BlockSave
from pyoko.exceptions import ObjectDoesNotExist
from pyoko.lib.utils import get_object_from_path
from pyoko.manage import *
Expand Down Expand Up @@ -200,9 +201,10 @@ def run(self):
def create_user_channels(self):
from zengine.messaging.model import Channel
user_model = get_object_from_path(settings.USER_MODEL)
for usr in user_model.objects.filter():
ch, new = Channel.objects.get_or_create(owner=usr, is_private=True)
print("%s exchange: %s" % ('created' if new else 'existing', ch.code_name))
with BlockSave(Channel):
for usr in user_model.objects.filter():
ch, new = Channel.objects.get_or_create(owner=usr, is_private=True)
print("%s exchange: %s" % ('created' if new else 'existing', ch.code_name))

def create_channel_exchanges(self):
from zengine.messaging.model import Channel
Expand Down
2 changes: 1 addition & 1 deletion zengine/messaging/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def full_name(self):

@property
def prv_exchange(self):
return 'prv_%s' % self.key.lower()
return 'prv_%s' % str(self.key).lower()

def bind_private_channel(self, sess_id):
mq_channel = pika.BlockingConnection(BLOCKING_MQ_PARAMS).channel()
Expand Down
2 changes: 1 addition & 1 deletion zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def get_or_create_direct_channel(cls, initiator, receiver):
def add_message(self, channel_key, body, title=None, sender=None, url=None, typ=2, receiver=None):
mq_channel = self._connect_mq()
mq_msg = json.dumps(dict(sender=sender, body=body, msg_title=title, url=url, typ=typ))
mq_channel.basic_publish(exchange=channel_key, body=mq_msg)
mq_channel.basic_publish(exchange=channel_key, routing_key='', body=mq_msg)
return Message(sender=sender, body=body, msg_title=title, url=url,
typ=typ, channel_id=channel_key, receiver=receiver).save()

Expand Down
1 change: 1 addition & 0 deletions zengine/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
#: ('URI template', 'python path to view method/class'),
VIEW_URLS = {
'dashboard': 'zengine.views.menu.Menu',
'sessid_to_userid': 'zengine.views.system.sessid_to_userid',
'ping': 'zengine.views.dev_utils.Ping',
'_zops_create_message': 'zengine.messaging.views.create_message',
'mark_offline_user': 'zengine.messaging.views.mark_offline_user',
Expand Down
4 changes: 2 additions & 2 deletions zengine/tornado_server/get_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def get_logger(settings):

# create formatter
if settings.DEBUG:
# make log messages concise and readble for developemnt
format_str = '%(created)d - %(filename)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s'
# make log messages more readable at development
format_str = '%(asctime)s - %(filename)s:%(lineno)d %(module)s.%(funcName)s \n> %(message)s\n\n'
else:
format_str = '%(asctime)s - %(process)d - %(pathname)s:%(lineno)d [%(module)s > %(funcName)s] - %(name)s - %(levelname)s - %(message)s'

Expand Down
2 changes: 1 addition & 1 deletion zengine/tornado_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from tornado.httpclient import HTTPError

sys.path.insert(0, os.path.realpath(os.path.dirname(__file__)))
from queue_manager import QueueManager, BlockingConnectionForHTTP, log
from ws_to_queue import QueueManager, BlockingConnectionForHTTP, log

COOKIE_NAME = 'zopsess'
DEBUG = os.getenv("DEBUG", False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ def on_input_queue_declare(self, queue):
exchange='input_exc',
queue=self.INPUT_QUEUE_NAME,
routing_key="#")
def ask_for_user_id(self, sess_id):
log.debug(sess_id)
# TODO: add remote ip
self.publish_incoming_message({'view': 'sessid_to_userid',
'_zops_remote_ip': '',
}, sess_id)


def register_websocket(self, sess_id, ws):
"""
Expand All @@ -195,9 +202,14 @@ def register_websocket(self, sess_id, ws):
ws:
"""
log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid)
user_id = sys.sessid_to_userid[sess_id]
try:
user_id = sys.sessid_to_userid[sess_id]
except KeyError:
self.ask_for_user_id(sess_id)
self.websockets[sess_id] = ws
self.websockets[user_id] = ws
self.create_out_channel(sess_id, user_id)
return True

def inform_disconnection(self, sess_id):
self.in_channel.basic_publish(exchange='input_exc',
Expand Down Expand Up @@ -243,6 +255,9 @@ def redirect_incoming_message(self, sess_id, message, request):
message = json_decode(message)
message['_zops_sess_id'] = sess_id
message['_zops_remote_ip'] = request.remote_ip
self.publish_incoming_message(message, sess_id)

def publish_incoming_message(self, message, sess_id):
self.in_channel.basic_publish(exchange='input_exc',
routing_key=sess_id,
body=json_encode(message))
Expand All @@ -252,7 +267,11 @@ def on_message(self, channel, method, header, body):
log.debug("WS RPLY for %s: %s" % (user_id, body))
if user_id in self.websockets:
self.websockets[user_id].write_message(body)

channel.basic_ack(delivery_tag=method.delivery_tag)
elif 'sessid_to_userid' in body:
reply = json_decode(body)
sys.sessid_to_userid[reply['sess_id']] = reply['user_id']
self.websockets[reply['user_id']] = self.websockets[reply['sess_id']]
channel.basic_ack(delivery_tag=method.delivery_tag)
# else:
# channel.basic_reject(delivery_tag=method.delivery_tag)
15 changes: 15 additions & 0 deletions zengine/views/system.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
"""
"""

# Copyright (C) 2015 ZetaOps Inc.
#
# This file is licensed under the GNU General Public License v3
# (GPLv3). See LICENSE.txt for details.

from zengine.lib.cache import UserSessionID

def sessid_to_userid(current):
current.output['user_id'] = current.user_id.lower()
current.output['sess_id'] = current.session.sess_id
current.output['sessid_to_userid'] = True

0 comments on commit f13faf9

Please sign in to comment.