
Commit

Fixes
Gal Topper committed Oct 29, 2023
1 parent 7e3cc2a commit 8ee3267
Showing 2 changed files with 24 additions and 32 deletions.
52 changes: 22 additions & 30 deletions storey/sources.py
@@ -249,6 +249,7 @@ class SyncEmitSource(Flow):
     :param buffer_size: size of the incoming event buffer. Defaults to 8.
     :param key_field: Field to extract and use as the key. Optional.
     :param max_events_before_commit: Maximum number of events to be processed before committing offsets.
+        Defaults to 10,000.
     :param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
     :param name: Name of this step, as it should appear in logs. Defaults to class name (SyncEmitSource).
     :type name: string
@@ -278,7 +279,7 @@ def __init__(
             raise ValueError("Buffer size must be positive")
         self._q = queue.Queue(buffer_size)
         self._key_field = key_field
-        self._max_events_before_commit = max_events_before_commit or 1000
+        self._max_events_before_commit = max_events_before_commit or 10000
         self._explicit_ack = explicit_ack
         self._termination_q = queue.Queue(1)
         self._ex = None
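
For context, the two parameters touched here are the ones a caller passes when constructing the source. A rough usage sketch, not taken from this repository, with an arbitrary downstream step and invented values, assuming the public build_flow / SyncEmitSource API:

# Illustrative only: the flow and values below are invented for this sketch.
from storey import SyncEmitSource, Map, build_flow

controller = build_flow(
    [
        SyncEmitSource(
            explicit_ack=True,  # commit offsets explicitly via the platform's explicit_ack
            max_events_before_commit=10000,  # matches the new default in this commit
        ),
        Map(lambda x: x),
    ]
).run()

controller.emit({"hello": "world"})
controller.terminate()
controller.await_termination()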
@@ -293,35 +294,30 @@ async def _run_loop(self):
loop = asyncio.get_running_loop()
self._termination_future = loop.create_future()
committer = None
num_events_handled_without_commit = 0
num_offsets_not_handled = 0
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
while event is None:
if committer:
if num_offsets_not_handled >= self._max_events_before_commit:
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
# Due to the last event not being garbage collected, we tolerate a single unhandled event
# TODO: Remove after transitioning to AsyncEmitSource, which would solve the underlying problem
can_block = num_offsets_not_handled <= 1
if can_block:
break
# TODO: Fix after transitioning to AsyncEmitSource, which would solve the underlying problem
while num_offsets_not_handled > 1:
try:
event = await loop.run_in_executor(None, self._q.get, True, self._max_wait_before_commit)
break
except queue.Empty:
pass
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
if event is None:
event = await loop.run_in_executor(None, self._q.get)
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
num_offsets_not_handled += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
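
Reading the rewritten loop: rather than counting events handled since the last commit, the source now tracks offsets that have not yet been committed, blocks on the queue indefinitely only when at most one offset is outstanding, and otherwise polls with a bounded wait so handled offsets still get committed while no new events arrive. A standalone sketch of that pattern follows; it is illustrative only, and the names (consume, commit_handled, MAX_BEFORE_COMMIT, WAIT_SECONDS) are invented here rather than taken from storey.

import queue

MAX_BEFORE_COMMIT = 10000  # force a commit once this many offsets are outstanding
WAIT_SECONDS = 5           # invented bound on how long to block before committing anyway

def consume(q: queue.Queue, handle, commit_handled):
    """Sketch: handle(event) processes one event; commit_handled() commits offsets of
    events that downstream already finished and returns how many remain outstanding.
    Termination handling is omitted."""
    outstanding = 0
    while True:
        event = None
        while event is None:
            if outstanding >= MAX_BEFORE_COMMIT:
                outstanding = commit_handled()
            if outstanding <= 1:       # nothing meaningful left to commit,
                break                  # so it is safe to block without a timeout below
            try:
                event = q.get(block=True, timeout=WAIT_SECONDS)
            except queue.Empty:
                outstanding = commit_handled()  # periodic commit while the queue is idle
        if event is None:
            event = q.get()            # block until the next event arrives
        handle(event)
        outstanding += 1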
@@ -519,7 +515,7 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ
         # go over offsets in the qualified shard by arrival order until we reach an unhandled offset
         for i, offset in enumerate(offsets):
             if not offset.is_ready_to_commit():
-                num_offsets_not_handled += len(offsets) - i + 1
+                num_offsets_not_handled += len(offsets) - i
                 break
             last_handled_offset = offset.offset
             num_to_clear += 1
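
The change above is an off-by-one fix: when the scan stops at the first offset that is not ready to commit, at index i, exactly len(offsets) - i offsets (from i to the end of the list) are still outstanding, not len(offsets) - i + 1. A tiny worked example, with is_ready_to_commit() stubbed out by a set membership check:

offsets = ["o0", "o1", "o2", "o3", "o4"]  # stand-ins for _EventOffset objects
ready = {"o0", "o1"}                      # suppose only the first two were handled downstream

num_offsets_not_handled = 0
for i, offset in enumerate(offsets):
    if offset not in ready:               # first unhandled offset is at i == 2
        num_offsets_not_handled += len(offsets) - i  # 5 - 2 == 3: o2, o3, o4
        break

print(num_offsets_not_handled)  # 3; the old "+ 1" variant would have reported 4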
@@ -538,6 +534,7 @@ class AsyncEmitSource(Flow):
     :param buffer_size: size of the incoming event buffer. Defaults to 8.
     :param key_field: Field to extract and use as the key. Optional.
     :param max_events_before_commit: Maximum number of events to be processed before committing offsets.
+        Defaults to 10,000.
     :param explicit_ack: Whether to explicitly commit offsets. Defaults to False.
     :param name: Name of this step, as it should appear in logs. Defaults to class name (AsyncEmitSource).
     :type name: string
@@ -565,7 +562,7 @@ def __init__(
         kwargs["buffer_size"] = buffer_size
         self._q = asyncio.Queue(buffer_size)
         self._key_field = key_field
-        self._max_events_before_commit = max_events_before_commit or 1000
+        self._max_events_before_commit = max_events_before_commit or 10000
         self._explicit_ack = explicit_ack
         self._ex = None
         self._closeables = []
@@ -577,34 +574,29 @@ def _init(self):

async def _run_loop(self):
committer = None
num_events_handled_without_commit = 0
num_offsets_not_handled = 0
if self._explicit_ack and hasattr(self.context, "platform") and hasattr(self.context.platform, "explicit_ack"):
committer = self.context.platform.explicit_ack
while True:
event = None
if (
num_events_handled_without_commit > 0
and self._q.empty()
or num_events_handled_without_commit >= self._max_events_before_commit
):
num_events_handled_without_commit = 0
# In case we can't block because there are outstanding events
while event is None:
if committer:
if num_offsets_not_handled >= self._max_events_before_commit:
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
can_block = num_offsets_not_handled == 0
if can_block:
break
# In case we can't block because there are outstanding events
while num_offsets_not_handled > 0:
try:
event = await asyncio.wait_for(self._q.get(), self._max_wait_before_commit)
except TimeoutError:
break
except asyncio.TimeoutError:
pass
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
if not event:
event = await self._q.get()
if committer and hasattr(event, "path") and hasattr(event, "shard_id") and hasattr(event, "offset"):
qualified_shard = (event.path, event.shard_id)
offsets = self._outstanding_offsets[qualified_shard]
offsets.append(_EventOffset(event))
num_events_handled_without_commit += 1
num_offsets_not_handled += 1
try:
termination_result = await self._do_downstream(event)
if event is _termination_obj:
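
A side note on the exception handling above: asyncio.wait_for signals a timeout with asyncio.TimeoutError, which only became an alias of the builtin TimeoutError in Python 3.11; on older interpreters a bare `except TimeoutError:` does not catch it, which presumably motivates the switch to `except asyncio.TimeoutError:` in this hunk. A minimal, self-contained demonstration of the portable spelling:

import asyncio

async def main():
    q = asyncio.Queue()
    try:
        # Nothing is ever put on the queue, so this always times out.
        await asyncio.wait_for(q.get(), timeout=0.1)
    except asyncio.TimeoutError:  # caught on both old and new Python versions
        print("timed out waiting for an event")

asyncio.run(main())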
4 changes: 2 additions & 2 deletions tests/test_flow.py
@@ -228,7 +228,7 @@ def test_offset_commit_before_termination():
             event.offset = offset
             controller.emit(event)
 
-    time.sleep(1)
+    time.sleep(SyncEmitSource._max_wait_before_commit + 1)
 
     expected_offsets = {("/", i): num_records_per_shard for i in range(num_shards)}
     # TODO: Remove when commit of last record is fixed
@@ -269,7 +269,7 @@ async def async_offset_commit_before_termination():
 
     del event
 
-    await asyncio.sleep(1)
+    await asyncio.sleep(AsyncEmitSource._max_wait_before_commit + 1)
 
     try:
         offsets = copy.copy(platform.offsets)
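
Both test changes replace a hard-coded one-second sleep with one derived from the source's _max_wait_before_commit class attribute, so the test waits at least one full poll interval, plus a margin, before asserting on committed offsets. A small sketch of that idea; the attribute exists per this diff, but its concrete value is not shown here:

import time

from storey import SyncEmitSource

# Wait long enough for at least one bounded-wait poll to time out and trigger a commit.
poll_interval = SyncEmitSource._max_wait_before_commit  # class attribute used by the source
time.sleep(poll_interval + 1)
# ...assertions on the committed offsets would follow here in the real test...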