Skip to content

Commit

Permalink
it seems fixed and also now it works much faster then before.
Browse files Browse the repository at this point in the history
ref #GH-85
rref #5440
p80 #5440
  • Loading branch information
evrenesat committed Aug 15, 2016
1 parent 1303a12 commit 677d597
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 137 deletions.
3 changes: 3 additions & 0 deletions zengine/messaging/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def set_password(self, raw_password):
salt_size=10)

def is_online(self, status=None):
if not self.key:
# FIXME: This should not happen!
return
if status is None:
return ConnectionStatus(self.key).get() or False
else:
Expand Down
91 changes: 51 additions & 40 deletions zengine/tornado_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
import traceback
from uuid import uuid4
from tornado import websocket, web, ioloop
from tornado.escape import json_decode
from tornado.escape import json_decode, json_encode
from tornado.httpclient import HTTPError

sys.path.insert(0, os.path.realpath(os.path.dirname(__file__)))
from ws_to_queue import QueueManager, BlockingConnectionForHTTP, log, settings
from ws_to_queue import QueueManager, log, settings

COOKIE_NAME = 'zopsess'
DEBUG = os.getenv("DEBUG", False)
blocking_connection = BlockingConnectionForHTTP()
# blocking_connection = BlockingConnectionForHTTP()


class SocketHandler(websocket.WebSocketHandler):
Expand Down Expand Up @@ -112,49 +112,60 @@ def post(self, view_name):
"""
sess_id = None
input_data = {}
try:
self._handle_headers()
# try:
self._handle_headers()

# handle input
input_data = json_decode(self.request.body) if self.request.body else {}
input_data['path'] = view_name
# handle input
input_data = json_decode(self.request.body) if self.request.body else {}
input_data['path'] = view_name

# set or get session cookie
if not self.get_cookie(COOKIE_NAME) or 'username' in input_data:
sess_id = uuid4().hex
self.set_cookie(COOKIE_NAME, sess_id) # , domain='127.0.0.1'
else:
sess_id = self.get_cookie(COOKIE_NAME)
h_sess_id = "HTTP_%s" % sess_id
input_data = {'data': input_data, '_zops_remote_ip': self.request.remote_ip}
log.info("New Request for %s: %s" % (sess_id, input_data))


output = blocking_connection.send_message(h_sess_id, input_data)
out_obj = json_decode(output)

if 'http_headers' in out_obj:
for k, v in out_obj['http_headers']:
self.set_header(k, v)
if 'response' in out_obj:
output = out_obj['response']

self.set_status(int(out_obj.get('code', 200)))
except HTTPError as e:
log.exception("HTTPError for %s: %s" % (sess_id, input_data))
output = {'cmd': 'error', 'error': e.message, "code": e.code}
self.set_status(int(e.code))
except:
log.exception("HTTPError for %s: %s" % (sess_id, input_data))
if DEBUG:
self.set_status(500)
output = json.dumps({'error': traceback.format_exc()})
else:
output = {'cmd': 'error', 'error': "Internal Error", "code": 500}
# set or get session cookie
if not self.get_cookie(COOKIE_NAME) or 'username' in input_data:
sess_id = uuid4().hex
self.set_cookie(COOKIE_NAME, sess_id) # , domain='127.0.0.1'
else:
sess_id = self.get_cookie(COOKIE_NAME)
# h_sess_id = "HTTP_%s" % sess_id
input_data = {'data': input_data,
'_zops_remote_ip': self.request.remote_ip}
log.info("New Request for %s: %s" % (sess_id, input_data))

self.application.pc.register_websocket(sess_id, self)
self.application.pc.redirect_incoming_message(sess_id,
json_encode(input_data),
self.request)

def write_message(self, output):
log.debug("WRITE MESSAGE To CLIENT: %s" % output)
self.write(output)
self.finish()
self.flush()

# # output = blocking_connection.send_message(h_sess_id, input_data)
# out_obj = json_decode(output)
#
# if 'http_headers' in out_obj:
# for k, v in out_obj['http_headers']:
# self.set_header(k, v)
# if 'response' in out_obj:
# output = out_obj['response']
#
# self.set_status(int(out_obj.get('code', 200)))
# except HTTPError as e:
# log.exception("HTTPError for %s: %s" % (sess_id, input_data))
# output = {'cmd': 'error', 'error': e.message, "code": e.code}
# self.set_status(int(e.code))
# except:
# log.exception("HTTPError for %s: %s" % (sess_id, input_data))
# if DEBUG:
# self.set_status(500)
# output = json.dumps({'error': traceback.format_exc()})
# else:
# output = {'cmd': 'error', 'error': "Internal Error", "code": 500}
# self.write_message(output)




URL_CONFS = [
(r'/ws', SocketHandler),
Expand Down
100 changes: 3 additions & 97 deletions zengine/tornado_server/ws_to_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,80 +51,6 @@
)


class BlockingConnectionForHTTP(object):
REPLY_TIMEOUT = 105 # sec

def __init__(self):
try:
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
self.input_channel = self.connection.channel()
except ConnectionClosed:
if os.environ.get('READTHEDOCS') == 'True':
pass
else:
raise


def create_channel(self):
try:
return self.connection.channel()
except (ConnectionClosed, AttributeError, KeyError):
self.connection = pika.BlockingConnection(BLOCKING_MQ_PARAMS)
return self.connection.channel()

def _send_message(self, sess_id, input_data):
log.info("sending data for %s" % sess_id)
self.input_channel.basic_publish(exchange='input_exc',
routing_key=sess_id,
body=json_encode(input_data))

def _store_user_id(self, sess_id, body):
log.debug("SET SESSUSERS: %s" % sys.sessid_to_userid)
sys.sessid_to_userid[sess_id[5:]] = json_decode(body)['user_id'].lower()

def _wait_for_reply(self, sess_id, input_data):
channel = self.create_channel()
channel.queue_declare(queue=sess_id,
arguments={'x-expires': 4000}
# auto_delete=True
)
timeout_start = time.time()
while 1:
method_frame, header_frame, body = channel.basic_get(sess_id)
log.debug("\n%s\n%s\n%s\n%s" % (sess_id, method_frame, header_frame, body))
if method_frame:
reply = json_decode(body)
if 'callbackID' in reply and reply['callbackID'] == input_data['callbackID']:
channel.basic_ack(method_frame.delivery_tag)
channel.close()
log.info('Returned view message for %s: %s' % (sess_id, body))
if 'upgrade' in body:
self._store_user_id(sess_id, body)
return body
else:
if time.time() - json_decode(body)['reply_timestamp'] > self.REPLY_TIMEOUT:
channel.basic_ack(method_frame.delivery_tag)
continue
if time.time() - timeout_start > self.REPLY_TIMEOUT:
break
else:
time.sleep(1)
log.info('No message returned for %s' % sess_id)
channel.close()

def send_message(self, sess_id, input_data):
input_data['callbackID'] = uuid4().hex
input_data['timestamp'] = time.time()
try:
self._send_message(sess_id, input_data)
except (ConnectionClosed, ChannelClosed, AttributeError):
self.input_channel = self.create_channel()
self._send_message(sess_id, input_data)

return self._wait_for_reply(sess_id, input_data) or json.dumps(
{'code': 503, 'error': 'Retry'})


class QueueManager(object):
"""
Async RabbitMQ & Tornado websocket connector
Expand Down Expand Up @@ -194,11 +120,7 @@ def on_input_queue_declare(self, queue):
exchange='input_exc',
queue=self.INPUT_QUEUE_NAME,
routing_key="#")
def ask_for_user_id(self, sess_id):
log.debug(sess_id)
# TODO: add remote ip
self.publish_incoming_message(dict(_zops_remote_ip='',
data={'view': 'sessid_to_userid'}), sess_id)



def register_websocket(self, sess_id, ws):
Expand All @@ -208,14 +130,6 @@ def register_websocket(self, sess_id, ws):
sess_id:
ws:
"""
# log.debug("GET SESSUSERS: %s" % sys.sessid_to_userid)
# try:
# user_id = sys.sessid_to_userid[sess_id]
# self.websockets[user_id] = ws
# except KeyError:
# self.ask_for_user_id(sess_id)
# self.websockets[sess_id] = ws
# user_id = sess_id
self.websockets[sess_id] = ws
self.create_out_channel(sess_id)

Expand Down Expand Up @@ -243,8 +157,9 @@ def unregister_websocket(self, sess_id):
def create_out_channel(self, sess_id):
def _on_output_channel_creation(channel):
def _on_output_queue_decleration(queue):
# differentiate and identify incoming message with registered consumer
channel.basic_consume(self.on_message, queue=sess_id, consumer_tag=sess_id)
log.debug("BIND QUEUE TO WS Q.%s" % sess_id)
log.debug("BINDED QUEUE TO WS Q.%s" % sess_id)
self.out_channels[sess_id] = channel

channel.queue_declare(callback=_on_output_queue_decleration,
Expand Down Expand Up @@ -274,13 +189,4 @@ def on_message(self, channel, method, header, body):
if sess_id in self.websockets:
log.info("write msg to client")
self.websockets[sess_id].write_message(body)
# channel.basic_ack(delivery_tag=method.delivery_tag)
# elif 'sessid_to_userid' in body:
# reply = json_decode(body)
# sys.sessid_to_userid[reply['sess_id']] = reply['user_id']
# self.websockets[reply['user_id']] = self.websockets[reply['sess_id']]
# del self.websockets[reply['sess_id']]
channel.basic_ack(delivery_tag=method.delivery_tag)

# else:
# channel.basic_reject(delivery_tag=method.delivery_tag)

0 comments on commit 677d597

Please sign in to comment.