Skip to content

Commit

Permalink
rref #5367
Browse files Browse the repository at this point in the history
rref #5366

ref #66
ref #65
  • Loading branch information
evrenesat committed Jul 21, 2016
1 parent f384d1c commit 899f02c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 49 deletions.
41 changes: 24 additions & 17 deletions tests/async_amqp/messaging_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,32 @@

class TestCase(ConcurrentTestCase):
def test_channel_list(self):
self.post('ulakbus', {"view": "_zops_list_channels"})
self.post('ulakbus', dict(view="_zops_list_channels"), self.show_channel)

def test_search_user(self):
self.post('ulakbus', {"view": "_zops_search_user", "query": "x"})

# def test_show_channel(self):
# self.post('ulakbus',
# {"view": "_zops_show_channel",
# 'channel_key': 'iG4mvjQrfkvTDvM6Jk56X5ILoJ_CoqwpemOHnknn3hYu1BlAghb3dm'})
#
# def test_create_message(self):
# self.post('ulakbus',
# {"view": "_zops_create_message",
# "message": dict(
# body='test_body', title='testtitle',
# channel='iG4mvjQrfkvTDvM6Jk56X5ILoJ_CoqwpemOHnknn3hYu1BlAghb3dm',
# receiver='',
# type=2
# )})
self.post('ulakbus',
dict(view="_zops_search_user", query="x"))

def show_channel(self, res, req):
ch_key = res['channels'][0]['key']
self.post('ulakbus',
dict(view="_zops_show_channel", channel_key=ch_key),
self.create_message)


def create_message(self, res, req):
self.post('ulakbus',
{"view": "_zops_create_message",
"message": dict(
body='test_body', title='testtitle',
channel=res['channel_key'],
receiver='',
type=2
)})

def cmd_message(self, res, req=None):
print("MESSAGE RECEIVED")
print(res)


def main():
Expand Down
36 changes: 31 additions & 5 deletions zengine/lib/concurrent_amqp_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,18 @@ def backend_to_client(self, body):
"""
from backend to client
"""
body = json_decode(body)
self.message_stack[body['callbackID']] = body
self.message_callbacks[body['callbackID']](body)
try:
body = json_decode(body)
if 'callbackID' in body:
self.message_stack[body['callbackID']] = body
self.message_callbacks[body['callbackID']](body)
elif 'cmd' in body:
self.message_callbacks[body['cmd']](body)
except:
import traceback
print("\n")
traceback.print_exc()

log.info("WRITE MESSAGE TO CLIENT:\n%s" % (pformat(body),))

def client_to_backend(self, message, callback, caller_fn_name):
Expand All @@ -104,8 +113,13 @@ def client_to_backend(self, message, callback, caller_fn_name):
cbid = uuid.uuid4().hex
message = json_encode({"callbackID": cbid, "data": message})
def cb(res):
print("API Request: %s :: " % caller_fn_name, end='')
result = callback(res, message)
print("API Request: %s :: %s\n" % (caller_fn_name, 'PASS' if result else 'FAIL!'))
if ConcurrentTestCase.stc == callback and not result:
FAIL = 'FAIL'
else:
FAIL = '--> %s' % callback.__name__
print('PASS' if result else FAIL)
# self.message_callbacks[cbid] = lambda res: callable(res, message)
self.message_callbacks[cbid] = cb
log.info("GOT MESSAGE FOR BACKEND %s: %s" % (self.sess_id, message))
Expand All @@ -126,6 +140,8 @@ def __init__(self, queue_manager):
self.clients = {}
self.make_client('ulakbus')
self.run_tests()
self.cmds = {}
self.register_cmds()

def make_client(self, username):
"""
Expand All @@ -142,14 +158,24 @@ def make_client(self, username):
def post(self, username, data, callback=None):
if username not in self.clients:
self.make_client(username)
self.clients[username].message_callbacks.update(self.cmds)
callback = callback or self.stc
view_name = data['view'] if 'view' in data else sys._getframe(1).f_code.co_name
self.clients[username].client_to_backend(data, callback, view_name)

def register_cmds(self):
for name in sorted(self.__class__.__dict__):
if name.startswith("cmd_"):
self.cmds[name[4:]] = getattr(self, name)

def run_tests(self):
for name in sorted(self.__class__.__dict__):
if name.startswith("test_"):
getattr(self, name)()
try:
getattr(self, name)()
except:
import traceback
traceback.print_exc()

def process_error_reponse(self, resp):
if 'error' in resp:
Expand Down
13 changes: 10 additions & 3 deletions zengine/messaging/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def get_subscription_for_user(self, user_id):

def get_last_messages(self):
# TODO: Try to refactor this with https://github.com/rabbitmq/rabbitmq-recent-history-exchange
return self.message_set.objects.filter().set_params(sort="timestamp asc")[:20]
return self.message_set.objects.filter().set_params(sort="updated_at desc")[:20]

@classmethod
def _connect_mq(cls):
Expand Down Expand Up @@ -297,14 +297,21 @@ class Message(Model):
url = field.String("URL")

def get_actions_for(self, user):
actions = [('Favorite', '_zops_favorite_message')]
actions = []
if Favorite.objects.filter(user=user,
channel=self.channel,
message=self).count():
actions.append(('Remove from favorites', '_zops_remove_from_favorites'))
else:
actions.append(('Add to favorites', '_zops_favorite_message'))
if user:
actions.extend([('Flag', '_zops_flag_message')])
if self.sender == user:
actions.extend([
('Delete', '_zops_delete_message'),
('Edit', '_zops_edit_message')
])
return actions

def serialize(self, user=None):
"""
Expand All @@ -329,10 +336,10 @@ def serialize(self, user=None):
'title': self.msg_title,
'sender_name': self.sender.full_name,
'sender_key': self.sender.key,
'channel_key': self.channel.key,
'cmd': 'message',
'avatar_url': self.sender.avatar,
'key': self.key,
'actions': self.get_actions_for(user),
}

def __unicode__(self):
Expand Down
48 changes: 24 additions & 24 deletions zengine/messaging/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pyoko.db.adapter.db_riak import BlockSave
from pyoko.exceptions import ObjectDoesNotExist
from pyoko.lib.utils import get_object_from_path
from zengine.log import log
from zengine.lib.exceptions import HTTPError
from zengine.messaging.model import Channel, Attachment, Subscriber, Message, Favorite, \
FlaggedMessage
Expand All @@ -34,15 +35,6 @@
'avatar_url': string,
'key': key,
'cmd': 'message',
'actions':[('action name', 'view name'),
('Add to Favorite', '_zops_add_to_favorites'), # applicable to everyone
# Additional actions should be retrieved
# from "_zops_get_message_actions" view.
('Edit', '_zops_edit_message'),
('Delete', '_zops_delete_message'),
]
'attachments': [{
'description': string,
'file_name': string,
Expand Down Expand Up @@ -169,11 +161,12 @@ def show_channel(current, waited=False):
'is_online': sb.user.is_online(),
'avatar_url': sb.user.get_avatar_url()
} for sb in ch.subscriber_set.objects.filter()],
'last_messages': [msg.serialize(current.user)
for msg in ch.get_last_messages()],
'last_messages': [],
'status': 'OK',
'code': 200
}
for msg in ch.get_last_messages():
current.output['last_messages'].insert(0, msg.serialize(current.user))


def channel_history(current):
Expand Down Expand Up @@ -583,12 +576,18 @@ def delete_channel(current):
'code': 200
}
"""
ch = Channel(current).objects.get(owner_id=current.user_id,
key=current.input['channel_key'])
for sbs in ch.subscriber_set.objects.filter():
sbs.delete()
for msg in ch.message_set.objects.filter():
msg.delete()
try:
Channel(current).objects.get(owner_id=current.user_id,
key=current.input['channel_key']).delete()
current.output = {'status': 'Deleted', 'code': 200}
except ObjectDoesNotExist:
raise HTTPError(404, "")
ch.delete()
except:
log.exception("fix this!!!!!")
current.output = {'status': 'Deleted', 'code': 200}



def edit_channel(current):
Expand All @@ -611,14 +610,15 @@ def edit_channel(current):
'code': 200
}
"""
try:
Channel(current).objects.filter(owner_id=current.user_id,
key=current.input['channel_key']
).update(name=current.input['name'],
description=current.input['description'])
current.output = {'status': 'OK', 'code': 200}
except ObjectDoesNotExist:
raise HTTPError(404, "")
ch = Channel(current).objects.get(owner_id=current.user_id,
key=current.input['channel_key'])
ch.name = current.input['name']
ch.description = current.input['description']
ch.save()
for sbs in ch.subscriber_set.objects.filter():
sbs.name = ch.name
sbs.save()
current.output = {'status': 'OK', 'code': 200}


def pin_channel(current):
Expand Down

0 comments on commit 899f02c

Please sign in to comment.