Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix opening up multiple request to rabbitmq when triggering websocket messages #235

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 73 additions & 40 deletions binder/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,88 @@
from .json import jsondumps
import requests
from requests.exceptions import RequestException

import os

class RoomController(object):
def __init__(self):
self.room_listings = []
def __init__(self):
self.room_listings = []

def register(self, superclass):
for view in superclass.__subclasses__():
if view.register_for_model and view.model is not None:
listing = getattr(view, 'get_rooms_for_user', None)

if listing and callable(listing):
self.room_listings.append(listing)

self.register(view)

return self

def list_rooms_for_user(self, user):
rooms = []

for listing in self.room_listings:
rooms += listing(user)

return rooms

channels = {

}


def get_websocket_channel(force_new=False):
import pika
from pika import BlockingConnection
global channels

def register(self, superclass):
for view in superclass.__subclasses__():
if view.register_for_model and view.model is not None:
listing = getattr(view, 'get_rooms_for_user', None)
pid = os.getpid()

if listing and callable(listing):
self.room_listings.append(listing)
if channels.get(pid) and channel.get(pid).is_open:
if not force_new:
return channel
if force_new:
try:
channels.get(pid).close()
except pika.exceptions.ChannelWrongStateError:
channels[pid] = None

self.register(view)
connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'],
settings.HIGH_TEMPLAR['rabbitmq']['password'])
connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'],
credentials=connection_credentials)
connection = BlockingConnection(parameters=connection_parameters)
channels[pid] = connection.channel()
return channel[pid]

return self

def list_rooms_for_user(self, user):
rooms = []
def _trigger_rabbitmq(data, rooms, tries=2):
import pika
try:
channel = get_websocket_channel()
channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({
'data': data,
'rooms': rooms,
}))
except (pika.exceptions.StreamLostError, pika.exceptions.AMQPHeartbeatTimeout):
if tries == 0:
raise
get_websocket_channel(force_new=True)
_trigger_rabbitmq(data, rooms, tries=tries - 1)

for listing in self.room_listings:
rooms += listing(user)

return rooms


def trigger(data, rooms):
if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}):
import pika
from pika import BlockingConnection

connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'],
settings.HIGH_TEMPLAR['rabbitmq']['password'])
connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'],
credentials=connection_credentials)
connection = BlockingConnection(parameters=connection_parameters)
channel = connection.channel()

channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({
'data': data,
'rooms': rooms,
}))
if getattr(settings, 'HIGH_TEMPLAR_URL', None):
url = getattr(settings, 'HIGH_TEMPLAR_URL')
try:
requests.post('{}/trigger/'.format(url), data=jsondumps({
'data': data,
'rooms': rooms,
}))
except RequestException:
pass
if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}):
_trigger_rabbitmq(data, rooms)
if getattr(settings, 'HIGH_TEMPLAR_URL', None):
url = getattr(settings, 'HIGH_TEMPLAR_URL')
try:
requests.post('{}/trigger/'.format(url), data=jsondumps({
'data': data,
'rooms': rooms,
}))
except RequestException:
pass
Loading