From 6928e29594e639060ad80fabf67663fe815392c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Wed, 20 Jul 2016 13:09:27 +0300 Subject: [PATCH] rref #5367 rref #5366 ref zetaops/zengine#66 ref zetaops/zengine#65 --- tests/async_amqp/messaging_tests.py | 50 +++++++++ zengine/lib/concurrent_amqp_test_client.py | 116 ++++++++++++++------- zengine/messaging/model.py | 26 +++-- zengine/messaging/views.py | 8 +- zengine/tornado_server/ws_to_queue.py | 4 +- 5 files changed, 148 insertions(+), 56 deletions(-) create mode 100644 tests/async_amqp/messaging_tests.py diff --git a/tests/async_amqp/messaging_tests.py b/tests/async_amqp/messaging_tests.py new file mode 100644 index 00000000..e17a6e78 --- /dev/null +++ b/tests/async_amqp/messaging_tests.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +""" +""" + +# Copyright (C) 2015 ZetaOps Inc. +# +# This file is licensed under the GNU General Public License v3 +# (GPLv3). See LICENSE.txt for details. +from zengine.lib.concurrent_amqp_test_client import ConcurrentTestCase, TestQueueManager + + +class TestCase(ConcurrentTestCase): + def test_channel_list(self): + self.post('ulakbus', {"view": "_zops_list_channels"}) + + def test_search_user(self): + self.post('ulakbus', {"view": "_zops_search_user", + "query": "x"}) + + def test_show_channel(self): + self.post('ulakbus', + {"view": "_zops_show_channel", + 'channel_key': 'iG4mvjQrfkvTDvM6Jk56X5ILoJ_CoqwpemOHnknn3hYu1BlAghb3dm'}) + + def test_create_message(self): + self.post('ulakbus', + {"view": "_zops_create_message", + "message": dict( + body='test_body', title='testtitle', + channel='iG4mvjQrfkvTDvM6Jk56X5ILoJ_CoqwpemOHnknn3hYu1BlAghb3dm', + receiver='', + type=2 + )}) + + +def main(): + from tornado import ioloop + # initiate amqp manager + ioloop = ioloop.IOLoop.instance() + qm = TestQueueManager(io_loop=ioloop) + + # initiate test case + qm.set_test_class(TestCase) + + qm.connect() + ioloop.start() + + +if __name__ == '__main__': + main() diff --git a/zengine/lib/concurrent_amqp_test_client.py b/zengine/lib/concurrent_amqp_test_client.py index 0406f1bf..df42fb0a 100644 --- a/zengine/lib/concurrent_amqp_test_client.py +++ b/zengine/lib/concurrent_amqp_test_client.py @@ -13,7 +13,11 @@ # # This file is licensed under the GNU General Public License v3 # (GPLv3). See LICENSE.txt for details. +from __future__ import print_function + +import inspect import uuid +from pprint import pprint import pika from tornado.escape import json_encode, json_decode @@ -29,6 +33,7 @@ from zengine.views.auth import Login sys.sessid_to_userid = {} +sys.test_method_names = {} UserModel = get_object_from_path(settings.USER_MODEL) @@ -70,7 +75,6 @@ def __init__(self, queue_manager, username, sess_id=None): self.request = type('MockWSRequestObject', (object,), {'remote_ip': '127.0.0.1'}) self.queue_manager = queue_manager - # order is important! self.sess_id = sess_id or uuid.uuid4().hex sys.sessid_to_userid[self.sess_id] = self.user.key.lower() self.queue_manager.register_websocket(self.sess_id, self) @@ -96,13 +100,17 @@ def backend_to_client(self, body): self.message_stack[body['callbackID']] = body log.info("WRITE MESSAGE TO CLIENT:\n%s" % (body,)) - def client_to_backend(self, message, callback): + def client_to_backend(self, message, callback, caller_fn_name): """ from client to backend """ cbid = uuid.uuid4().hex - self.message_callbacks[cbid] = callback message = json_encode({"callbackID": cbid, "data": message}) + def cb(res): + print("Testing: %s :: " % caller_fn_name, end='') + callback(res, message) + # self.message_callbacks[cbid] = lambda res: callable(res, message) + self.message_callbacks[cbid] = cb log.info("GOT MESSAGE FOR BACKEND %s: %s" % (self.sess_id, message)) self.queue_manager.redirect_incoming_message(self.sess_id, message, self.request) @@ -116,18 +124,14 @@ class ConcurrentTestCase(object): """ def __init__(self, queue_manager): - from tornado import ioloop log.info("ConcurrentTestCase class init with %s" % queue_manager) - ioloop = ioloop.IOLoop.instance() - self.ws1 = self.get_client('ulakbus') - self.ws2 = self.get_client('ogrenci_isleri_1') - self.queue_manager = TestQueueManager(io_loop=ioloop) - # initiate amqp manager - self.queue_manager.set_test_class(self.run_tests) - self.queue_manager.connect() - ioloop.start() - - def get_client(self, username): + self.queue_manager = queue_manager + self.clients = {} + self.make_client('ulakbus') + self.test_fn_name = '' + self.run_tests() + + def make_client(self, username): """ Args: username: username for this client instance @@ -135,28 +139,70 @@ def get_client(self, username): Returns: Logged in TestWSClient instance for given username """ - return TestWSClient(self.queue_manager, username) + self.clients[username] = TestWSClient(self.queue_manager, username) + + + + def post(self, username, data, callback=None): + if username not in self.clients: + self.make_client(username) + callback = callback or self.stc + self.clients[username].client_to_backend(data, callback, self.test_fn_name) def run_tests(self): for name in sorted(self.__class__.__dict__): if name.startswith("test_"): - try: - getattr(self, name)() - print("%s succesfully passed" % name) - except: - print("%s FAIL" % name) - - def success_test_callback(self, response, request=None): - # print(response) - assert response['code'] in (200, 201), "Process response not successful: \n %s \n %s" % ( - response, request - ) - - def test_channel_list(self): - self.ws1.client_to_backend({"view": "_zops_list_channels"}, - self.success_test_callback) - - def test_search_user(self): - self.ws1.client_to_backend({"view": "_zops_search_user", - "query": "x"}, - self.success_test_callback) + self.test_fn_name = name[5:] + getattr(self, name)() + + def process_error_reponse(self, resp): + if 'error' in resp: + print(resp['error'].replace('\\n','\n').replace('u\\', '')) + return True + + def stc(self, response, request=None): + """ + STC means Success Test Callback. Looks for 200 or 201 codes in response code. + + Args: + response: + request: + """ + if not response['code'] in (200, 201): + print("FAILED: Response not successful: \n") + if not self.process_error_reponse(response): + print("\nRESP:\n%s") + print("\nREQ:\n %s" % (response, request)) + else: + print("PASS!\n") + + def pstc(self, response, request=None): + """ + Same as self.stc() (success request callback) but printing response/request + for debugging purposes + + Args: + response: + request: + + """ + self.stc(response, request) + print("\n\n=================\n\nRESPONSE: %s \n\nREQUEST: %s\n" % (response, request)) + + + +def main(): + from tornado import ioloop + # initiate amqp manager + ioloop = ioloop.IOLoop.instance() + qm = TestQueueManager(io_loop=ioloop) + + # initiate test case + qm.set_test_class(ConcurrentTestCase) + + qm.connect() + ioloop.start() + + +if __name__ == '__main__': + main() diff --git a/zengine/messaging/model.py b/zengine/messaging/model.py index 361a8db1..6d4483f8 100644 --- a/zengine/messaging/model.py +++ b/zengine/messaging/model.py @@ -14,6 +14,7 @@ from pyoko import Model, field, ListNode from pyoko.conf import settings from pyoko.exceptions import IntegrityError +from pyoko.fields import DATE_TIME_FORMAT from pyoko.lib.utils import get_object_from_path from zengine.client_queue import BLOCKING_MQ_PARAMS from zengine.lib.utils import to_safe_str @@ -276,18 +277,15 @@ class Message(Model): url = field.String("URL") def get_actions_for(self, user): - actions = [ - ('Favorite', '_zops_favorite_message') - ] - if self.sender == user: - actions.extend([ - ('Delete', '_zops_delete_message'), - ('Edit', '_zops_edit_message') - ]) - elif user: - actions.extend([ - ('Flag', '_zops_flag_message') - ]) + actions = [('Favorite', '_zops_favorite_message')] + if user: + actions.extend([('Flag', '_zops_flag_message')]) + if self.sender == user: + actions.extend([ + ('Delete', '_zops_delete_message'), + ('Edit', '_zops_edit_message') + ]) + def serialize(self, user=None): """ @@ -305,8 +303,8 @@ def serialize(self, user=None): return { 'content': self.body, 'type': self.typ, - 'updated_at': self.updated_at, - 'timestamp': self.timestamp, + 'updated_at': self.updated_at.strftime(DATE_TIME_FORMAT) if self.updated_at else None, + 'timestamp': self.timestamp.strftime(DATE_TIME_FORMAT), 'is_update': self.exist, 'attachments': [attachment.serialize() for attachment in self.attachment_set], 'title': self.msg_title, diff --git a/zengine/messaging/views.py b/zengine/messaging/views.py index 390fd6a3..71a8a5ec 100644 --- a/zengine/messaging/views.py +++ b/zengine/messaging/views.py @@ -110,15 +110,13 @@ def create_message(current): """ msg = current.input['message'] - ch = Channel(current).objects.get(msg['channel']) - msg_obj = ch.add_message(body=msg['body'], typ=msg['typ'], sender=current.user, + msg_obj = Channel.add_message(msg['channel'], body=msg['body'], typ=msg['type'], sender=current.user, title=msg['title'], receiver=msg['receiver'] or None) if 'attachment' in msg: for atch in msg['attachments']: - # TODO: Attachment type detection typ = current._dedect_file_type(atch['name'], atch['content']) - Attachment(channel=ch, msg=msg_obj, name=atch['name'], file=atch['content'], - description=atch['description'], typ=typ).save() + Attachment(channel_id=msg['channel'], msg=msg_obj, name=atch['name'], + file=atch['content'], description=atch['description'], typ=typ).save() def show_channel(current): diff --git a/zengine/tornado_server/ws_to_queue.py b/zengine/tornado_server/ws_to_queue.py index 037333e3..cf8e9f4b 100644 --- a/zengine/tornado_server/ws_to_queue.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -268,13 +268,13 @@ def on_message(self, channel, method, header, body): if user_id in self.websockets: log.info("write msg to client") self.websockets[user_id].write_message(body) - # channel.basic_ack(delivery_tag=method.delivery_tag) + channel.basic_ack(delivery_tag=method.delivery_tag) elif 'sessid_to_userid' in body: reply = json_decode(body) sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] del self.websockets[reply['sess_id']] - channel.basic_ack(delivery_tag=method.delivery_tag) + channel.basic_ack(delivery_tag=method.delivery_tag) # else: # channel.basic_reject(delivery_tag=method.delivery_tag)