From 08e66ff7ab2ef530292ff1ee1e1f4827aafee08e Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Fri, 19 May 2017 10:17:35 +1000 Subject: [PATCH] add protection to OrderedMerge.SourceSubscriber against multiple terminal events #25 --- .../rx/internal/operators/OrderedMerge.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/github/davidmoten/rx/internal/operators/OrderedMerge.java b/src/main/java/com/github/davidmoten/rx/internal/operators/OrderedMerge.java index 35f0092d..1ab059fb 100644 --- a/src/main/java/com/github/davidmoten/rx/internal/operators/OrderedMerge.java +++ b/src/main/java/com/github/davidmoten/rx/internal/operators/OrderedMerge.java @@ -17,6 +17,7 @@ import rx.internal.operators.NotificationLite; import rx.internal.util.RxRingBuffer; import rx.internal.util.unsafe.MpscLinkedQueue; +import rx.plugins.RxJavaHooks; /** * @author David Karnokd @@ -295,7 +296,7 @@ static final class SourceSubscriber extends Subscriber { final MergeProducer parent; volatile boolean done; - public SourceSubscriber(MergeProducer parent) { + SourceSubscriber(MergeProducer parent) { queue = RxRingBuffer.getSpscInstance(); this.parent = parent; } @@ -312,6 +313,9 @@ public void requestMore(long n) { @Override public void onNext(T t) { + if (done) { + return; + } try { queue.onNext(NotificationLite.next(t)); } catch (MissingBackpressureException mbe) { @@ -336,12 +340,19 @@ public void onNext(T t) { @Override public void onError(Throwable e) { + if (done) { + RxJavaHooks.onError(e); + return; + } done = true; parent.error(e); } @Override public void onCompleted() { + if (done) { + return; + } done = true; parent.emit(); }