Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366
ref #66
ref #65
  • Loading branch information
evrenesat committed Jun 22, 2016
1 parent 849249a commit 989d1f9
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 20 deletions.
17 changes: 17 additions & 0 deletions zengine/management_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,20 @@ def run(self):
else:
worker = Worker()
worker.run()


class PrepareMQ(Command):
"""
Creates necessary exchanges, queues and bindings
"""
CMD_NAME = 'preparemq'
HELP = 'Creates necessary exchanges, queues and bindings'

def run(self):
from zengine.wf_daemon import run_workers, Worker
worker_count = int(self.manager.args.workers or 1)
if worker_count > 1:
run_workers(worker_count)
else:
worker = Worker()
worker.run()
27 changes: 27 additions & 0 deletions zengine/messaging/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
"""
"""

# Copyright (C) 2015 ZetaOps Inc.
#
# This file is licensed under the GNU General Public License v3
# (GPLv3). See LICENSE.txt for details.
import pika


class BaseUser(object):
connection = None
channel = None


def _connect_mq(self):
if not self.connection is None or self.connection.is_closed:
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.channel = selfconnection.channel()
return self.channel


def send_message(self, title, message, sender=None, url=None, typ=1):
channel = self._connect_mq()
mq_msg = json.dumps(dict(sender=sender, body=message, msg_title=title, url=url, typ=typ))
channel.basic_publish(exchange=self.key, body=mq_msg)
31 changes: 19 additions & 12 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,34 @@
UserModel = get_object_from_path(settings.USER_MODEL)



def get_mq_connection():
connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
channel = connection.channel()
return connection, channel


CHANNEL_TYPES = (
# (1, "Notification"),
(10, "System Broadcast"),
(15, "User Broadcast"),
(20, "Chat"),
(25, "Direct"),
)
# CHANNEL_TYPES = (
# (1, "Notification"),
# (10, "System Broadcast"),
# (20, "Chat"),
# (25, "Direct"),
# )


class Channel(Model):
channel = None
connection = None

name = field.String("Name")
code_name = field.String("Internal name")
description = field.String("Description")
owner = UserModel(reverse_name='created_channels')
typ = field.Integer("Type", choices=CHANNEL_TYPES)
# is this users private exchange
is_private = field.Boolean()
# is this a One-To-One channel
is_direct = field.Boolean()
# typ = field.Integer("Type", choices=CHANNEL_TYPES)

class Managers(ListNode):
user = UserModel(reverse_name='managed_channels')
Expand All @@ -50,7 +57,8 @@ def add_message(self, body, title, sender=None, url=None, typ=2):
Message(sender=sender, body=body, msg_title=title, url=url, typ=typ, channel=self).save()

def _connect_mq(self):
self.connection, self.channel = get_mq_connection()
if not self.connection is None or self.connection.is_closed:
self.connection, self.channel = get_mq_connection()
return self.channel

def create_exchange(self):
Expand Down Expand Up @@ -113,7 +121,7 @@ def __unicode__(self):
(11, "Error"),
(111, "Success"),
(2, "Direct Message"),
(3, "Broadcast Message")
(3, "Broadcast Message"),
(4, "Channel Message")
)
MESSAGE_STATUS = (
Expand All @@ -130,7 +138,6 @@ class Message(Model):
"""
Permission model
"""

typ = field.Integer("Type", choices=MSG_TYPES)
status = field.Integer("Status", choices=MESSAGE_STATUS)
msg_title = field.String("Title")
Expand Down Expand Up @@ -172,7 +179,7 @@ def __unicode__(self):

class Favorite(Model):
"""
A model to store users favorited messages
A model to store users bookmarked messages
"""
channel = Channel()
user = UserModel()
Expand Down
14 changes: 8 additions & 6 deletions zengine/tornado_server/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def create_channel(self):

def _send_message(self, sess_id, input_data):
log.info("sending data for %s" % sess_id)
self.input_channel.basic_publish(exchange='tornado_input',
self.input_channel.basic_publish(exchange='input_exc',
routing_key=sess_id,
body=json_encode(input_data))

Expand Down Expand Up @@ -160,7 +160,7 @@ def on_conn_open(self, channel):
Args:
channel: input channel
"""
self.in_channel.exchange_declare(exchange='tornado_input', type='topic')
self.in_channel.exchange_declare(exchange='input_exc', type='topic', durable=True)
channel.queue_declare(callback=self.on_input_queue_declare, queue=self.INPUT_QUEUE_NAME)

def on_input_queue_declare(self, queue):
Expand All @@ -172,7 +172,7 @@ def on_input_queue_declare(self, queue):
queue: input queue
"""
self.in_channel.queue_bind(callback=None,
exchange='tornado_input',
exchange='input_exc',
queue=self.INPUT_QUEUE_NAME,
routing_key="#")

Expand Down Expand Up @@ -212,10 +212,12 @@ def _on_output_queue_decleration(queue):
self.connection.channel(_on_output_channel_creation)

def redirect_incoming_message(self, sess_id, message, request):
message = message[:-1] + ',"_zops_remote_ip":"%s"}' % request.remote_ip
self.in_channel.basic_publish(exchange='tornado_input',
message = json_decode(message)
message['_zops_sess_id'] = sess_id
message['_zops_remote_ip'] = request.remote_ip
self.in_channel.basic_publish(exchange='input_exc',
routing_key=sess_id,
body=message)
body=json_encode(message))

def on_message(self, channel, method, header, body):
sess_id = method.routing_key
Expand Down
4 changes: 2 additions & 2 deletions zengine/wf_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Worker(object):
Workflow runner worker object
"""
INPUT_QUEUE_NAME = 'in_queue'
INPUT_EXCHANGE = 'tornado_input'
INPUT_EXCHANGE = 'input_exc'

def __init__(self):
self.connect()
Expand All @@ -62,7 +62,7 @@ def connect(self):
self.client_queue = ClientQueue()
self.input_channel = self.connection.channel()

self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic')
self.input_channel.exchange_declare(exchange=self.INPUT_EXCHANGE, type='topic', durable=True)
self.input_channel.queue_declare(queue=self.INPUT_QUEUE_NAME)
self.input_channel.queue_bind(exchange=self.INPUT_EXCHANGE, queue=self.INPUT_QUEUE_NAME)
log.info("Bind to queue named '%s' queue with exchange '%s'" % (self.INPUT_QUEUE_NAME, self.INPUT_EXCHANGE))
Expand Down

0 comments on commit 989d1f9

Please sign in to comment.