diff --git a/docs/CHANGELOG.rst b/docs/CHANGELOG.rst index beee18ae0..e83799d68 100644 --- a/docs/CHANGELOG.rst +++ b/docs/CHANGELOG.rst @@ -14,11 +14,14 @@ Changed ------- - Redis cache in listener is updated when data fields are retrieved from the database + Added ----- - Add processes allow and ignore list to dispatcher, controlled by environmental variables ``FLOW_PROCESSES_ALLOW_LIST`` and ``FLOW_PROCESSES_IGNORE_LIST``` +- Allow ovirriding the maximal number of commands listener can process + concurrently Fixed ----- diff --git a/resolwe/flow/managers/listener/listener.py b/resolwe/flow/managers/listener/listener.py index e2dad755c..6c3612f2d 100644 --- a/resolwe/flow/managers/listener/listener.py +++ b/resolwe/flow/managers/listener/listener.py @@ -534,6 +534,7 @@ def __init__( port: int, protocol: str, zmq_socket: Optional[zmq.asyncio.Socket] = None, + max_concurrent_commands: int = 10, ): """Initialize.""" if zmq_socket is None: @@ -546,6 +547,7 @@ def __init__( super().__init__( ZMQCommunicator(zmq_socket, "listener <-> workers", logger), logger, + max_concurrent_commands, ) self.communicator.heartbeat_handler = self.heartbeat_handler self._message_processor = Processor(self) @@ -688,6 +690,13 @@ def __init__( getattr(settings, "LISTENER_CONNECTION", {}).get("protocol", "tcp"), ) + self.max_concurrent_commands = kwargs.get( + "max_concurrent_commands", + getattr(settings, "LISTENER_CONNECTION", {}).get( + "max_concurrent_commands", 10 + ), + ) + # When zmq_socket kwarg is not None, use this one instead of creating # a new one. self.zmq_socket = kwargs.get("zmq_socket") @@ -729,7 +738,11 @@ def listener_protocol(self) -> ListenerProtocol: """ if self._listener_protocol is None: self._listener_protocol = ListenerProtocol( - self.hosts, self.port, self.protocol, self.zmq_socket + self.hosts, + self.port, + self.protocol, + self.zmq_socket, + self.max_concurrent_commands, ) return self._listener_protocol diff --git a/tests/settings.py b/tests/settings.py index 746d8f714..fd047cd35 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -112,6 +112,10 @@ "min_port": 50000, "max_port": 60000, "protocol": "tcp", + # Define the max number of commands listener can process simultaneously. + "max_concurrent_commands": config( + "RESOLWE_LISTENER_MAX_CONCURRENT_COMMANDS", cast=int + ), } # The IP address where listener is available from the communication container.