From 7c640289e08cc043ca89978526dc99591ba47e36 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Thu, 26 Oct 2023 13:29:28 +0800 Subject: [PATCH 01/10] ML-4799: Simplify emit source offset handling for performance --- storey/sources.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 52393a61..0776082e 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -257,8 +257,7 @@ class SyncEmitSource(Flow): """ _legal_first_step = True - _backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1] - _backoff_last_index = len(_backoff) - 1 + _max_wait_before_commit = 2 def __init__( self, @@ -299,26 +298,23 @@ async def _run_loop(self): committer = self.context.platform.explicit_ack while True: event = None + next_event_future = loop.run_in_executor(None, self._q.get) if ( num_events_handled_without_commit > 0 and self._q.empty() or num_events_handled_without_commit >= self._max_events_before_commit ): num_events_handled_without_commit = 0 - can_block = await _commit_handled_events(self._outstanding_offsets, committer) - iteration = 0 - # In case we can't block because there are outstanding events - while not can_block: - sleep = self._backoff[min(iteration, self._backoff_last_index)] - iteration += 1 - await asyncio.sleep(sleep) - if self._q.qsize() > 0: - event = self._q.get_nowait() - if event: - break + while event is None: can_block = await _commit_handled_events(self._outstanding_offsets, committer) - if not event: - event = await loop.run_in_executor(None, self._q.get) + if can_block: + event = await next_event_future + try: + event = await asyncio.wait_for(next_event_future, timeout=self._max_wait_before_commit) + except TimeoutError: + pass + else: + event = await next_event_future if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): qualified_shard = (event.path, event.shard_id) offsets = self._outstanding_offsets[qualified_shard] From 349fea0076630f3edff416a0ad7cdc87084b50c1 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Thu, 26 Oct 2023 15:32:27 +0800 Subject: [PATCH 02/10] Implement timeout differently --- storey/sources.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 0776082e..c8dec0a8 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -298,7 +298,6 @@ async def _run_loop(self): committer = self.context.platform.explicit_ack while True: event = None - next_event_future = loop.run_in_executor(None, self._q.get) if ( num_events_handled_without_commit > 0 and self._q.empty() @@ -308,13 +307,13 @@ async def _run_loop(self): while event is None: can_block = await _commit_handled_events(self._outstanding_offsets, committer) if can_block: - event = await next_event_future + break try: - event = await asyncio.wait_for(next_event_future, timeout=self._max_wait_before_commit) - except TimeoutError: + event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit) + except queue.Empty: pass - else: - event = await next_event_future + if event is None: + event = await loop.run_in_executor(None, self._q.get) if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): qualified_shard = (event.path, event.shard_id) offsets = self._outstanding_offsets[qualified_shard] From 2d570e13bc14e42ea720f2727559e306128112d5 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Thu, 26 Oct 2023 16:20:56 +0800 Subject: [PATCH 03/10] `AsyncEmitSource` and `_commit_handled_events` --- storey/sources.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index c8dec0a8..364005c9 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -305,7 +305,10 @@ async def _run_loop(self): ): num_events_handled_without_commit = 0 while event is None: - can_block = await _commit_handled_events(self._outstanding_offsets, committer) + num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + # Due to the last event not being garbage collected, we tolerate a single unhandled event + # TODO: Remove after transitioning to AsyncEmitSource, which would solve the underlying problem + can_block = num_offsets_not_handled <= 1 if can_block: break try: @@ -503,7 +506,7 @@ async def await_termination(self): async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False): - all_offsets_handled = True + num_offsets_not_handled = 0 if not commit_all: gc.collect() for qualified_shard, offsets in outstanding_offsets_by_qualified_shard.items(): @@ -516,7 +519,7 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ # go over offsets in the qualified shard by arrival order until we reach an unhandled offset for offset in offsets: if not offset.is_ready_to_commit(): - all_offsets_handled = False + num_offsets_not_handled += 1 break last_handled_offset = offset.offset num_to_clear += 1 @@ -524,7 +527,7 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ path, shard_id = qualified_shard await committer(QualifiedOffset(path, shard_id, last_handled_offset)) outstanding_offsets_by_qualified_shard[qualified_shard] = offsets[num_to_clear:] - return all_offsets_handled + return num_offsets_not_handled class AsyncEmitSource(Flow): @@ -543,8 +546,7 @@ class AsyncEmitSource(Flow): """ _legal_first_step = True - _backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1] - _backoff_last_index = len(_backoff) - 1 + _max_wait_before_commit = 2 def __init__( self, @@ -586,19 +588,16 @@ async def _run_loop(self): or num_events_handled_without_commit >= self._max_events_before_commit ): num_events_handled_without_commit = 0 - can_block = await _commit_handled_events(self._outstanding_offsets, committer) - iteration = 0 # In case we can't block because there are outstanding events - while not can_block: - # Sleep to yield and to avoid busy-wait - sleep = self._backoff[min(iteration, self._backoff_last_index)] - iteration += 1 - await asyncio.sleep(sleep) - if not self._q.empty(): - event = self._q.get_nowait() - if event: + while event is None: + num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + can_block = num_offsets_not_handled == 0 + if can_block: break - can_block = await _commit_handled_events(self._outstanding_offsets, committer) + try: + event = await asyncio.wait_for(self._q.get(), self._max_wait_before_commit) + except TimeoutError: + pass if not event: event = await self._q.get() if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): From 7e3cc2a6852cb54efba0e5c9a2bdb2d58101d123 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Thu, 26 Oct 2023 17:41:20 +0800 Subject: [PATCH 04/10] Proper count of offsets --- storey/sources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 364005c9..323e0966 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -517,9 +517,9 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ num_to_clear = 0 last_handled_offset = None # go over offsets in the qualified shard by arrival order until we reach an unhandled offset - for offset in offsets: + for i, offset in enumerate(offsets): if not offset.is_ready_to_commit(): - num_offsets_not_handled += 1 + num_offsets_not_handled += len(offsets) - i + 1 break last_handled_offset = offset.offset num_to_clear += 1 From 8ee3267e83f242bc5ffcdc1094d330978425453c Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Sun, 29 Oct 2023 14:30:37 +0800 Subject: [PATCH 05/10] Fixes --- storey/sources.py | 52 ++++++++++++++++++++-------------------------- tests/test_flow.py | 4 ++-- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 323e0966..41420df5 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -249,6 +249,7 @@ class SyncEmitSource(Flow): :param buffer_size: size of the incoming event buffer. Defaults to 8. :param key_field: Field to extract and use as the key. Optional. :param max_events_before_commit: Maximum number of events to be processed before committing offsets. + Defaults to 10,000. :param explicit_ack: Whether to explicitly commit offsets. Defaults to False. :param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource). :type name: string @@ -278,7 +279,7 @@ def __init__( raise ValueError("Buffer size must be positive") self._q = queue.Queue(buffer_size) self._key_field = key_field - self._max_events_before_commit = max_events_before_commit or 1000 + self._max_events_before_commit = max_events_before_commit or 10000 self._explicit_ack = explicit_ack self._termination_q = queue.Queue(1) self._ex = None @@ -293,35 +294,30 @@ async def _run_loop(self): loop = asyncio.get_running_loop() self._termination_future = loop.create_future() committer = None - num_events_handled_without_commit = 0 + num_offsets_not_handled = 0 if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"): committer = self.context.platform.explicit_ack while True: event = None - if ( - num_events_handled_without_commit > 0 - and self._q.empty() - or num_events_handled_without_commit >= self._max_events_before_commit - ): - num_events_handled_without_commit = 0 - while event is None: + if committer: + if num_offsets_not_handled >= self._max_events_before_commit: num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) # Due to the last event not being garbage collected, we tolerate a single unhandled event - # TODO: Remove after transitioning to AsyncEmitSource, which would solve the underlying problem - can_block = num_offsets_not_handled <= 1 - if can_block: - break + # TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem + while num_offsets_not_handled > 1: try: event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit) + break except queue.Empty: pass + num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) if event is None: event = await loop.run_in_executor(None, self._q.get) if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): qualified_shard = (event.path, event.shard_id) offsets = self._outstanding_offsets[qualified_shard] offsets.append(_EventOffset(event)) - num_events_handled_without_commit += 1 + num_offsets_not_handled += 1 try: termination_result = await self._do_downstream(event) if event is _termination_obj: @@ -519,7 +515,7 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ # go over offsets in the qualified shard by arrival order until we reach an unhandled offset for i, offset in enumerate(offsets): if not offset.is_ready_to_commit(): - num_offsets_not_handled += len(offsets) - i + 1 + num_offsets_not_handled += len(offsets) - i break last_handled_offset = offset.offset num_to_clear += 1 @@ -538,6 +534,7 @@ class AsyncEmitSource(Flow): :param buffer_size: size of the incoming event buffer. Defaults to 8. :param key_field: Field to extract and use as the key. Optional. :param max_events_before_commit: Maximum number of events to be processed before committing offsets. + Defaults to 10,000. :param explicit_ack: Whether to explicitly commit offsets. Defaults to False. :param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource). :type name: string @@ -565,7 +562,7 @@ def __init__( kwargs["buffer_size"] = buffer_size self._q = asyncio.Queue(buffer_size) self._key_field = key_field - self._max_events_before_commit = max_events_before_commit or 1000 + self._max_events_before_commit = max_events_before_commit or 10000 self._explicit_ack = explicit_ack self._ex = None self._closeables = [] @@ -577,34 +574,29 @@ def _init(self): async def _run_loop(self): committer = None - num_events_handled_without_commit = 0 + num_offsets_not_handled = 0 if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"): committer = self.context.platform.explicit_ack while True: event = None - if ( - num_events_handled_without_commit > 0 - and self._q.empty() - or num_events_handled_without_commit >= self._max_events_before_commit - ): - num_events_handled_without_commit = 0 - # In case we can't block because there are outstanding events - while event is None: + if committer: + if num_offsets_not_handled >= self._max_events_before_commit: num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) - can_block = num_offsets_not_handled == 0 - if can_block: - break + # In case we can't block because there are outstanding events + while num_offsets_not_handled > 0: try: event = await asyncio.wait_for(self._q.get(), self._max_wait_before_commit) - except TimeoutError: + break + except asyncio.TimeoutError: pass + num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) if not event: event = await self._q.get() if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): qualified_shard = (event.path, event.shard_id) offsets = self._outstanding_offsets[qualified_shard] offsets.append(_EventOffset(event)) - num_events_handled_without_commit += 1 + num_offsets_not_handled += 1 try: termination_result = await self._do_downstream(event) if event is _termination_obj: diff --git a/tests/test_flow.py b/tests/test_flow.py index 3baf3152..c52714d2 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -228,7 +228,7 @@ def test_offset_commit_before_termination(): event.offset = offset controller.emit(event) - time.sleep(1) + time.sleep(SyncEmitSource._max_wait_before_commit + 1) expected_offsets = {("/", i): num_records_per_shard for i in range(num_shards)} # TODO: Remove when commit of last record is fixed @@ -269,7 +269,7 @@ async def async_offset_commit_before_termination(): del event - await asyncio.sleep(1) + await asyncio.sleep(AsyncEmitSource._max_wait_before_commit + 1) try: offsets = copy.copy(platform.offsets) From 6d9855335c378801581562c63b914eedf495cf85 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Sun, 29 Oct 2023 18:14:44 +0800 Subject: [PATCH 06/10] Count events and time since last commit to decide whether to commit again --- storey/sources.py | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 41420df5..38e94d9a 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -17,6 +17,7 @@ import gc import queue import threading +import time import uuid import warnings import weakref @@ -265,6 +266,7 @@ def __init__( buffer_size: Optional[int] = None, key_field: Union[list, str, int, None] = None, max_events_before_commit=None, + max_time_before_commit=None, explicit_ack=False, **kwargs, ): @@ -279,7 +281,8 @@ def __init__( raise ValueError("Buffer size must be positive") self._q = queue.Queue(buffer_size) self._key_field = key_field - self._max_events_before_commit = max_events_before_commit or 10000 + self._max_events_before_commit = max_events_before_commit or 20000 + self._max_time_before_commit = max_time_before_commit or 45 self._explicit_ack = explicit_ack self._termination_q = queue.Queue(1) self._ex = None @@ -295,14 +298,22 @@ async def _run_loop(self): self._termination_future = loop.create_future() committer = None num_offsets_not_handled = 0 + events_handled_since_commit = 0 + last_commit_time = time.monotonic() if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"): committer = self.context.platform.explicit_ack while True: event = None if committer: - if num_offsets_not_handled >= self._max_events_before_commit: + if ( + events_handled_since_commit >= self._max_events_before_commit + or num_offsets_not_handled > 1 + and time.monotonic() >= last_commit_time + self._max_time_before_commit + ): num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) - # Due to the last event not being garbage collected, we tolerate a single unhandled event + events_handled_since_commit = 0 + last_commit_time = time.monotonic() + # Due to the last event not being garbage collected, we tolerate a single unhandled event # TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem while num_offsets_not_handled > 1: try: @@ -311,6 +322,8 @@ async def _run_loop(self): except queue.Empty: pass num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + events_handled_since_commit = 0 + last_commit_time = time.monotonic() if event is None: event = await loop.run_in_executor(None, self._q.get) if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): @@ -318,6 +331,7 @@ async def _run_loop(self): offsets = self._outstanding_offsets[qualified_shard] offsets.append(_EventOffset(event)) num_offsets_not_handled += 1 + events_handled_since_commit += 1 try: termination_result = await self._do_downstream(event) if event is _termination_obj: @@ -550,6 +564,7 @@ def __init__( buffer_size: int = None, key_field: Union[list, str, None] = None, max_events_before_commit=None, + max_time_before_commit=None, explicit_ack=False, **kwargs, ): @@ -562,7 +577,8 @@ def __init__( kwargs["buffer_size"] = buffer_size self._q = asyncio.Queue(buffer_size) self._key_field = key_field - self._max_events_before_commit = max_events_before_commit or 10000 + self._max_events_before_commit = max_events_before_commit or 20000 + self._max_time_before_commit = max_time_before_commit or 45 self._explicit_ack = explicit_ack self._ex = None self._closeables = [] @@ -575,13 +591,21 @@ def _init(self): async def _run_loop(self): committer = None num_offsets_not_handled = 0 + events_handled_since_commit = 0 + last_commit_time = time.monotonic() if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"): committer = self.context.platform.explicit_ack while True: event = None if committer: - if num_offsets_not_handled >= self._max_events_before_commit: + if ( + events_handled_since_commit >= self._max_events_before_commit + or num_offsets_not_handled > 0 + and time.monotonic() >= last_commit_time + self._max_time_before_commit + ): num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + events_handled_since_commit = 0 + last_commit_time = time.monotonic() # In case we can't block because there are outstanding events while num_offsets_not_handled > 0: try: @@ -590,6 +614,8 @@ async def _run_loop(self): except asyncio.TimeoutError: pass num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + events_handled_since_commit = 0 + last_commit_time = time.monotonic() if not event: event = await self._q.get() if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"): @@ -597,6 +623,7 @@ async def _run_loop(self): offsets = self._outstanding_offsets[qualified_shard] offsets.append(_EventOffset(event)) num_offsets_not_handled += 1 + events_handled_since_commit += 1 try: termination_result = await self._do_downstream(event) if event is _termination_obj: From c68e54c3e5ec2ffbe36780a45bc8d78555361162 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Mon, 30 Oct 2023 16:38:38 +0800 Subject: [PATCH 07/10] Fix `StreamTarget` --- storey/targets.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/storey/targets.py b/storey/targets.py index d8652ee0..c693511a 100644 --- a/storey/targets.py +++ b/storey/targets.py @@ -890,13 +890,18 @@ async def _worker(self): in_flight_events.append(None) while True: try: - for shard_id in range(self._shards): - if self._q.empty(): + request_sent = False + if self._q.empty(): + for shard_id in range(self._shards): req = in_flight_reqs[shard_id] + if req: + request_sent = True in_flight_reqs[shard_id] = None await self._handle_response(req) in_flight_events[shard_id] = None self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id) + if request_sent: + continue event = await self._q.get() if event is _termination_obj: # handle outstanding batches and in flight requests on termination for req in in_flight_reqs: From c85cdf1ac49d29f61b614879578fa5541e506f96 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Tue, 31 Oct 2023 12:45:32 +0800 Subject: [PATCH 08/10] Increase max wait before commit to 5s --- storey/sources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 38e94d9a..76b190cf 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -259,7 +259,7 @@ class SyncEmitSource(Flow): """ _legal_first_step = True - _max_wait_before_commit = 2 + _max_wait_before_commit = 5 def __init__( self, @@ -557,7 +557,7 @@ class AsyncEmitSource(Flow): """ _legal_first_step = True - _max_wait_before_commit = 2 + _max_wait_before_commit = 5 def __init__( self, From 270b0f1e7c670d03b07bf9bd396d030bd365cafe Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Wed, 1 Nov 2023 19:10:03 +0800 Subject: [PATCH 09/10] Rename local vars and fix comments --- storey/sources.py | 16 ++++++++-------- storey/targets.py | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/storey/sources.py b/storey/sources.py index 76b190cf..abe95fbc 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -250,7 +250,7 @@ class SyncEmitSource(Flow): :param buffer_size: size of the incoming event buffer. Defaults to 8. :param key_field: Field to extract and use as the key. Optional. :param max_events_before_commit: Maximum number of events to be processed before committing offsets. - Defaults to 10,000. + Defaults to 20,000. :param explicit_ack: Whether to explicitly commit offsets. Defaults to False. :param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource). :type name: string @@ -297,7 +297,7 @@ async def _run_loop(self): loop = asyncio.get_running_loop() self._termination_future = loop.create_future() committer = None - num_offsets_not_handled = 0 + num_offsets_not_committed = 0 events_handled_since_commit = 0 last_commit_time = time.monotonic() if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"): @@ -307,21 +307,21 @@ async def _run_loop(self): if committer: if ( events_handled_since_commit >= self._max_events_before_commit - or num_offsets_not_handled > 1 + or num_offsets_not_committed > 1 and time.monotonic() >= last_commit_time + self._max_time_before_commit ): - num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer) events_handled_since_commit = 0 last_commit_time = time.monotonic() # Due to the last event not being garbage collected, we tolerate a single unhandled event # TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem - while num_offsets_not_handled > 1: + while num_offsets_not_committed > 1: try: event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit) break except queue.Empty: pass - num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer) + num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer) events_handled_since_commit = 0 last_commit_time = time.monotonic() if event is None: @@ -330,7 +330,7 @@ async def _run_loop(self): qualified_shard = (event.path, event.shard_id) offsets = self._outstanding_offsets[qualified_shard] offsets.append(_EventOffset(event)) - num_offsets_not_handled += 1 + num_offsets_not_committed += 1 events_handled_since_commit += 1 try: termination_result = await self._do_downstream(event) @@ -548,7 +548,7 @@ class AsyncEmitSource(Flow): :param buffer_size: size of the incoming event buffer. Defaults to 8. :param key_field: Field to extract and use as the key. Optional. :param max_events_before_commit: Maximum number of events to be processed before committing offsets. - Defaults to 10,000. + Defaults to 20,000. :param explicit_ack: Whether to explicitly commit offsets. Defaults to False. :param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource). :type name: string diff --git a/storey/targets.py b/storey/targets.py index c693511a..0092908f 100644 --- a/storey/targets.py +++ b/storey/targets.py @@ -890,17 +890,17 @@ async def _worker(self): in_flight_events.append(None) while True: try: - request_sent = False + request_sent_on_empty_queue = False if self._q.empty(): for shard_id in range(self._shards): req = in_flight_reqs[shard_id] if req: - request_sent = True + request_sent_on_empty_queue = True in_flight_reqs[shard_id] = None await self._handle_response(req) in_flight_events[shard_id] = None self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id) - if request_sent: + if request_sent_on_empty_queue: continue event = await self._q.get() if event is _termination_obj: # handle outstanding batches and in flight requests on termination From 7fdb62a91a53f396ddda9abdc202d74f2f4dcf98 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Thu, 2 Nov 2023 16:57:40 +0800 Subject: [PATCH 10/10] Add param to docstring --- storey/sources.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/storey/sources.py b/storey/sources.py index abe95fbc..57ec89f9 100644 --- a/storey/sources.py +++ b/storey/sources.py @@ -251,6 +251,7 @@ class SyncEmitSource(Flow): :param key_field: Field to extract and use as the key. Optional. :param max_events_before_commit: Maximum number of events to be processed before committing offsets. Defaults to 20,000. + :param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45. :param explicit_ack: Whether to explicitly commit offsets. Defaults to False. :param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource). :type name: string @@ -549,6 +550,7 @@ class AsyncEmitSource(Flow): :param key_field: Field to extract and use as the key. Optional. :param max_events_before_commit: Maximum number of events to be processed before committing offsets. Defaults to 20,000. + :param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45. :param explicit_ack: Whether to explicitly commit offsets. Defaults to False. :param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource). :type name: string