From 677d597c1a806bd39e1c14bed11a56c2eef64a44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Evren=20Esat=20=C3=96zkan?= Date: Mon, 15 Aug 2016 11:20:26 +0300 Subject: [PATCH] it seems fixed and also now it works much faster then before. ref #GH-85 rref #5440 p80 #5440 --- zengine/messaging/lib.py | 3 + zengine/tornado_server/server.py | 91 ++++++++++++----------- zengine/tornado_server/ws_to_queue.py | 100 +------------------------- 3 files changed, 57 insertions(+), 137 deletions(-) diff --git a/zengine/messaging/lib.py b/zengine/messaging/lib.py index f475e866..600e6829 100644 --- a/zengine/messaging/lib.py +++ b/zengine/messaging/lib.py @@ -63,6 +63,9 @@ def set_password(self, raw_password): salt_size=10) def is_online(self, status=None): + if not self.key: + # FIXME: This should not happen! + return if status is None: return ConnectionStatus(self.key).get() or False else: diff --git a/zengine/tornado_server/server.py b/zengine/tornado_server/server.py index 4c575c33..8784ad8e 100644 --- a/zengine/tornado_server/server.py +++ b/zengine/tornado_server/server.py @@ -11,15 +11,15 @@ import traceback from uuid import uuid4 from tornado import websocket, web, ioloop -from tornado.escape import json_decode +from tornado.escape import json_decode, json_encode from tornado.httpclient import HTTPError sys.path.insert(0, os.path.realpath(os.path.dirname(__file__))) -from ws_to_queue import QueueManager, BlockingConnectionForHTTP, log, settings +from ws_to_queue import QueueManager, log, settings COOKIE_NAME = 'zopsess' DEBUG = os.getenv("DEBUG", False) -blocking_connection = BlockingConnectionForHTTP() +# blocking_connection = BlockingConnectionForHTTP() class SocketHandler(websocket.WebSocketHandler): @@ -112,49 +112,60 @@ def post(self, view_name): """ sess_id = None input_data = {} - try: - self._handle_headers() + # try: + self._handle_headers() - # handle input - input_data = json_decode(self.request.body) if self.request.body else {} - input_data['path'] = view_name + # handle input + input_data = json_decode(self.request.body) if self.request.body else {} + input_data['path'] = view_name - # set or get session cookie - if not self.get_cookie(COOKIE_NAME) or 'username' in input_data: - sess_id = uuid4().hex - self.set_cookie(COOKIE_NAME, sess_id) # , domain='127.0.0.1' - else: - sess_id = self.get_cookie(COOKIE_NAME) - h_sess_id = "HTTP_%s" % sess_id - input_data = {'data': input_data, '_zops_remote_ip': self.request.remote_ip} - log.info("New Request for %s: %s" % (sess_id, input_data)) - - - output = blocking_connection.send_message(h_sess_id, input_data) - out_obj = json_decode(output) - - if 'http_headers' in out_obj: - for k, v in out_obj['http_headers']: - self.set_header(k, v) - if 'response' in out_obj: - output = out_obj['response'] - - self.set_status(int(out_obj.get('code', 200))) - except HTTPError as e: - log.exception("HTTPError for %s: %s" % (sess_id, input_data)) - output = {'cmd': 'error', 'error': e.message, "code": e.code} - self.set_status(int(e.code)) - except: - log.exception("HTTPError for %s: %s" % (sess_id, input_data)) - if DEBUG: - self.set_status(500) - output = json.dumps({'error': traceback.format_exc()}) - else: - output = {'cmd': 'error', 'error': "Internal Error", "code": 500} + # set or get session cookie + if not self.get_cookie(COOKIE_NAME) or 'username' in input_data: + sess_id = uuid4().hex + self.set_cookie(COOKIE_NAME, sess_id) # , domain='127.0.0.1' + else: + sess_id = self.get_cookie(COOKIE_NAME) + # h_sess_id = "HTTP_%s" % sess_id + input_data = {'data': input_data, + '_zops_remote_ip': self.request.remote_ip} + log.info("New Request for %s: %s" % (sess_id, input_data)) + + self.application.pc.register_websocket(sess_id, self) + self.application.pc.redirect_incoming_message(sess_id, + json_encode(input_data), + self.request) + + def write_message(self, output): + log.debug("WRITE MESSAGE To CLIENT: %s" % output) self.write(output) self.finish() self.flush() + # # output = blocking_connection.send_message(h_sess_id, input_data) + # out_obj = json_decode(output) + # + # if 'http_headers' in out_obj: + # for k, v in out_obj['http_headers']: + # self.set_header(k, v) + # if 'response' in out_obj: + # output = out_obj['response'] + # + # self.set_status(int(out_obj.get('code', 200))) + # except HTTPError as e: + # log.exception("HTTPError for %s: %s" % (sess_id, input_data)) + # output = {'cmd': 'error', 'error': e.message, "code": e.code} + # self.set_status(int(e.code)) + # except: + # log.exception("HTTPError for %s: %s" % (sess_id, input_data)) + # if DEBUG: + # self.set_status(500) + # output = json.dumps({'error': traceback.format_exc()}) + # else: + # output = {'cmd': 'error', 'error': "Internal Error", "code": 500} + # self.write_message(output) + + + URL_CONFS = [ (r'/ws', SocketHandler), diff --git a/zengine/tornado_server/ws_to_queue.py b/zengine/tornado_server/ws_to_queue.py index c600423a..e71b9ba3 100644 --- a/zengine/tornado_server/ws_to_queue.py +++ b/zengine/tornado_server/ws_to_queue.py @@ -51,80 +51,6 @@ ) -class BlockingConnectionForHTTP(object): - REPLY_TIMEOUT = 105 # sec - - def __init__(self): - try: - self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) - self.input_channel = self.connection.channel() - except ConnectionClosed: - if os.environ.get('READTHEDOCS') == 'True': - pass - else: - raise - - - def create_channel(self): - try: - return self.connection.channel() - except (ConnectionClosed, AttributeError, KeyError): - self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS) - return self.connection.channel() - - def _send_message(self, sess_id, input_data): - log.info("sending data for %s" % sess_id) - self.input_channel.basic_publish(exchange='input_exc', - routing_key=sess_id, - body=json_encode(input_data)) - - def _store_user_id(self, sess_id, body): - log.debug("SET SESSUSERS: %s" % sys.sessid_to_userid) - sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'].lower() - - def _wait_for_reply(self, sess_id, input_data): - channel = self.create_channel() - channel.queue_declare(queue=sess_id, - arguments={'x-expires': 4000} - # auto_delete=True - ) - timeout_start = time.time() - while 1: - method_frame, header_frame, body = channel.basic_get(sess_id) - log.debug("\n%s\n%s\n%s\n%s" % (sess_id, method_frame, header_frame, body)) - if method_frame: - reply = json_decode(body) - if 'callbackID' in reply and reply['callbackID'] == input_data['callbackID']: - channel.basic_ack(method_frame.delivery_tag) - channel.close() - log.info('Returned view message for %s: %s' % (sess_id, body)) - if 'upgrade' in body: - self._store_user_id(sess_id, body) - return body - else: - if time.time() - json_decode(body)['reply_timestamp'] > self.REPLY_TIMEOUT: - channel.basic_ack(method_frame.delivery_tag) - continue - if time.time() - timeout_start > self.REPLY_TIMEOUT: - break - else: - time.sleep(1) - log.info('No message returned for %s' % sess_id) - channel.close() - - def send_message(self, sess_id, input_data): - input_data['callbackID'] = uuid4().hex - input_data['timestamp'] = time.time() - try: - self._send_message(sess_id, input_data) - except (ConnectionClosed, ChannelClosed, AttributeError): - self.input_channel = self.create_channel() - self._send_message(sess_id, input_data) - - return self._wait_for_reply(sess_id, input_data) or json.dumps( - {'code': 503, 'error': 'Retry'}) - - class QueueManager(object): """ Async RabbitMQ & Tornado websocket connector @@ -194,11 +120,7 @@ def on_input_queue_declare(self, queue): exchange='input_exc', queue=self.INPUT_QUEUE_NAME, routing_key="#") - def ask_for_user_id(self, sess_id): - log.debug(sess_id) - # TODO: add remote ip - self.publish_incoming_message(dict(_zops_remote_ip='', - data={'view': 'sessid_to_userid'}), sess_id) + def register_websocket(self, sess_id, ws): @@ -208,14 +130,6 @@ def register_websocket(self, sess_id, ws): sess_id: ws: """ - # log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid) - # try: - # user_id = sys.sessid_to_userid[sess_id] - # self.websockets[user_id] = ws - # except KeyError: - # self.ask_for_user_id(sess_id) - # self.websockets[sess_id] = ws - # user_id = sess_id self.websockets[sess_id] = ws self.create_out_channel(sess_id) @@ -243,8 +157,9 @@ def unregister_websocket(self, sess_id): def create_out_channel(self, sess_id): def _on_output_channel_creation(channel): def _on_output_queue_decleration(queue): + # differentiate and identify incoming message with registered consumer channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=sess_id) - log.debug("BIND QUEUE TO WS Q.%s" % sess_id) + log.debug("BINDED QUEUE TO WS Q.%s" % sess_id) self.out_channels[sess_id] = channel channel.queue_declare(callback=_on_output_queue_decleration, @@ -274,13 +189,4 @@ def on_message(self, channel, method, header, body): if sess_id in self.websockets: log.info("write msg to client") self.websockets[sess_id].write_message(body) - # channel.basic_ack(delivery_tag=method.delivery_tag) - # elif 'sessid_to_userid' in body: - # reply = json_decode(body) - # sys.sessid_to_userid[reply['sess_id']] = reply['user_id'] - # self.websockets[reply['user_id']] = self.websockets[reply['sess_id']] - # del self.websockets[reply['sess_id']] channel.basic_ack(delivery_tag=method.delivery_tag) - - # else: - # channel.basic_reject(delivery_tag=method.delivery_tag)