ML-4799: Simplify and optimize emit source offset handling for performance (#462)

* ML-4799: Simplify emit source offset handling for performance

* Implement timeout differently

* `AsyncEmitSource` and `_commit_handled_events`

* Proper count of offsets

* Fixes

* Count events and time since last commit to decide whether to commit again (see the sketch after the changed-files summary below)

* Fix `StreamTarget`

* Increase max wait before commit to 5s

* Rename local vars and fix comments

* Add param to docstring

---------

Co-authored-by: Gal Topper <[email protected]>
gtopper and Gal Topper authored Nov 2, 2023
1 parent 9d62d65 commit ca426ae
Showing 3 changed files with 74 additions and 54 deletions.
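The heart of this change is the new commit cadence: instead of committing whenever the queue happens to be empty (and spinning on an exponential-backoff loop when it could not block), the source now commits once an event-count threshold is reached, or once offsets are outstanding and enough time has passed since the last commit. The following standalone sketch distills that decision rule; CommitCadence and its members are illustrative names, not storey API (the sync source additionally tolerates one uncommitted offset, per the garbage-collection caveat in the diff below):

import time


class CommitCadence:
    # Illustrative distillation of the commit decision in _run_loop;
    # not part of storey.
    def __init__(self, max_events_before_commit=20000, max_time_before_commit=45):
        self.max_events_before_commit = max_events_before_commit
        self.max_time_before_commit = max_time_before_commit
        self.events_handled_since_commit = 0
        self.last_commit_time = time.monotonic()

    def record_event(self):
        self.events_handled_since_commit += 1

    def should_commit(self, num_offsets_outstanding):
        # Commit when enough events were handled since the last commit, or
        # when offsets are outstanding and the commit is overdue.
        if self.events_handled_since_commit >= self.max_events_before_commit:
            return True
        return (
            num_offsets_outstanding > 0
            and time.monotonic() >= self.last_commit_time + self.max_time_before_commit
        )

    def mark_committed(self):
        self.events_handled_since_commit = 0
        self.last_commit_time = time.monotonic()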
115 changes: 65 additions & 50 deletions storey/sources.py
@@ -17,6 +17,7 @@
import gc
import queue
import threading
import time
import uuid
import warnings
import weakref
@@ -249,6 +250,8 @@ class SyncEmitSource(Flow):
:param buffer_size: size of the incoming event buffer. Defaults to 8.
:param key_field: Field to extract and use as the key. Optional.
:param max_events_before_commit: Maximum number of events to be processed before committing offsets.
Defaults to 20,000.
:param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45.
:param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
:param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource).
:type name: string
@@ -257,14 +260,14 @@
"""

_legal_first_step = True
_backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1]
_backoff_last_index = len(_backoff) - 1
_max_wait_before_commit = 5

def __init__(
self,
buffer_size: Optional[int] = None,
key_field: Union[list, str, int, None] = None,
max_events_before_commit=None,
max_time_before_commit=None,
explicit_ack=False,
**kwargs,
):
@@ -279,7 +282,8 @@ def __init__(
raise ValueError("Buffer size must be positive")
self._q = queue.Queue(buffer_size)
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 1000
self._max_events_before_commit = max_events_before_commit or 20000
self._max_time_before_commit = max_time_before_commit or 45
self._explicit_ack = explicit_ack
self._termination_q = queue.Queue(1)
self._ex = None
@@ -294,36 +298,41 @@ async def _run_loop(self):
loop = asyncio.get_running_loop()
self._termination_future = loop.create_future()
committer = None
num_events_handled_without_commit = 0
num_offsets_not_committed = 0
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
iteration = 0
# In case we can't block because there are outstanding events
while not can_block:
sleep = self._backoff[min(iteration, self._backoff_last_index)]
iteration += 1
await asyncio.sleep(sleep)
if self._q.qsize() > 0:
event = self._q.get_nowait()
if event:
break
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
if not event:
if committer:
if (
events_handled_since_commit >= self._max_events_before_commit
or num_offsets_not_committed > 1
and time.monotonic() >= last_commit_time + self._max_time_before_commit
):
num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
# Due to the last event not being garbage collected, we tolerate a single unhandled event
# TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem
while num_offsets_not_committed > 1:
try:
event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit)
break
except queue.Empty:
pass
num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if event is None:
event = await loop.run_in_executor(None, self._q.get)
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
num_offsets_not_committed += 1
events_handled_since_commit += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
@@ -508,7 +517,7 @@ async def await_termination(self):


async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False):
all_offsets_handled = True
num_offsets_not_handled = 0
if not commit_all:
gc.collect()
for qualified_shard, offsets in outstanding_offsets_by_qualified_shard.items():
@@ -519,17 +528,17 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False):
num_to_clear = 0
last_handled_offset = None
# go over offsets in the qualified shard by arrival order until we reach an unhandled offset
for offset in offsets:
for i, offset in enumerate(offsets):
if not offset.is_ready_to_commit():
all_offsets_handled = False
num_offsets_not_handled += len(offsets) - i
break
last_handled_offset = offset.offset
num_to_clear += 1
if last_handled_offset is not None:
path, shard_id = qualified_shard
await committer(QualifiedOffset(path, shard_id, last_handled_offset))
outstanding_offsets_by_qualified_shard[qualified_shard] = offsets[num_to_clear:]
return all_offsets_handled
return num_offsets_not_handled
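_commit_handled_events commits, for each shard, the longest contiguous prefix of offsets whose events have been handled (readiness is tied to the event having been garbage collected, hence the gc.collect() above), and after this change returns the number of offsets still outstanding rather than a boolean. A toy demonstration of the prefix rule, assuming this revision of storey is installed; FakeOffset is a stand-in for the internal _EventOffset:

import asyncio

from storey.sources import _commit_handled_events  # internal helper


class FakeOffset:
    # Stand-in for storey's internal _EventOffset.
    def __init__(self, offset, ready):
        self.offset = offset
        self._ready = ready

    def is_ready_to_commit(self):
        return self._ready


async def demo():
    committed = []

    async def committer(qualified_offset):
        committed.append(qualified_offset)

    outstanding = {
        ("/", 0): [
            FakeOffset(1, True),
            FakeOffset(2, True),
            FakeOffset(3, False),
            FakeOffset(4, True),
        ],
    }
    not_handled = await _commit_handled_events(outstanding, committer)
    # Offsets 1-2 form the handled prefix, so a single commit is issued at
    # offset 2; offsets 3 and 4 remain outstanding and not_handled == 2.
    print(not_handled, [o.offset for o in outstanding[("/", 0)]])


asyncio.run(demo())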


class AsyncEmitSource(Flow):
@@ -540,6 +549,8 @@ class AsyncEmitSource(Flow):
:param buffer_size: size of the incoming event buffer. Defaults to 8.
:param key_field: Field to extract and use as the key. Optional.
:param max_events_before_commit: Maximum number of events to be processed before committing offsets.
Defaults to 20,000.
:param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45.
:param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
:param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource).
:type name: string
@@ -548,14 +559,14 @@
"""

_legal_first_step = True
_backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1]
_backoff_last_index = len(_backoff) - 1
_max_wait_before_commit = 5

def __init__(
self,
buffer_size: int = None,
key_field: Union[list, str, None] = None,
max_events_before_commit=None,
max_time_before_commit=None,
explicit_ack=False,
**kwargs,
):
@@ -568,7 +579,8 @@ def __init__(
kwargs["buffer_size"] = buffer_size
self._q = asyncio.Queue(buffer_size)
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 1000
self._max_events_before_commit = max_events_before_commit or 20000
self._max_time_before_commit = max_time_before_commit or 45
self._explicit_ack = explicit_ack
self._ex = None
self._closeables = []
@@ -580,37 +592,40 @@ def _init(self):

async def _run_loop(self):
committer = None
num_events_handled_without_commit = 0
num_offsets_not_handled = 0
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
iteration = 0
if committer:
if (
events_handled_since_commit >= self._max_events_before_commit
or num_offsets_not_handled > 0
and time.monotonic() >= last_commit_time + self._max_time_before_commit
):
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
# In case we can't block because there are outstanding events
while not can_block:
# Sleep to yield and to avoid busy-wait
sleep = self._backoff[min(iteration, self._backoff_last_index)]
iteration += 1
await asyncio.sleep(sleep)
if not self._q.empty():
event = self._q.get_nowait()
if event:
while num_offsets_not_handled > 0:
try:
event = await asyncio.wait_for(self._q.get(), self._max_wait_before_commit)
break
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
except asyncio.TimeoutError:
pass
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if not event:
event = await self._q.get()
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
num_offsets_not_handled += 1
events_handled_since_commit += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
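Note how the bounded wait that replaces the old backoff loop differs between the two sources: SyncEmitSource blocks on its thread-safe queue.Queue from a thread-pool executor, relying on the queue's own timeout, while AsyncEmitSource wraps asyncio.Queue.get in asyncio.wait_for. A minimal side-by-side sketch of the two idioms:

import asyncio
import queue


def sync_get_with_timeout(q: queue.Queue, timeout: float):
    # queue.Queue.get(block=True, timeout=...) raises queue.Empty on timeout.
    try:
        return q.get(True, timeout)
    except queue.Empty:
        return None


async def async_get_with_timeout(q: asyncio.Queue, timeout: float):
    # asyncio.wait_for cancels the pending get() and raises TimeoutError.
    try:
        return await asyncio.wait_for(q.get(), timeout)
    except asyncio.TimeoutError:
        return None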
9 changes: 7 additions & 2 deletions storey/targets.py
@@ -890,13 +890,18 @@ async def _worker(self):
in_flight_events.append(None)
while True:
try:
for shard_id in range(self._shards):
if self._q.empty():
request_sent_on_empty_queue = False
if self._q.empty():
for shard_id in range(self._shards):
req = in_flight_reqs[shard_id]
if req:
request_sent_on_empty_queue = True
in_flight_reqs[shard_id] = None
await self._handle_response(req)
in_flight_events[shard_id] = None
self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id)
if request_sent_on_empty_queue:
continue
event = await self._q.get()
if event is _termination_obj: # handle outstanding batches and in flight requests on termination
for req in in_flight_reqs:
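The StreamTarget fix hoists the queue-emptiness check out of the per-shard loop and, when any in-flight request was flushed, restarts the loop instead of blocking on the queue. A self-contained sketch of this drain-then-continue pattern; the names here are illustrative, not storey API:

import asyncio


async def worker(q: asyncio.Queue, in_flight: list, handle_response):
    while True:
        flushed = False
        if q.empty():
            # Flush every shard's in-flight request before blocking.
            for shard_id, req in enumerate(in_flight):
                if req is not None:
                    in_flight[shard_id] = None
                    await handle_response(req)
                    flushed = True
        if flushed:
            # Handling responses takes time; re-check the queue rather than
            # blocking, since fresh events may have arrived in the meantime.
            continue
        event = await q.get()
        if event is None:  # stand-in for the termination object
            return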
4 changes: 2 additions & 2 deletions tests/test_flow.py
@@ -228,7 +228,7 @@ def test_offset_commit_before_termination():
event.offset = offset
controller.emit(event)

time.sleep(1)
time.sleep(SyncEmitSource._max_wait_before_commit + 1)

expected_offsets = {("/", i): num_records_per_shard for i in range(num_shards)}
# TODO: Remove when commit of last record is fixed
@@ -269,7 +269,7 @@ async def async_offset_commit_before_termination():

del event

await asyncio.sleep(1)
await asyncio.sleep(AsyncEmitSource._max_wait_before_commit + 1)

try:
offsets = copy.copy(platform.offsets)
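For reference, the new knobs surface on the source constructors. A hedged usage sketch (the surrounding flow is illustrative; explicit_ack only takes effect when the platform provides an explicit_ack implementation, as the _run_loop checks above show):

from storey import ReduceToList, SyncEmitSource, build_flow

controller = build_flow(
    [
        SyncEmitSource(
            max_events_before_commit=20000,  # commit after this many events
            max_time_before_commit=45,  # or once this many seconds have passed
            explicit_ack=True,
        ),
        ReduceToList(),
    ]
).run()

controller.emit({"key": "value"})
controller.terminate()
print(controller.await_termination())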
