Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

java.lang.IllegalStateException: Terminal event already emitted. #25

Open
mihab opened this issue May 17, 2017 · 7 comments
Open

java.lang.IllegalStateException: Terminal event already emitted. #25

mihab opened this issue May 17, 2017 · 7 comments

Comments

@mihab
Copy link

mihab commented May 17, 2017

I get the following error when trying to merge a few files using

flatMap(list -> OrderedMerge.create(list, comparator));

where list is a List<Observable<java.io.File>> .

on version 0.7.9.24 ( I think our RxJava version is too old for the 0.8.x branch ).

java.lang.IllegalStateException: Terminal event already emitted. at rx.observables.SyncOnSubscribe$SubscriptionProducer.onError(SyncOnSubscribe.java:476) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:65) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.request(Subscriber.java:157) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.requestMore(OrderedMerge.java:312) at com.github.davidmoten.rx.internal.operators.OrderedMerge$MergeProducer.emit(OrderedMerge.java:251) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.onNext(OrderedMerge.java:336) at rx.internal.operators.OperatorCast$CastSubscriber.onNext(OperatorCast.java:69) at rx.observers.Subscribers$5.onNext(Subscribers.java:235) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) at rx.observables.SyncOnSubscribe$SubscriptionProducer.onNext(SyncOnSubscribe.java:490) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:59) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorCast$CastSubscriber.setProducer(OperatorCast.java:94) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:62) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:43) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:94) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:32) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:85) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:27) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastPath(OnSubscribeFromIterable.java:190) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100) at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73) at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.producers.SingleProducer.request(SingleProducer.java:75) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:102) at rx.internal.operators.OnSubscribeTakeLastOne$TakeLastOneSubscriber.onCompleted(OnSubscribeTakeLastOne.java:57) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OperatorTake$1.onCompleted(OperatorTake.java:56) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.checkTerminated(OnSubscribeFlattenIterable.java:308) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:180) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onCompleted(OnSubscribeFlattenIterable.java:147) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.checkTerminated(OnSubscribeFlattenIterable.java:308) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.drain(OnSubscribeFlattenIterable.java:180) at rx.internal.operators.OnSubscribeFlattenIterable$FlattenIterableSubscriber.onCompleted(OnSubscribeFlattenIterable.java:147) at rx.internal.operators.OnSubscribeRedo$4$1.onCompleted(OnSubscribeRedo.java:321) at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:239) at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onCompleted(OperatorZip.java:307) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:298) at rx.internal.operators.OnSubscribeRedo$3$1.onNext(OnSubscribeRedo.java:284) at rx.internal.operators.NotificationLite.accept(NotificationLite.java:152) at rx.subjects.SubjectSubscriptionManager$SubjectObserver.emitNext(SubjectSubscriptionManager.java:255) at rx.subjects.BehaviorSubject.onNext(BehaviorSubject.java:161) at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92) at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67) at rx.internal.operators.OnSubscribeRedo$2$1.onCompleted(OnSubscribeRedo.java:228) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101) at retrofit.RxSupport$2.run(RxSupport.java:57) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

it swallows the original Exception:

java.io.IOException: Stream closed at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170) at java.io.BufferedInputStream.fill(BufferedInputStream.java:214) at java.io.BufferedInputStream.read(BufferedInputStream.java:265) at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2303) at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2596) at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:2606) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:58) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.request(Subscriber.java:157) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.requestMore(OrderedMerge.java:312) at com.github.davidmoten.rx.internal.operators.OrderedMerge$MergeProducer.emit(OrderedMerge.java:251) at com.github.davidmoten.rx.internal.operators.OrderedMerge$SourceSubscriber.onNext(OrderedMerge.java:336) at rx.internal.operators.OperatorCast$CastSubscriber.onNext(OperatorCast.java:69) at rx.observers.Subscribers$5.onNext(Subscribers.java:235) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) at rx.observables.SyncOnSubscribe$SubscriptionProducer.onNext(SyncOnSubscribe.java:490) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:59) at com.github.davidmoten.rx.Serialized$1.next(Serialized.java:47) at rx.observables.SyncOnSubscribe$SubscriptionProducer.nextIteration(SyncOnSubscribe.java:459) at rx.observables.SyncOnSubscribe$SubscriptionProducer.slowPath(SyncOnSubscribe.java:437) at rx.observables.SyncOnSubscribe$SubscriptionProducer.request(SyncOnSubscribe.java:395) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorCast$CastSubscriber.setProducer(OperatorCast.java:94) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.Subscriber.setProducer(Subscriber.java:205) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:62) at rx.observables.SyncOnSubscribe.call(SyncOnSubscribe.java:43) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:94) at rx.internal.operators.OnSubscribeUsing.call(OnSubscribeUsing.java:32) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:85) at com.github.davidmoten.rx.internal.operators.OrderedMerge.call(OrderedMerge.java:27) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102) at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85) at rx.internal.operators.OperatorToObservableList$1.onCompleted(OperatorToObservableList.java:98) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.fastPath(OnSubscribeFromIterable.java:190) at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:86) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:63) at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:34) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10151) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:251) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:100) at rx.internal.operators.DeferredScalarSubscriber.onCompleted(DeferredScalarSubscriber.java:73) at rx.internal.operators.OperatorBufferWithSize$BufferExact.onCompleted(OperatorBufferWithSize.java:130) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.producers.SingleProducer.request(SingleProducer.java:75) at rx.Subscriber.setProducer(Subscriber.java:211) at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:110) at rx.internal.operators.DeferredScalarSubscriber.complete(DeferredScalarSubscriber.java:102) at rx.internal.operators.OnSubscribeTakeLastOne$TakeLastOneSubscriber.onCompleted(OnSubscribeTakeLastOne.java:57) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OperatorSwitchIfEmpty$ParentSubscriber.onCompleted(OperatorSwitchIfEmpty.java:70) at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:659) at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:571) at rx.internal.operators.OperatorMerge$InnerSubscriber.onCompleted(OperatorMerge.java:860) at rx.internal.operators.OnSubscribeMap$MapSubscriber.onCompleted(OnSubscribeMap.java:97) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onCompleted(OperatorOnErrorResumeNextViaFunction.java:101) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onCompleted(OnSubscribeDoOnEach.java:70) at retrofit.RxSupport$2.run(RxSupport.java:57) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

Did anyone else experience such an issue?

@davidmoten
Copy link
Owner

Thanks for the report, I'll have a look.

@mihab
Copy link
Author

mihab commented May 18, 2017

Thanks a lot, if you need anymore information.

I was debugging a bit today and since we use the Serialized class to read, if I read 10 files I can clearly see 10 EOFException events terminating the chain:

However the chain seems to keep on reading all 10 files after that which of course fails since the stream is closed already:

it looks like an issue with double subscription/reading, though I cannot pinpoint it where.

@davidmoten
Copy link
Owner

davidmoten commented May 18, 2017 via email

@davidmoten
Copy link
Owner

Can you distill this to a repeatable unit test?

@davidmoten
Copy link
Owner

I can see an improvement to make to OrderedMerge to ensure that SourceSubscriber does not emit multiple terminal events but I'd like to have a unit test to ensure that that is the cause of the problem (I'm not convinced yet).

@davidmoten
Copy link
Owner

Supposing I do come up with a fix for this what do you want to do about it? Upgrade to latest? Patch your existing app with fixes to a single class?

@mihab
Copy link
Author

mihab commented May 19, 2017

I will try to make a repeatable Unit Test case. In case it does get fixed, I would expect a hotfix version of the 0.7.x branch ... as in 0.7.9.25.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants