Skip to content

Commit e2c6ac2

Browse files
committed
Add Publisher.zip
Motivation: The zip operator helps combine results from multiple Publishers into a single Publisher.
1 parent 2d1c8ca commit e2c6ac2

19 files changed

+2873
-19
lines changed

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java

+724
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.api;
17+
18+
import io.servicetalk.concurrent.PublisherSource.Subscriber;
19+
import io.servicetalk.concurrent.PublisherSource.Subscription;
20+
import io.servicetalk.concurrent.api.SingleZipper.ZipArg;
21+
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
22+
import io.servicetalk.concurrent.internal.FlowControlUtils;
23+
24+
import java.lang.reflect.Array;
25+
import java.util.ArrayDeque;
26+
import java.util.ArrayList;
27+
import java.util.Iterator;
28+
import java.util.List;
29+
import java.util.Queue;
30+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
31+
import java.util.function.BiFunction;
32+
import java.util.function.Function;
33+
import javax.annotation.Nullable;
34+
35+
import static io.servicetalk.concurrent.api.Publisher.defer;
36+
import static io.servicetalk.concurrent.api.Publisher.from;
37+
import static io.servicetalk.concurrent.api.SubscriberApiUtils.unwrapNullUnchecked;
38+
import static io.servicetalk.concurrent.api.SubscriberApiUtils.wrapNull;
39+
import static io.servicetalk.concurrent.internal.ConcurrentUtils.calculateSourceRequested;
40+
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
41+
import static java.util.Objects.requireNonNull;
42+
import static java.util.function.Function.identity;
43+
44+
final class PublisherZipper {
45+
static final int ZIP_MAX_CONCURRENCY = 64;
46+
47+
private PublisherZipper() {
48+
}
49+
50+
@SuppressWarnings("unchecked")
51+
static <T1, T2, R> Publisher<R> zip(
52+
boolean delayError, int maxOutstandingDemand, Publisher<? extends T1> p1, Publisher<? extends T2> p2,
53+
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
54+
return zip(delayError, maxOutstandingDemand, objects -> zipper.apply((T1) objects[0], (T2) objects[1]), p1, p2);
55+
}
56+
57+
@SuppressWarnings("unchecked")
58+
static <T1, T2, T3, R> Publisher<R> zip(
59+
boolean delayError, int maxOutstandingDemand, Publisher<? extends T1> p1, Publisher<? extends T2> p2,
60+
Publisher<? extends T3> p3, Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
61+
return zip(delayError, maxOutstandingDemand,
62+
objects -> zipper.apply((T1) objects[0], (T2) objects[1], (T3) objects[2]), p1, p2, p3);
63+
}
64+
65+
@SuppressWarnings("unchecked")
66+
static <T1, T2, T3, T4, R> Publisher<R> zip(
67+
boolean delayError, int maxOutstandingDemand,
68+
Publisher<? extends T1> p1, Publisher<? extends T2> p2, Publisher<? extends T3> p3,
69+
Publisher<? extends T4> p4, Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipper) {
70+
return zip(delayError, maxOutstandingDemand,
71+
objects -> zipper.apply((T1) objects[0], (T2) objects[1], (T3) objects[2], (T4) objects[3]),
72+
p1, p2, p3, p4);
73+
}
74+
75+
static <R> Publisher<R> zip(boolean delayError, int maxOutstandingDemand,
76+
Function<? super Object[], ? extends R> zipper, Publisher<?>... publishers) {
77+
if (maxOutstandingDemand <= 0) {
78+
throw new IllegalArgumentException("maxOutstandingDemand: " + maxOutstandingDemand + " (expected>0)");
79+
}
80+
return defer(() -> {
81+
// flatMap doesn't require any ordering and so it always optimistically requests from all mapped Publishers
82+
// as long as there is demand from downstream to ensure forward progress is made if some Publishers aren't
83+
// producing. However, for the zip use case we queue all signals internally before applying the zipper, and
84+
// request more from upstream if we don't have a signal from all Publishers. If we let flatMap request
85+
// unrestricted we could end up queuing infinitely from 1 Publisher while none of the others are producing
86+
// (or producing much more slowly) and run out of memory. To prevent this issue we limit upstream demand
87+
// to each Publisher until we can deliver a zipped result downstream which means the zip queues are bounded.
88+
@SuppressWarnings("unchecked")
89+
RequestLimiterSubscriber<ZipArg>[] demandSubscribers = (RequestLimiterSubscriber<ZipArg>[])
90+
Array.newInstance(RequestLimiterSubscriber.class, publishers.length);
91+
@SuppressWarnings("unchecked")
92+
Publisher<ZipArg>[] mappedPublishers = new Publisher[publishers.length];
93+
for (int i = 0; i < publishers.length; ++i) {
94+
final int finalI = i;
95+
mappedPublishers[i] = publishers[i].map(v -> new ZipArg(finalI, v)).liftSync(subscriber -> {
96+
RequestLimiterSubscriber<ZipArg> demandSubscriber =
97+
new RequestLimiterSubscriber<>(subscriber, maxOutstandingDemand);
98+
demandSubscribers[finalI] = demandSubscriber;
99+
return demandSubscriber;
100+
});
101+
}
102+
103+
return (delayError ? from(mappedPublishers)
104+
.flatMapMergeDelayError(identity(), mappedPublishers.length, mappedPublishers.length) :
105+
from(mappedPublishers).flatMapMerge(identity(), mappedPublishers.length))
106+
.liftSync(new ZipPublisherOperator<>(mappedPublishers.length, zipper, demandSubscribers))
107+
.shareContextOnSubscribe();
108+
});
109+
}
110+
111+
private static final class ZipPublisherOperator<R> implements PublisherOperator<ZipArg, R> {
112+
private final int zipperArity;
113+
private final Function<? super Object[], ? extends R> zipper;
114+
private final RequestLimiterSubscriber<?>[] demandSubscribers;
115+
116+
private ZipPublisherOperator(final int zipperArity, final Function<? super Object[], ? extends R> zipper,
117+
final RequestLimiterSubscriber<?>[] demandSubscribers) {
118+
if (zipperArity > 64 || zipperArity <= 0) { // long used as bit mask to check for non-empty queues.
119+
throw new IllegalArgumentException("zipperArity " + zipperArity + "(expected: <64 && >0)");
120+
}
121+
this.zipperArity = zipperArity;
122+
this.zipper = requireNonNull(zipper);
123+
this.demandSubscribers = requireNonNull(demandSubscribers);
124+
}
125+
126+
@Override
127+
public Subscriber<? super ZipArg> apply(final Subscriber<? super R> subscriber) {
128+
return new ZipSubscriber<>(subscriber, zipperArity, zipper, demandSubscribers);
129+
}
130+
131+
private static final class ZipSubscriber<R> implements Subscriber<ZipArg> {
132+
private static final long ALL_NON_EMPTY_MASK = 0xFFFFFFFFFFFFFFFFL;
133+
private final Subscriber<? super R> subscriber;
134+
private final Queue<Object>[] array;
135+
private final Function<? super Object[], ? extends R> zipper;
136+
private final RequestLimiterSubscriber<?>[] demandSubscribers;
137+
private long nonEmptyQueueIndexes;
138+
@Nullable
139+
private Subscription subscription;
140+
141+
@SuppressWarnings("unchecked")
142+
private ZipSubscriber(final Subscriber<? super R> subscriber,
143+
final int zipperArity,
144+
final Function<? super Object[], ? extends R> zipper,
145+
final RequestLimiterSubscriber<?>[] demandSubscribers) {
146+
this.subscriber = subscriber;
147+
array = (Queue<Object>[]) Array.newInstance(Queue.class, zipperArity);
148+
for (int i = 0; i < zipperArity; ++i) {
149+
array[i] = new ArrayDeque<>();
150+
}
151+
this.demandSubscribers = requireNonNull(demandSubscribers);
152+
this.zipper = requireNonNull(zipper);
153+
for (int i = 63; i >= zipperArity; --i) {
154+
nonEmptyQueueIndexes |= (1L << i);
155+
}
156+
}
157+
158+
@Override
159+
public void onSubscribe(final Subscription s) {
160+
this.subscription = ConcurrentSubscription.wrap(s);
161+
subscriber.onSubscribe(subscription);
162+
}
163+
164+
@Override
165+
public void onNext(@Nullable final ZipArg zipArg) {
166+
assert zipArg != null;
167+
array[zipArg.index].add(wrapNull(zipArg.value));
168+
nonEmptyQueueIndexes |= 1L << zipArg.index;
169+
if (nonEmptyQueueIndexes == ALL_NON_EMPTY_MASK) {
170+
Object[] zipArray = new Object[array.length];
171+
for (int i = 0; i < array.length; ++i) {
172+
final Queue<Object> arrayQueue = array[i];
173+
final Object queuePoll = arrayQueue.poll();
174+
assert queuePoll != null;
175+
if (arrayQueue.isEmpty()) {
176+
nonEmptyQueueIndexes &= ~(1L << i);
177+
}
178+
zipArray[i] = unwrapNullUnchecked(queuePoll);
179+
// Allow this subscriber to request more if demand is pending.
180+
// Reentry: note that we call out to request more before we dequeued the current set of signals
181+
// which in theory may result in out of order delivery. However, flatMap protects against
182+
// reentry so no need to provide double protection in this method.
183+
demandSubscribers[i].incrementSourceEmitted();
184+
}
185+
subscriber.onNext(zipper.apply(zipArray));
186+
} else {
187+
assert subscription != null;
188+
subscription.request(1);
189+
}
190+
}
191+
192+
@Override
193+
public void onError(final Throwable t) {
194+
subscriber.onError(t);
195+
}
196+
197+
@Override
198+
public void onComplete() {
199+
List<Integer> nonEmptyIndexes = new ArrayList<>();
200+
for (int i = 0; i < array.length; ++i) {
201+
if ((nonEmptyQueueIndexes & (1L << i)) != 0) {
202+
nonEmptyIndexes.add(i);
203+
}
204+
}
205+
if (nonEmptyIndexes.isEmpty()) {
206+
subscriber.onComplete();
207+
} else {
208+
StringBuilder sb = new StringBuilder(20 + 68 + nonEmptyIndexes.size() * 4);
209+
sb.append("Publisher indexes: [");
210+
Iterator<Integer> itr = nonEmptyIndexes.iterator();
211+
sb.append(itr.next()); // safe to call next(), already checked is not empty.
212+
while (itr.hasNext()) {
213+
sb.append(", ").append(itr.next());
214+
}
215+
sb.append("] had onNext signals queued when onComplete terminal signal received");
216+
subscriber.onError(new IllegalStateException(sb.toString()));
217+
}
218+
}
219+
}
220+
}
221+
222+
/**
223+
* Limits the outstanding demand upstream to a positive {@link Integer} value.
224+
* @param <T> The type of data.
225+
*/
226+
private static final class RequestLimiterSubscriber<T> implements Subscriber<T> {
227+
@SuppressWarnings("rawtypes")
228+
private static final AtomicLongFieldUpdater<RequestLimiterSubscriber> sourceEmittedUpdater =
229+
AtomicLongFieldUpdater.newUpdater(RequestLimiterSubscriber.class, "sourceEmitted");
230+
@SuppressWarnings("rawtypes")
231+
private static final AtomicLongFieldUpdater<RequestLimiterSubscriber> sourceRequestedUpdater =
232+
AtomicLongFieldUpdater.newUpdater(RequestLimiterSubscriber.class, "sourceRequested");
233+
@SuppressWarnings("rawtypes")
234+
private static final AtomicLongFieldUpdater<RequestLimiterSubscriber> requestedUpdater =
235+
AtomicLongFieldUpdater.newUpdater(RequestLimiterSubscriber.class, "requested");
236+
private final Subscriber<? super T> subscriber;
237+
private volatile long sourceEmitted;
238+
@SuppressWarnings("unused")
239+
private volatile long sourceRequested;
240+
private volatile long requested;
241+
private final int maxConcurrency;
242+
@Nullable
243+
private Subscription subscription;
244+
245+
RequestLimiterSubscriber(final Subscriber<? super T> subscriber,
246+
final int maxConcurrency) {
247+
this.subscriber = subscriber;
248+
this.maxConcurrency = maxConcurrency;
249+
}
250+
251+
@Override
252+
public void onSubscribe(final Subscription s) {
253+
this.subscription = ConcurrentSubscription.wrap(s);
254+
subscriber.onSubscribe(new Subscription() {
255+
@Override
256+
public void request(final long n) {
257+
if (isRequestNValid(n)) {
258+
requestedUpdater.accumulateAndGet(RequestLimiterSubscriber.this, n,
259+
FlowControlUtils::addWithOverflowProtection);
260+
final long actualSourceRequestN = calculateSourceRequested(requestedUpdater,
261+
sourceRequestedUpdater, sourceEmittedUpdater, maxConcurrency,
262+
RequestLimiterSubscriber.this);
263+
if (actualSourceRequestN != 0) {
264+
subscription.request(actualSourceRequestN);
265+
}
266+
} else {
267+
subscription.request(n);
268+
}
269+
}
270+
271+
@Override
272+
public void cancel() {
273+
subscription.cancel();
274+
}
275+
});
276+
}
277+
278+
@Override
279+
public void onNext(@Nullable final T t) {
280+
subscriber.onNext(t);
281+
}
282+
283+
@Override
284+
public void onError(final Throwable t) {
285+
subscriber.onError(t);
286+
}
287+
288+
@Override
289+
public void onComplete() {
290+
subscriber.onComplete();
291+
}
292+
293+
void incrementSourceEmitted() {
294+
sourceEmittedUpdater.incrementAndGet(this);
295+
final long actualSourceRequestN = calculateSourceRequested(requestedUpdater, sourceRequestedUpdater,
296+
sourceEmittedUpdater, maxConcurrency, this);
297+
if (actualSourceRequestN != 0) {
298+
assert subscription != null;
299+
subscription.request(actualSourceRequestN);
300+
}
301+
}
302+
}
303+
}

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Single.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ public final Publisher<T> concatPropagateCancel(Publisher<? extends T> next) {
858858
* return zipper.apply(f1.get(), other.get());
859859
* }</pre>
860860
* @param other The other {@link Single} to zip with.
861-
* @param zipper Used to combine the completed results for each item from {@code singles}.
861+
* @param zipper Used to combine the completed results for each item from each {@link Single}.
862862
* @param <T2> The type of {@code other}.
863863
* @param <R> The result type of the zipper.
864864
* @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
@@ -884,7 +884,7 @@ public final <T2, R> Single<R> zipWith(Single<? extends T2> other,
884884
* return zipper.apply(f1.get(), other.get());
885885
* }</pre>
886886
* @param other The other {@link Single} to zip with.
887-
* @param zipper Used to combine the completed results for each item from {@code singles}.
887+
* @param zipper Used to combine the completed results for each item from each {@link Single}.
888888
* @param <T2> The type of {@code other}.
889889
* @param <R> The result type of the zipper.
890890
* @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/SingleZipper.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,12 @@ static <R> Single<R> zipDelayError(Function<? super Object[], ? extends R> zippe
127127
}).map(zipper);
128128
}
129129

130-
private static final class ZipArg {
131-
private final int index;
130+
static final class ZipArg {
131+
final int index;
132132
@Nullable
133-
private final Object value;
133+
final Object value;
134134

135-
private ZipArg(final int index, final Object value) {
135+
ZipArg(final int index, final Object value) {
136136
this.index = index;
137137
this.value = value;
138138
}

0 commit comments

Comments
 (0)