From 5c9ff31d5834b909e3babf16e3ad92f57e8700e3 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Sun, 21 Apr 2024 19:29:45 +0800 Subject: [PATCH] Fix serialization bwc broken in #511 (#515) --- storey/flow.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/storey/flow.py b/storey/flow.py index f2fd9a6e..837d1f12 100644 --- a/storey/flow.py +++ b/storey/flow.py @@ -797,16 +797,17 @@ def __init__(self, status, body): class _ConcurrentJobExecution(Flow): _BACKOFF_MAX = 120 + _DEFAULT_MAX_IN_FLIGHT = 8 def __init__(self, max_in_flight=None, retries=None, backoff_factor=None, **kwargs): Flow.__init__(self, **kwargs) if max_in_flight is not None and max_in_flight < 1: raise ValueError(f"max_in_flight may not be less than 1 (got {max_in_flight})") + self.max_in_flight = max_in_flight self.retries = retries self.backoff_factor = backoff_factor - self._max_in_flight = max_in_flight or 8 - self._queue_size = self._max_in_flight - 1 + self._queue_size = (max_in_flight or self._DEFAULT_MAX_IN_FLIGHT) - 1 def _init(self): super()._init() @@ -963,9 +964,9 @@ def __init__( self._executor = None if concurrency_mechanism == "threading": - self._executor = ThreadPoolExecutor(max_workers=self._max_in_flight) + self._executor = ThreadPoolExecutor(max_workers=self.max_in_flight or self._DEFAULT_MAX_IN_FLIGHT) elif concurrency_mechanism == "multiprocessing": - self._executor = ProcessPoolExecutor(max_workers=self._max_in_flight) + self._executor = ProcessPoolExecutor(max_workers=self.max_in_flight or self._DEFAULT_MAX_IN_FLIGHT) self._pass_context = pass_context