You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Describe the bug buffer_with_time_or_count loses data when the timer is the trigger and the on_next releases the GIL.
Related issue: #702, but this issue can get solved by using a scheduler since the on_next method does not release the GIL.
To Reproduce
If have a micro-service that reads data from an external source, buffers it, and aggregates and sends it to some other external service. I have added a dummy snippet that is similar to my micro-service, but has the same issue.
The issue only occurs if the timer is triggering the on_next call.
importtimefromdatetimeimporttimedelta, datetimefromrandomimportrandomimportreactivexfromreactiveximportoperatorsfromreactivex.schedulerimportThreadPoolSchedulerdefmain():
scheduler=ThreadPoolScheduler(max_workers=4)
reactivex.from_iterable(iterable()).pipe(
operators.buffer_with_time_or_count(
# set the timespan to only 2 seconds so that the timer triggers the on_nexttimespan=timedelta(seconds=2),
count=10000,
),
).subscribe(
on_next=on_next,
on_error=print,
on_completed=print,
scheduler=scheduler,
)
time.sleep(1000)
defiterable():
i=0whileTrue:
yielditime.sleep(1/10) # input network delayi+=1defon_next(data):
print(datetime.utcnow(), data[0], data[-1])
time.sleep(random() *5) # mock output network delayif__name__=='__main__':
main()
Script output
Running the script as is gives something along the lines of:
Describe the bug
buffer_with_time_or_count loses data when the timer is the trigger and the
on_next
releases the GIL.Related issue: #702, but this issue can get solved by using a scheduler since the
on_next
method does not release the GIL.To Reproduce
If have a micro-service that reads data from an external source, buffers it, and aggregates and sends it to some other external service. I have added a dummy snippet that is similar to my micro-service, but has the same issue.
The issue only occurs if the timer is triggering the
on_next
call.Script output
Running the script as is gives something along the lines of:
However, if I replace
with
the output now looks like:
Expected behavior
buffer_with_time_or_count
not losing any data. It did not matter which scheduler I used (or none at all).Additional context
4.0.0
3.10.0/3.11.0
The text was updated successfully, but these errors were encountered: