java.lang.IllegalStateException: Terminal event already emitted. #25
Comments
Thanks for the report, I'll have a look. |
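Thanks a lot. Let me know if you need any more information. I was debugging a bit today, and since we use the Serialized class to read, if I read 10 files I can clearly see 10 EOFException events terminating the chain:
https://github.com/davidmoten/rxjava-extras/blob/ec425f3/src/main/java/com/github/davidmoten/rx/Serialized.java#L60
However, the chain seems to keep on reading all 10 files after that, which of course fails since the stream is already closed:
https://github.com/davidmoten/rxjava-extras/blob/ec425f3/src/main/java/com/github/davidmoten/rx/Serialized.java#L64
It looks like an issue with double subscription/reading, though I cannot pinpoint where. |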
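To make the suspected failure mode concrete, here is a minimal sketch in plain Java with no RxJava dependency. `StrictObserver` and `Reader` are hypothetical stand-ins invented for illustration, not the real `SyncOnSubscribe` or `Serialized` classes, and the sketch assumes the reader closes the stream once it reaches end of file. It shows how a generator that is asked for more data after it has already completed ends up signalling a second terminal event, and how a simple `done` flag avoids that.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class TerminalGuardDemo {

    /** Hypothetical observer that rejects any signal after a terminal event. */
    static final class StrictObserver {
        final List<Object> items = new ArrayList<>();
        boolean terminated;

        void onNext(Object item) { check(); items.add(item); }
        void onCompleted() { check(); terminated = true; }
        void onError(Throwable error) { check(); terminated = true; }

        private void check() {
            if (terminated) {
                throw new IllegalStateException("Terminal event already emitted.");
            }
        }
    }

    /** Hypothetical generator: each next() call reads one object from the stream. */
    static final class Reader {
        private final ObjectInputStream in;
        private final boolean guarded;
        private boolean done;

        Reader(ObjectInputStream in, boolean guarded) {
            this.in = in;
            this.guarded = guarded;
        }

        void next(StrictObserver observer) {
            if (guarded && done) {
                return; // already terminated: ignore further requests
            }
            try {
                observer.onNext(in.readObject());
            } catch (EOFException e) {
                // End of file is the normal completion signal; close the stream.
                done = true;
                closeQuietly();
                observer.onCompleted();
            } catch (IOException | ClassNotFoundException e) {
                // Without the guard, a read after completion lands here or above
                // and produces a second terminal event.
                done = true;
                observer.onError(e);
            }
        }

        private void closeQuietly() {
            try {
                in.close();
            } catch (IOException ignored) {
                // nothing useful to do in this sketch
            }
        }
    }

    /** Serializes the given values into an in-memory byte array. */
    static byte[] serialize(Object... values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                for (Object value : values) {
                    out.writeObject(value);
                }
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Calls next() a fixed number of times, more often than there is data. */
    static StrictObserver drive(byte[] data, boolean guarded, int calls) {
        try {
            ObjectInputStream in = new ObjectInputStream(
                    new BufferedInputStream(new ByteArrayInputStream(data)));
            Reader reader = new Reader(in, guarded);
            StrictObserver observer = new StrictObserver();
            for (int i = 0; i < calls; i++) {
                reader.next(observer);
            }
            return observer;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize("a", "b");
        System.out.println(drive(data, true, 5).items);
        try {
            drive(data, false, 5);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the unguarded run the `IllegalStateException` is what the caller sees, and the underlying read failure never reaches the observer, which resembles the swallowed exception described in this issue. Whether the real bug is a missing guard of this kind or a re-entrant request from downstream is not established here.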
Yeah, I can see quite a bit from the stack trace on this one and that is what I assumed was happening. Serialized looks fine. I'm looking at SyncOnSubscribe and OrderedMerge.
…On 18 May 2017 at 23:03, Miha Blazin ***@***.***> wrote:
Thanks a lot, if you need anymore information.
I was debugging a bit today and since we use the Serialized class to read, if I read 10 files I can clearly see 10 EOFException events terminating the chain:
https://github.com/davidmoten/rxjava-extras/blob/ec425f3/src/main/java/com/github/davidmoten/rx/Serialized.java#L60
However the chain seems to keep on reading all 10 files after that which of course fails since the stream is closed already:
https://github.com/davidmoten/rxjava-extras/blob/ec425f3/src/main/java/com/github/davidmoten/rx/Serialized.java#L64
it looks like an issue with double subscription/reading, though I cannot pinpoint it where.
|
Can you distill this to a repeatable unit test? |
I can see an improvement to make to |
Supposing I do come up with a fix for this, what do you want to do about it? Upgrade to latest? Patch your existing app with fixes to a single class? |
I will try to put together a repeatable unit test. If it does get fixed, I would expect a hotfix release on the 0.7.x branch, e.g. 0.7.9.25. |
I get the following error when trying to merge a few files using
flatMap(list -> OrderedMerge.create(list, comparator));
where list is a List<Observable<java.io.File>>.
This is on version 0.7.9.24 (I think our RxJava version is too old for the 0.8.x branch).
java.lang.IllegalStateException: Terminal event already emitted.
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.onError(SyncOnSubscribe.java:476)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:65)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395)
    at rx.Subscriber.request(Subscriber.java:157)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.requestMore(OrderedMerge.java:312)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge$MergeProducer.emit(OrderedMerge.java:251)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.onNext(OrderedMerge.java:336)
    at rx.internal.operators.OperatorCast$CastSubscriber.onNext(OperatorCast.java:69)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.onNext(SyncOnSubscribe.java:490)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:59)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OperatorCast$CastSubscriber.setProducer(OperatorCast.java:94)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:62)
    at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:43)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:94)
    at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:32)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:85)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:27)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastPath(OnSubscribeFromIterable.java:190)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100)
    at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73)
    at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:75)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110)
    at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:102)
    at rx.internal.operators.OnSubscribeTakeLastOne$TakeLastOneSubscriber.onCompleted(OnSubscribeTakeLastOne.java:57)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860)
    at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:70)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OperatorTake$1.onCompleted(OperatorTake.java:56)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.checkTerminated(OnSubscribeFlattenIterable.java:308)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:180)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onCompleted(OnSubscribeFlattenIterable.java:147)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.checkTerminated(OnSubscribeFlattenIterable.java:308)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:180)
    at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onCompleted(OnSubscribeFlattenIterable.java:147)
    at rx.internal.operators.OnSubscribeRedo$4$1.onCompleted(OnSubscribeRedo.java:321)
    at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:239)
    at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onCompleted(OperatorZip.java:307)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:298)
    at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284)
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:152)
    at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:255)
    at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:161)
    at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
    at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
    at rx.internal.operators.OnSubscribeRedo$2$1.onCompleted(OnSubscribeRedo.java:228)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101)
    at retrofit.RxSupport$2.run(RxSupport.java:57)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
It swallows the original exception:
java.io.IOException: Stream closed
    at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:214)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2303)
    at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2596)
    at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2606)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:58)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395)
    at rx.Subscriber.request(Subscriber.java:157)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.requestMore(OrderedMerge.java:312)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge$MergeProducer.emit(OrderedMerge.java:251)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.onNext(OrderedMerge.java:336)
    at rx.internal.operators.OperatorCast$CastSubscriber.onNext(OperatorCast.java:69)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.onNext(SyncOnSubscribe.java:490)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:59)
    at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437)
    at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OperatorCast$CastSubscriber.setProducer(OperatorCast.java:94)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:62)
    at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:43)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:94)
    at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:32)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:85)
    at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:27)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
    at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
    at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastPath(OnSubscribeFromIterable.java:190)
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63)
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
    at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100)
    at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73)
    at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:75)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110)
    at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:102)
    at rx.internal.operators.OnSubscribeTakeLastOne$TakeLastOneSubscriber.onCompleted(OnSubscribeTakeLastOne.java:57)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860)
    at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:70)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101)
    at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70)
    at retrofit.RxSupport$2.run(RxSupport.java:57)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Has anyone else experienced this issue?
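For readers unfamiliar with what the `OrderedMerge.create(list, comparator)` call in this report is meant to achieve, the following is a minimal plain-Java sketch of an ordered (k-way) merge over already-sorted sources. It is only an illustration of the idea using in-memory lists and a `PriorityQueue`; the class and method names are hypothetical, and it is not how rxjava-extras implements the operator, which works on Observables and has to deal with backpressure and requests to each source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class OrderedMergeSketch {

    /** Pairs the current head value of a source with the rest of that source. */
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    /**
     * Merges sources that are each already sorted by the comparator into one
     * sorted list, always emitting the smallest current head next.
     */
    static <T> List<T> merge(List<List<T>> sources, Comparator<? super T> comparator) {
        Comparator<Head<T>> byValue = (a, b) -> comparator.compare(a.value, b.value);
        PriorityQueue<Head<T>> heads = new PriorityQueue<>(byValue);

        // Seed the queue with the first element of every non-empty source.
        for (List<T> source : sources) {
            Iterator<T> it = source.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }

        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            merged.add(smallest.value);
            // Ask the source that just emitted for its next element, if any.
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
                Arrays.asList(1, 4, 7),
                Arrays.asList(2, 5),
                Arrays.asList(3, 6, 8));
        // prints [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(merge(sources, Comparator.<Integer>naturalOrder()));
    }
}
```

The step that asks an emitting source for its next element corresponds loosely to the `requestMore` frame visible in the stack traces above, which is where the extra read was triggered in this report.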