From 8b25c6b6eadecb12cc6e9899d61eee55f87f531a Mon Sep 17 00:00:00 2001 From: Stefan Majoor Date: Tue, 26 Mar 2024 14:17:46 +0100 Subject: [PATCH 1/6] xxx --- binder/websocket.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/binder/websocket.py b/binder/websocket.py index 10c9c108..7579c134 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -29,19 +29,24 @@ def list_rooms_for_user(self, user): return rooms +channel = Noneq +def get_websocket_channel(): + import pika + from pika import BlockingConnection + global channel + if channel and channel.is_open: + return channel + connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], + settings.HIGH_TEMPLAR['rabbitmq']['password']) + connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'], + credentials=connection_credentials) + connection = BlockingConnection(parameters=connection_parameters) + channel = connection.channel() + return channel def trigger(data, rooms): if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}): - import pika - from pika import BlockingConnection - - connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], - settings.HIGH_TEMPLAR['rabbitmq']['password']) - connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'], - credentials=connection_credentials) - connection = BlockingConnection(parameters=connection_parameters) - channel = connection.channel() - + channel = get_websocket_channel() channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({ 'data': data, 'rooms': rooms, From 55ae412241b648b2fb51bf6e2ebf2f8ecf1a499d Mon Sep 17 00:00:00 2001 From: Stefan Majoor Date: Tue, 26 Mar 2024 14:55:06 +0100 Subject: [PATCH 2/6] xxx --- binder/websocket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binder/websocket.py b/binder/websocket.py index 7579c134..faba04fd 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -29,7 +29,7 @@ def list_rooms_for_user(self, user): return rooms -channel = Noneq +channel = None def get_websocket_channel(): import pika from pika import BlockingConnection From 2e7c99dc4b02f422d00ac2105202e37e790f8759 Mon Sep 17 00:00:00 2001 From: Stefan Majoor Date: Tue, 26 Mar 2024 15:15:10 +0100 Subject: [PATCH 3/6] xxx --- binder/websocket.py | 107 ++++++++++++++++++++++++++------------------ 1 file changed, 64 insertions(+), 43 deletions(-) diff --git a/binder/websocket.py b/binder/websocket.py index faba04fd..205b7515 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -6,57 +6,78 @@ class RoomController(object): - def __init__(self): - self.room_listings = [] + def __init__(self): + self.room_listings = [] - def register(self, superclass): - for view in superclass.__subclasses__(): - if view.register_for_model and view.model is not None: - listing = getattr(view, 'get_rooms_for_user', None) + def register(self, superclass): + for view in superclass.__subclasses__(): + if view.register_for_model and view.model is not None: + listing = getattr(view, 'get_rooms_for_user', None) - if listing and callable(listing): - self.room_listings.append(listing) + if listing and callable(listing): + self.room_listings.append(listing) - self.register(view) + self.register(view) - return self + return self - def list_rooms_for_user(self, user): - rooms = [] + def list_rooms_for_user(self, user): + rooms = [] - for listing in self.room_listings: - rooms += listing(user) + for listing in self.room_listings: + rooms += listing(user) + + return rooms - return rooms channel = None -def get_websocket_channel(): - import pika - from pika import BlockingConnection - global channel - if channel and channel.is_open: - return channel - connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], - settings.HIGH_TEMPLAR['rabbitmq']['password']) - connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'], - credentials=connection_credentials) - connection = BlockingConnection(parameters=connection_parameters) - channel = connection.channel() - return channel + + +def get_websocket_channel(force_new=False): + import pika + from pika import BlockingConnection + global channel + if channel and channel.is_open: + if not force_new: + return channel + if force_new: + try: + channel.close() + except ChannelWrongStateError: + pass + finally: + channel = None + + connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], + settings.HIGH_TEMPLAR['rabbitmq']['password']) + connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'], + credentials=connection_credentials) + connection = BlockingConnection(parameters=connection_parameters) + channel = connection.channel() + return channel + + +def _trigger_rabbitmq(data, rooms, tries=2): + try: + channel = get_websocket_channel() + channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({ + 'data': data, + 'rooms': rooms, + })) + except (pika.exceptions.StreamLostError, pika.exceptions.AMQPHeartbeatTimeout): + chanen + + def trigger(data, rooms): - if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}): - channel = get_websocket_channel() - channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({ - 'data': data, - 'rooms': rooms, - })) - if getattr(settings, 'HIGH_TEMPLAR_URL', None): - url = getattr(settings, 'HIGH_TEMPLAR_URL') - try: - requests.post('{}/trigger/'.format(url), data=jsondumps({ - 'data': data, - 'rooms': rooms, - })) - except RequestException: - pass + if 'rabbitmq' in getattr(settings, 'HIGH_TEMPLAR', {}): + _trigger_rabbitmq(data, rooms) + if getattr(settings, 'HIGH_TEMPLAR_URL', None): + url = getattr(settings, 'HIGH_TEMPLAR_URL') + try: + requests.post('{}/trigger/'.format(url), data=jsondumps({ + 'data': data, + 'rooms': rooms, + })) + except RequestException: + pass From 32a4eb9108400eee572dafd295ae16b359378423 Mon Sep 17 00:00:00 2001 From: Stefan Majoor Date: Tue, 26 Mar 2024 15:18:43 +0100 Subject: [PATCH 4/6] xxx --- binder/websocket.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/binder/websocket.py b/binder/websocket.py index 205b7515..bd30b193 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -65,7 +65,11 @@ def _trigger_rabbitmq(data, rooms, tries=2): 'rooms': rooms, })) except (pika.exceptions.StreamLostError, pika.exceptions.AMQPHeartbeatTimeout): - chanen + if tries == 0: + raise + get_websocket_channel(force_new=True) + _trigger_rabbitmq(data, rooms, tries=tries - 1) + From 65058f34fd9abbcb93d8a968682e70e278016a8d Mon Sep 17 00:00:00 2001 From: Stefan Majoor Date: Tue, 26 Mar 2024 15:22:24 +0100 Subject: [PATCH 5/6] threadsafe --- binder/websocket.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/binder/websocket.py b/binder/websocket.py index bd30b193..82e1d657 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -3,7 +3,7 @@ from .json import jsondumps import requests from requests.exceptions import RequestException - +import os class RoomController(object): def __init__(self): @@ -29,32 +29,34 @@ def list_rooms_for_user(self, user): return rooms +channels = { -channel = None +} def get_websocket_channel(force_new=False): import pika from pika import BlockingConnection - global channel - if channel and channel.is_open: + global channels + + pid = os.getpid() + + if channels.get(pid) and channel.get(pid).is_open: if not force_new: return channel if force_new: try: - channel.close() + channels.get(pid).close() except ChannelWrongStateError: - pass - finally: - channel = None + channels[pid] = None connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], settings.HIGH_TEMPLAR['rabbitmq']['password']) connection_parameters = pika.ConnectionParameters(settings.HIGH_TEMPLAR['rabbitmq']['host'], credentials=connection_credentials) connection = BlockingConnection(parameters=connection_parameters) - channel = connection.channel() - return channel + channels[pid] = connection.channel() + return channel[pid] def _trigger_rabbitmq(data, rooms, tries=2): From 13f804c18e3f67fcacdc5fb345521c4889dbf98c Mon Sep 17 00:00:00 2001 From: Stefan Majoor Date: Tue, 26 Mar 2024 15:24:43 +0100 Subject: [PATCH 6/6] xxx --- binder/websocket.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/binder/websocket.py b/binder/websocket.py index 82e1d657..06e14ca6 100644 --- a/binder/websocket.py +++ b/binder/websocket.py @@ -47,7 +47,7 @@ def get_websocket_channel(force_new=False): if force_new: try: channels.get(pid).close() - except ChannelWrongStateError: + except pika.exceptions.ChannelWrongStateError: channels[pid] = None connection_credentials = pika.PlainCredentials(settings.HIGH_TEMPLAR['rabbitmq']['username'], @@ -60,6 +60,7 @@ def get_websocket_channel(force_new=False): def _trigger_rabbitmq(data, rooms, tries=2): + import pika try: channel = get_websocket_channel() channel.basic_publish('hightemplar', routing_key='*', body=jsondumps({