Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366

ref #66
ref #65
  • Loading branch information
evrenesat committed Jul 20, 2016
1 parent 09da78b commit 6928e29
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 56 deletions.
50 changes: 50 additions & 0 deletions tests/async_amqp/messaging_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
"""
"""

# Copyright (C) 2015 ZetaOps Inc.
#
# This file is licensed under the GNU General Public License v3
# (GPLv3). See LICENSE.txt for details.
from zengine.lib.concurrent_amqp_test_client import ConcurrentTestCase, TestQueueManager


class TestCase(ConcurrentTestCase):
def test_channel_list(self):
self.post('ulakbus', {"view": "_zops_list_channels"})

def test_search_user(self):
self.post('ulakbus', {"view": "_zops_search_user",
"query": "x"})

def test_show_channel(self):
self.post('ulakbus',
{"view": "_zops_show_channel",
'channel_key': 'iG4mvjQrfkvTDvM6Jk56X5ILoJ_CoqwpemOHnknn3hYu1BlAghb3dm'})

def test_create_message(self):
self.post('ulakbus',
{"view": "_zops_create_message",
"message": dict(
body='test_body', title='testtitle',
channel='iG4mvjQrfkvTDvM6Jk56X5ILoJ_CoqwpemOHnknn3hYu1BlAghb3dm',
receiver='',
type=2
)})


def main():
from tornado import ioloop
# initiate amqp manager
ioloop = ioloop.IOLoop.instance()
qm = TestQueueManager(io_loop=ioloop)

# initiate test case
qm.set_test_class(TestCase)

qm.connect()
ioloop.start()


if __name__ == '__main__':
main()
116 changes: 81 additions & 35 deletions zengine/lib/concurrent_amqp_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
#
# This file is licensed under the GNU General Public License v3
# (GPLv3). See LICENSE.txt for details.
from __future__ import print_function

import inspect
import uuid
from pprint import pprint

import pika
from tornado.escape import json_encode, json_decode
Expand All @@ -29,6 +33,7 @@
from zengine.views.auth import Login

sys.sessid_to_userid = {}
sys.test_method_names = {}
UserModel = get_object_from_path(settings.USER_MODEL)


Expand Down Expand Up @@ -70,7 +75,6 @@ def __init__(self, queue_manager, username, sess_id=None):

self.request = type('MockWSRequestObject', (object,), {'remote_ip': '127.0.0.1'})
self.queue_manager = queue_manager
# order is important!
self.sess_id = sess_id or uuid.uuid4().hex
sys.sessid_to_userid[self.sess_id] = self.user.key.lower()
self.queue_manager.register_websocket(self.sess_id, self)
Expand All @@ -96,13 +100,17 @@ def backend_to_client(self, body):
self.message_stack[body['callbackID']] = body
log.info("WRITE MESSAGE TO CLIENT:\n%s" % (body,))

def client_to_backend(self, message, callback):
def client_to_backend(self, message, callback, caller_fn_name):
"""
from client to backend
"""
cbid = uuid.uuid4().hex
self.message_callbacks[cbid] = callback
message = json_encode({"callbackID": cbid, "data": message})
def cb(res):
print("Testing: %s :: " % caller_fn_name, end='')
callback(res, message)
# self.message_callbacks[cbid] = lambda res: callable(res, message)
self.message_callbacks[cbid] = cb
log.info("GOT MESSAGE FOR BACKEND %s: %s" % (self.sess_id, message))
self.queue_manager.redirect_incoming_message(self.sess_id, message, self.request)

Expand All @@ -116,47 +124,85 @@ class ConcurrentTestCase(object):
"""

def __init__(self, queue_manager):
from tornado import ioloop
log.info("ConcurrentTestCase class init with %s" % queue_manager)
ioloop = ioloop.IOLoop.instance()
self.ws1 = self.get_client('ulakbus')
self.ws2 = self.get_client('ogrenci_isleri_1')
self.queue_manager = TestQueueManager(io_loop=ioloop)
# initiate amqp manager
self.queue_manager.set_test_class(self.run_tests)
self.queue_manager.connect()
ioloop.start()

def get_client(self, username):
self.queue_manager = queue_manager
self.clients = {}
self.make_client('ulakbus')
self.test_fn_name = ''
self.run_tests()

def make_client(self, username):
"""
Args:
username: username for this client instance
Returns:
Logged in TestWSClient instance for given username
"""
return TestWSClient(self.queue_manager, username)
self.clients[username] = TestWSClient(self.queue_manager, username)



def post(self, username, data, callback=None):
if username not in self.clients:
self.make_client(username)
callback = callback or self.stc
self.clients[username].client_to_backend(data, callback, self.test_fn_name)

def run_tests(self):
for name in sorted(self.__class__.__dict__):
if name.startswith("test_"):
try:
getattr(self, name)()
print("%s succesfully passed" % name)
except:
print("%s FAIL" % name)

def success_test_callback(self, response, request=None):
# print(response)
assert response['code'] in (200, 201), "Process response not successful: \n %s \n %s" % (
response, request
)

def test_channel_list(self):
self.ws1.client_to_backend({"view": "_zops_list_channels"},
self.success_test_callback)

def test_search_user(self):
self.ws1.client_to_backend({"view": "_zops_search_user",
"query": "x"},
self.success_test_callback)
self.test_fn_name = name[5:]
getattr(self, name)()

def process_error_reponse(self, resp):
if 'error' in resp:
print(resp['error'].replace('\\n','\n').replace('u\\', ''))
return True

def stc(self, response, request=None):
"""
STC means Success Test Callback. Looks for 200 or 201 codes in response code.
Args:
response:
request:
"""
if not response['code'] in (200, 201):
print("FAILED: Response not successful: \n")
if not self.process_error_reponse(response):
print("\nRESP:\n%s")
print("\nREQ:\n %s" % (response, request))
else:
print("PASS!\n")

def pstc(self, response, request=None):
"""
Same as self.stc() (success request callback) but printing response/request
for debugging purposes
Args:
response:
request:
"""
self.stc(response, request)
print("\n\n=================\n\nRESPONSE: %s \n\nREQUEST: %s\n" % (response, request))



def main():
from tornado import ioloop
# initiate amqp manager
ioloop = ioloop.IOLoop.instance()
qm = TestQueueManager(io_loop=ioloop)

# initiate test case
qm.set_test_class(ConcurrentTestCase)

qm.connect()
ioloop.start()


if __name__ == '__main__':
main()
26 changes: 12 additions & 14 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pyoko import Model, field, ListNode
from pyoko.conf import settings
from pyoko.exceptions import IntegrityError
from pyoko.fields import DATE_TIME_FORMAT
from pyoko.lib.utils import get_object_from_path
from zengine.client_queue import BLOCKING_MQ_PARAMS
from zengine.lib.utils import to_safe_str
Expand Down Expand Up @@ -276,18 +277,15 @@ class Message(Model):
url = field.String("URL")

def get_actions_for(self, user):
actions = [
('Favorite', '_zops_favorite_message')
]
if self.sender == user:
actions.extend([
('Delete', '_zops_delete_message'),
('Edit', '_zops_edit_message')
])
elif user:
actions.extend([
('Flag', '_zops_flag_message')
])
actions = [('Favorite', '_zops_favorite_message')]
if user:
actions.extend([('Flag', '_zops_flag_message')])
if self.sender == user:
actions.extend([
('Delete', '_zops_delete_message'),
('Edit', '_zops_edit_message')
])


def serialize(self, user=None):
"""
Expand All @@ -305,8 +303,8 @@ def serialize(self, user=None):
return {
'content': self.body,
'type': self.typ,
'updated_at': self.updated_at,
'timestamp': self.timestamp,
'updated_at': self.updated_at.strftime(DATE_TIME_FORMAT) if self.updated_at else None,
'timestamp': self.timestamp.strftime(DATE_TIME_FORMAT),
'is_update': self.exist,
'attachments': [attachment.serialize() for attachment in self.attachment_set],
'title': self.msg_title,
Expand Down
8 changes: 3 additions & 5 deletions zengine/messaging/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,13 @@ def create_message(current):
"""
msg = current.input['message']
ch = Channel(current).objects.get(msg['channel'])
msg_obj = ch.add_message(body=msg['body'], typ=msg['typ'], sender=current.user,
msg_obj = Channel.add_message(msg['channel'], body=msg['body'], typ=msg['type'], sender=current.user,
title=msg['title'], receiver=msg['receiver'] or None)
if 'attachment' in msg:
for atch in msg['attachments']:
# TODO: Attachment type detection
typ = current._dedect_file_type(atch['name'], atch['content'])
Attachment(channel=ch, msg=msg_obj, name=atch['name'], file=atch['content'],
description=atch['description'], typ=typ).save()
Attachment(channel_id=msg['channel'], msg=msg_obj, name=atch['name'],
file=atch['content'], description=atch['description'], typ=typ).save()


def show_channel(current):
Expand Down
4 changes: 2 additions & 2 deletions zengine/tornado_server/ws_to_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,13 @@ def on_message(self, channel, method, header, body):
if user_id in self.websockets:
log.info("write msg to client")
self.websockets[user_id].write_message(body)
# channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_ack(delivery_tag=method.delivery_tag)
elif 'sessid_to_userid' in body:
reply = json_decode(body)
sys.sessid_to_userid[reply['sess_id']] = reply['user_id']
self.websockets[reply['user_id']] = self.websockets[reply['sess_id']]
del self.websockets[reply['sess_id']]
channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_ack(delivery_tag=method.delivery_tag)

# else:
# channel.basic_reject(delivery_tag=method.delivery_tag)

0 comments on commit 6928e29

Please sign in to comment.