From ca426ae22de136780c721b3367223c6d7fc26760 Mon Sep 17 00:00:00 2001
From: Gal Topper
Date: Thu, 2 Nov 2023 17:05:35 +0800
Subject: [PATCH] ML-4799: Simplify and optimize emit source offset handling
 for performance (#462)

* ML-4799: Simplify emit source offset handling for performance

* Implement timeout differently

* `AsyncEmitSource` and `_commit_handled_events`

* Proper count of offsets

* Fixes

* Count events and time since last commit to decide whether to commit again

* Fix `StreamTarget`

* Increase max wait before commit to 5s

* Rename local vars and fix comments

* Add param to docstring

---------

Co-authored-by: Gal Topper
---
 storey/sources.py  | 115 +++++++++++++++++++++++++--------------------
 storey/targets.py  |   9 +++-
 tests/test_flow.py |   4 +-
 3 files changed, 74 insertions(+), 54 deletions(-)

diff --git a/storey/sources.py b/storey/sources.py
index 52393a61..57ec89f9 100644
--- a/storey/sources.py
+++ b/storey/sources.py
@@ -17,6 +17,7 @@
 import gc
 import queue
 import threading
+import time
 import uuid
 import warnings
 import weakref
@@ -249,6 +250,8 @@ class SyncEmitSource(Flow):
     :param buffer_size: size of the incoming event buffer. Defaults to 8.
     :param key_field: Field to extract and use as the key. Optional.
     :param max_events_before_commit: Maximum number of events to be processed before committing offsets.
+        Defaults to 20,000.
+    :param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45.
     :param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
     :param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource).
     :type name: string
@@ -257,14 +260,14 @@ class SyncEmitSource(Flow):
     """

     _legal_first_step = True
-    _backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1]
-    _backoff_last_index = len(_backoff) - 1
+    _max_wait_before_commit = 5

     def __init__(
         self,
         buffer_size: Optional[int] = None,
         key_field: Union[list, str, int, None] = None,
         max_events_before_commit=None,
+        max_time_before_commit=None,
         explicit_ack=False,
         **kwargs,
     ):
@@ -279,7 +282,8 @@ def __init__(
             raise ValueError("Buffer size must be positive")
         self._q = queue.Queue(buffer_size)
         self._key_field = key_field
-        self._max_events_before_commit = max_events_before_commit or 1000
+        self._max_events_before_commit = max_events_before_commit or 20000
+        self._max_time_before_commit = max_time_before_commit or 45
         self._explicit_ack = explicit_ack
         self._termination_q = queue.Queue(1)
         self._ex = None
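The logic introduced by the hunks above and below reduces to a count-or-time commit rule: commit when enough events have been handled since the last commit, or when uncommitted offsets have been waiting long enough. A minimal sketch of that rule outside of storey; `handle` and `commit` are hypothetical stand-ins, and the constants mirror the new `max_events_before_commit` and `max_time_before_commit` defaults:

    import time

    MAX_EVENTS = 20000  # mirrors the new max_events_before_commit default
    MAX_SECONDS = 45    # mirrors the new max_time_before_commit default

    def run(q, handle, commit):
        handled_since_commit = 0
        last_commit_time = time.monotonic()
        while True:
            handle(q.get())
            handled_since_commit += 1
            overdue = time.monotonic() >= last_commit_time + MAX_SECONDS
            if handled_since_commit >= MAX_EVENTS or (handled_since_commit > 0 and overdue):
                commit()
                handled_since_commit = 0
                last_commit_time = time.monotonic()

Note that the combined condition in the actual patch, written as `a or b and c`, relies on `and` binding tighter than `or` in Python, i.e. it parses as `a or (b and c)`.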
@@ -294,36 +298,41 @@ async def _run_loop(self):
         loop = asyncio.get_running_loop()
         self._termination_future = loop.create_future()
         committer = None
-        num_events_handled_without_commit = 0
+        num_offsets_not_committed = 0
+        events_handled_since_commit = 0
+        last_commit_time = time.monotonic()
         if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
             committer = self.context.platform.explicit_ack
         while True:
             event = None
-            if (
-                num_events_handled_without_commit > 0
-                and self._q.empty()
-                or num_events_handled_without_commit >= self._max_events_before_commit
-            ):
-                num_events_handled_without_commit = 0
-                can_block = await _commit_handled_events(self._outstanding_offsets, committer)
-                iteration = 0
-                # In case we can't block because there are outstanding events
-                while not can_block:
-                    sleep = self._backoff[min(iteration, self._backoff_last_index)]
-                    iteration += 1
-                    await asyncio.sleep(sleep)
-                    if self._q.qsize() > 0:
-                        event = self._q.get_nowait()
-                        if event:
-                            break
-                    can_block = await _commit_handled_events(self._outstanding_offsets, committer)
-            if not event:
+            if committer:
+                if (
+                    events_handled_since_commit >= self._max_events_before_commit
+                    or num_offsets_not_committed > 1
+                    and time.monotonic() >= last_commit_time + self._max_time_before_commit
+                ):
+                    num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
+                    events_handled_since_commit = 0
+                    last_commit_time = time.monotonic()
+                # Due to the last event not being garbage collected, we tolerate a single unhandled event
+                # TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem
+                while num_offsets_not_committed > 1:
+                    try:
+                        event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit)
+                        break
+                    except queue.Empty:
+                        pass
+                    num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
+                    events_handled_since_commit = 0
+                    last_commit_time = time.monotonic()
+            if event is None:
                 event = await loop.run_in_executor(None, self._q.get)
             if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
                 qualified_shard = (event.path, event.shard_id)
                 offsets = self._outstanding_offsets[qualified_shard]
                 offsets.append(_EventOffset(event))
-                num_events_handled_without_commit += 1
+                num_offsets_not_committed += 1
+                events_handled_since_commit += 1
             try:
                 termination_result = await self._do_downstream(event)
                 if event is _termination_obj:
@@ -508,7 +517,7 @@ async def await_termination(self):


 async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False):
-    all_offsets_handled = True
+    num_offsets_not_handled = 0
     if not commit_all:
         gc.collect()
     for qualified_shard, offsets in outstanding_offsets_by_qualified_shard.items():
@@ -519,9 +528,9 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ
         num_to_clear = 0
         last_handled_offset = None
         # go over offsets in the qualified shard by arrival order until we reach an unhandled offset
-        for offset in offsets:
+        for i, offset in enumerate(offsets):
             if not offset.is_ready_to_commit():
-                all_offsets_handled = False
+                num_offsets_not_handled += len(offsets) - i
                 break
             last_handled_offset = offset.offset
             num_to_clear += 1
@@ -529,7 +538,7 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ
             path, shard_id = qualified_shard
             await committer(QualifiedOffset(path, shard_id, last_handled_offset))
         outstanding_offsets_by_qualified_shard[qualified_shard] = offsets[num_to_clear:]
-    return all_offsets_handled
+    return num_offsets_not_handled


 class AsyncEmitSource(Flow):
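`_commit_handled_events` now returns a count rather than a boolean: per qualified shard, it commits the longest contiguous prefix of handled offsets and reports how many offsets remain outstanding. A simplified, synchronous rendering of that rule, using plain `(offset, done)` tuples in place of `_EventOffset` and a print in place of the committer (all names here are illustrative):

    def commit_prefix(offsets):
        num_not_handled = 0
        num_to_clear = 0
        last_handled = None
        # walk offsets in arrival order until the first unhandled one
        for i, (offset, done) in enumerate(offsets):
            if not done:
                num_not_handled = len(offsets) - i
                break
            last_handled = offset
            num_to_clear += 1
        if last_handled is not None:
            print(f"commit through offset {last_handled}")  # committer() stand-in
        return offsets[num_to_clear:], num_not_handled

    remaining, pending = commit_prefix([(1, True), (2, True), (3, False), (4, True)])
    assert remaining == [(3, False), (4, True)] and pending == 2

Offset 4 stays uncommitted even though it is handled, because offset 3 before it is not: commits must be contiguous per shard.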
@@ -540,6 +549,8 @@ class AsyncEmitSource(Flow):
     :param buffer_size: size of the incoming event buffer. Defaults to 8.
     :param key_field: Field to extract and use as the key. Optional.
     :param max_events_before_commit: Maximum number of events to be processed before committing offsets.
+        Defaults to 20,000.
+    :param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45.
     :param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
     :param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource).
     :type name: string
@@ -548,14 +559,14 @@ class AsyncEmitSource(Flow):
     """

     _legal_first_step = True
-    _backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1]
-    _backoff_last_index = len(_backoff) - 1
+    _max_wait_before_commit = 5

     def __init__(
         self,
         buffer_size: int = None,
         key_field: Union[list, str, None] = None,
         max_events_before_commit=None,
+        max_time_before_commit=None,
         explicit_ack=False,
         **kwargs,
     ):
@@ -568,7 +579,8 @@ def __init__(
         kwargs["buffer_size"] = buffer_size
         self._q = asyncio.Queue(buffer_size)
         self._key_field = key_field
-        self._max_events_before_commit = max_events_before_commit or 1000
+        self._max_events_before_commit = max_events_before_commit or 20000
+        self._max_time_before_commit = max_time_before_commit or 45
         self._explicit_ack = explicit_ack
         self._ex = None
         self._closeables = []
@@ -580,37 +592,40 @@ def _init(self):

     async def _run_loop(self):
         committer = None
-        num_events_handled_without_commit = 0
+        num_offsets_not_handled = 0
+        events_handled_since_commit = 0
+        last_commit_time = time.monotonic()
         if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
             committer = self.context.platform.explicit_ack
         while True:
             event = None
-            if (
-                num_events_handled_without_commit > 0
-                and self._q.empty()
-                or num_events_handled_without_commit >= self._max_events_before_commit
-            ):
-                num_events_handled_without_commit = 0
-                can_block = await _commit_handled_events(self._outstanding_offsets, committer)
-                iteration = 0
+            if committer:
+                if (
+                    events_handled_since_commit >= self._max_events_before_commit
+                    or num_offsets_not_handled > 0
+                    and time.monotonic() >= last_commit_time + self._max_time_before_commit
+                ):
+                    num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
+                    events_handled_since_commit = 0
+                    last_commit_time = time.monotonic()
                 # In case we can't block because there are outstanding events
-                while not can_block:
-                    # Sleep to yield and to avoid busy-wait
-                    sleep = self._backoff[min(iteration, self._backoff_last_index)]
-                    iteration += 1
-                    await asyncio.sleep(sleep)
-                    if not self._q.empty():
-                        event = self._q.get_nowait()
-                    if event:
+                while num_offsets_not_handled > 0:
+                    try:
+                        event = await asyncio.wait_for(self._q.get(), self._max_wait_before_commit)
                         break
-                    can_block = await _commit_handled_events(self._outstanding_offsets, committer)
+                    except asyncio.TimeoutError:
+                        pass
+                    num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
+                    events_handled_since_commit = 0
+                    last_commit_time = time.monotonic()
             if not event:
                 event = await self._q.get()
             if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
                 qualified_shard = (event.path, event.shard_id)
                 offsets = self._outstanding_offsets[qualified_shard]
                 offsets.append(_EventOffset(event))
-                num_events_handled_without_commit += 1
+                num_offsets_not_handled += 1
+                events_handled_since_commit += 1
             try:
                 termination_result = await self._do_downstream(event)
                 if event is _termination_obj:
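The async variant replaces the old sleep-and-poll backoff with a bounded blocking wait: while offsets remain uncommitted, block on the queue for at most `_max_wait_before_commit` seconds, and on timeout retry the commit. A standalone sketch of that pattern; `try_commit` is a hypothetical coroutine returning the number of still-uncommitted offsets:

    import asyncio

    MAX_WAIT = 5  # mirrors _max_wait_before_commit

    async def get_or_commit(q: asyncio.Queue, try_commit):
        pending = await try_commit()
        while pending > 0:
            try:
                # wake up as soon as an event arrives, or after MAX_WAIT seconds
                return await asyncio.wait_for(q.get(), MAX_WAIT)
            except asyncio.TimeoutError:
                pending = await try_commit()
        return await q.get()  # nothing pending, so block indefinitely

This removes the busy-wait entirely: the coroutine is woken by the queue rather than polling it on a backoff schedule.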
diff --git a/storey/targets.py b/storey/targets.py
index d8652ee0..0092908f 100644
--- a/storey/targets.py
+++ b/storey/targets.py
@@ -890,13 +890,18 @@ async def _worker(self):
         in_flight_events.append(None)
         while True:
             try:
-                for shard_id in range(self._shards):
-                    if self._q.empty():
+                request_sent_on_empty_queue = False
+                if self._q.empty():
+                    for shard_id in range(self._shards):
                         req = in_flight_reqs[shard_id]
+                        if req:
+                            request_sent_on_empty_queue = True
                         in_flight_reqs[shard_id] = None
                         await self._handle_response(req)
                         in_flight_events[shard_id] = None
                         self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id)
+                if request_sent_on_empty_queue:
+                    continue
                 event = await self._q.get()
                 if event is _termination_obj:  # handle outstanding batches and in flight requests on termination
                     for req in in_flight_reqs:
diff --git a/tests/test_flow.py b/tests/test_flow.py
index 3baf3152..c52714d2 100644
--- a/tests/test_flow.py
+++ b/tests/test_flow.py
@@ -228,7 +228,7 @@ def test_offset_commit_before_termination():
             event.offset = offset
             controller.emit(event)

-    time.sleep(1)
+    time.sleep(SyncEmitSource._max_wait_before_commit + 1)

     expected_offsets = {("/", i): num_records_per_shard for i in range(num_shards)}
     # TODO: Remove when commit of last record is fixed
@@ -269,7 +269,7 @@ async def async_offset_commit_before_termination():

             del event

-    await asyncio.sleep(1)
+    await asyncio.sleep(AsyncEmitSource._max_wait_before_commit + 1)

     try:
         offsets = copy.copy(platform.offsets)
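For the `StreamTarget` change above, the worker now only drains in-flight requests when its queue is empty, and re-checks the queue before blocking again if anything was flushed. A simplified sketch of one worker iteration; names and signatures here are illustrative, not the real `_worker` internals:

    async def worker_iteration(q, in_flight_reqs, handle_response, send_batch):
        flushed = False
        if q.empty():
            for shard_id, req in enumerate(in_flight_reqs):
                if req:
                    flushed = True
                in_flight_reqs[shard_id] = None
                await handle_response(req)  # tolerates None, like the original
                send_batch(shard_id)
        if flushed:
            return None  # caller should loop and re-check the queue before blocking
        return await q.get()

The test changes follow from the new class constant: since a commit may now be delayed by up to `_max_wait_before_commit` seconds, the tests must sleep at least that long plus a margin before asserting on committed offsets.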