Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

ML-4799: Simplify and optimize emit source offset handling for performance #462

Merged
Merged 10 commits into the base branch on Nov 2, 2023
115 changes: 65 additions & 50 deletions storey/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import gc
import queue
import threading
import time
import uuid
import warnings
import weakref
Expand Down Expand Up @@ -249,6 +250,8 @@ class SyncEmitSource(Flow):
:param buffer_size: size of the incoming event buffer. Defaults to 8.
:param key_field: Field to extract and use as the key. Optional.
:param max_events_before_commit: Maximum number of events to be processed before committing offsets.
Defaults to 20,000.
:param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45.
:param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
:param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource).
:type name: string
Expand All @@ -257,14 +260,14 @@ class SyncEmitSource(Flow):
"""

_legal_first_step = True
_backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1]
_backoff_last_index = len(_backoff) - 1
_max_wait_before_commit = 5

def __init__(
self,
buffer_size: Optional[int] = None,
key_field: Union[list, str, int, None] = None,
max_events_before_commit=None,
max_time_before_commit=None,
explicit_ack=False,
**kwargs,
):
Expand All @@ -279,7 +282,8 @@ def __init__(
raise ValueError("Buffer size must be positive")
self._q = queue.Queue(buffer_size)
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 1000
self._max_events_before_commit = max_events_before_commit or 20000
self._max_time_before_commit = max_time_before_commit or 45
self._explicit_ack = explicit_ack
self._termination_q = queue.Queue(1)
self._ex = None
Expand All @@ -294,36 +298,41 @@ async def _run_loop(self):
loop = asyncio.get_running_loop()
self._termination_future = loop.create_future()
committer = None
num_events_handled_without_commit = 0
num_offsets_not_committed = 0
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
iteration = 0
# In case we can't block because there are outstanding events
while not can_block:
sleep = self._backoff[min(iteration, self._backoff_last_index)]
iteration += 1
await asyncio.sleep(sleep)
if self._q.qsize() > 0:
event = self._q.get_nowait()
if event:
break
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
if not event:
if committer:
if (
events_handled_since_commit >= self._max_events_before_commit
or num_offsets_not_committed > 1
and time.monotonic() >= last_commit_time + self._max_time_before_commit
):
num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
# Due to the last event not being garbage collected, we tolerate a single unhandled event
# TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem
while num_offsets_not_committed > 1:
try:
event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit)
break
except queue.Empty:
pass
num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if event is None:
event = await loop.run_in_executor(None, self._q.get)
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
num_offsets_not_committed += 1
events_handled_since_commit += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
Expand Down Expand Up @@ -508,7 +517,7 @@ async def await_termination(self):


async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False):
all_offsets_handled = True
num_offsets_not_handled = 0
if not commit_all:
gc.collect()
for qualified_shard, offsets in outstanding_offsets_by_qualified_shard.items():
Expand All @@ -519,17 +528,17 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ
num_to_clear = 0
last_handled_offset = None
# go over offsets in the qualified shard by arrival order until we reach an unhandled offset
for offset in offsets:
for i, offset in enumerate(offsets):
if not offset.is_ready_to_commit():
all_offsets_handled = False
num_offsets_not_handled += len(offsets) - i
break
last_handled_offset = offset.offset
num_to_clear += 1
if last_handled_offset is not None:
path, shard_id = qualified_shard
await committer(QualifiedOffset(path, shard_id, last_handled_offset))
outstanding_offsets_by_qualified_shard[qualified_shard] = offsets[num_to_clear:]
return all_offsets_handled
return num_offsets_not_handled


class AsyncEmitSource(Flow):
Expand All @@ -540,6 +549,8 @@ class AsyncEmitSource(Flow):
:param buffer_size: size of the incoming event buffer. Defaults to 8.
:param key_field: Field to extract and use as the key. Optional.
:param max_events_before_commit: Maximum number of events to be processed before committing offsets.
Defaults to 20,000.
:param max_time_before_commit: Maximum number of seconds before committing offsets. Defaults to 45.
:param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
:param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource).
:type name: string
Expand All @@ -548,14 +559,14 @@ class AsyncEmitSource(Flow):
"""

_legal_first_step = True
_backoff = [0, 1 / 16, 1 / 8, 1 / 4, 1 / 2, 1]
_backoff_last_index = len(_backoff) - 1
_max_wait_before_commit = 5

def __init__(
self,
buffer_size: int = None,
key_field: Union[list, str, None] = None,
max_events_before_commit=None,
max_time_before_commit=None,
explicit_ack=False,
**kwargs,
):
Expand All @@ -568,7 +579,8 @@ def __init__(
kwargs["buffer_size"] = buffer_size
self._q = asyncio.Queue(buffer_size)
self._key_field = key_field
self._max_events_before_commit = max_events_before_commit or 1000
self._max_events_before_commit = max_events_before_commit or 20000
self._max_time_before_commit = max_time_before_commit or 45
self._explicit_ack = explicit_ack
self._ex = None
self._closeables = []
Expand All @@ -580,37 +592,40 @@ def _init(self):

async def _run_loop(self):
committer = None
num_events_handled_without_commit = 0
num_offsets_not_handled = 0
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
iteration = 0
if committer:
if (
events_handled_since_commit >= self._max_events_before_commit
or num_offsets_not_handled > 0
and time.monotonic() >= last_commit_time + self._max_time_before_commit
):
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
# In case we can't block because there are outstanding events
while not can_block:
# Sleep to yield and to avoid busy-wait
sleep = self._backoff[min(iteration, self._backoff_last_index)]
iteration += 1
await asyncio.sleep(sleep)
if not self._q.empty():
event = self._q.get_nowait()
if event:
while num_offsets_not_handled > 0:
try:
event = await asyncio.wait_for(self._q.get(), self._max_wait_before_commit)
break
can_block = await _commit_handled_events(self._outstanding_offsets, committer)
except asyncio.TimeoutError:
pass
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
events_handled_since_commit = 0
last_commit_time = time.monotonic()
if not event:
event = await self._q.get()
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
num_offsets_not_handled += 1
gtopper marked this conversation as resolved.
Show resolved Hide resolved
events_handled_since_commit += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
Expand Down
9 changes: 7 additions & 2 deletions storey/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,13 +890,18 @@ async def _worker(self):
in_flight_events.append(None)
while True:
try:
for shard_id in range(self._shards):
if self._q.empty():
request_sent_on_empty_queue = False
if self._q.empty():
for shard_id in range(self._shards):
req = in_flight_reqs[shard_id]
if req:
request_sent_on_empty_queue = True
in_flight_reqs[shard_id] = None
await self._handle_response(req)
in_flight_events[shard_id] = None
self._send_batch(buffers, in_flight_reqs, buffer_events, in_flight_events, shard_id)
if request_sent_on_empty_queue:
continue
event = await self._q.get()
if event is _termination_obj: # handle outstanding batches and in flight requests on termination
for req in in_flight_reqs:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def test_offset_commit_before_termination():
event.offset = offset
controller.emit(event)

time.sleep(1)
time.sleep(SyncEmitSource._max_wait_before_commit + 1)

expected_offsets = {("/", i): num_records_per_shard for i in range(num_shards)}
# TODO: Remove when commit of last record is fixed
Expand Down Expand Up @@ -269,7 +269,7 @@ async def async_offset_commit_before_termination():

del event

await asyncio.sleep(1)
await asyncio.sleep(AsyncEmitSource._max_wait_before_commit + 1)

try:
offsets = copy.copy(platform.offsets)
Expand Down