compose(
* This method will block until all requests are read in. The responses are computed and
* written to {@code asyncResponse} asynchronously.
*/
- public final void resume(AsyncResponse asyncResponse) {
- log.debug("Resuming StreamingResponse");
+ public final void resume(AsyncResponse asyncResponse) throws InterruptedException {
+ log.error("Resuming StreamingResponse - about to call new async response queue");
AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
responseQueue.asyncResume(asyncResponse);
ScheduledExecutorService executorService = null;
@@ -170,20 +200,9 @@ public final void resume(AsyncResponse asyncResponse) {
// need to recheck closingStarted because hasNext can take time to respond
if (!closingStarted
&& Duration.between(streamStartTime, clock.instant()).compareTo(maxDuration) > 0) {
- if (executorService == null) {
- executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.schedule(
- () -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
- }
- next();
- responseQueue.push(
- CompletableFuture.completedFuture(
- ResultOrError.error(
- EXCEPTION_MAPPER.toErrorResponse(
- new StatusCodeException(
- Status.REQUEST_TIMEOUT,
- "Streaming connection open for longer than allowed",
- "Connection will be closed.")))));
+ handleOverMaxStreamDuration(executorService, responseQueue);
+ } else if (!closingStarted && responseQueue.getTailLength().get() >= throttleDepth) {
+ handleResponseQueueDepthTooLarge(executorService, responseQueue);
} else if (!closingStarted) {
responseQueue.push(next().handle(this::handleNext));
} else {
@@ -191,13 +210,28 @@ public final void resume(AsyncResponse asyncResponse) {
}
}
} catch (Exception e) {
- log.debug("Exception thrown when processing stream ", e);
+ log.error("Exception thrown when processing stream ", e);
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
} finally {
- close();
- responseQueue.close();
+ System.out.println(
+ "why isfinally triggered closing started"
+ + closingStarted); // can't call hasnext because of the mocking
+ // if there are still outstanding response to send back, for example hasNext has returned
+ // false, but the executorService hasn't triggered yet, give everything a chance to
+ // complete within the grace period if there are still outstanding responses.
+ System.out.println("FINALLY before tail length wait");
+ if (responseQueue.getTailLength().get() > 0) {
+ System.out.println("in tail length wait");
+ Thread.sleep(gracePeriod.toMillis()); // TODO make this smarter, grace period could be HUGE
+ }
+ System.out.println("FINALL calling a close " + this.getClass());
+ //if (!closingFinished) {
+ close();
+ System.out.println("FINALLY calling responseQueue close" + this.getClass());
+ responseQueue.close();
+ //}
if (executorService != null) {
executorService.shutdown();
try {
@@ -211,12 +245,47 @@ public final void resume(AsyncResponse asyncResponse) {
}
}
- private void closeAll(AsyncResponseQueue responseQueue) {
- closingStarted = true;
- close();
- responseQueue.close();
+ private void handleResponseQueueDepthTooLarge(
+ ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+ if (responseQueue.getTailLength().get() >= disconnectDepth) {
+ triggerDelayedClose(executorService, responseQueue);
+ }
+ next();
+ responseQueue.push(
+ CompletableFuture.completedFuture(
+ ResultOrError.error(
+ EXCEPTION_MAPPER.toErrorResponse(
+ new StatusCodeException(
+ Status.TOO_MANY_REQUESTS,
+ "Backlog of messages waiting to be sent to Kafka is too large",
+ "Not sending to Kafka.")))));
+ }
+
+ private void handleOverMaxStreamDuration(
+ ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+ triggerDelayedClose(executorService, responseQueue);
+ next();
+ responseQueue.push(
+ CompletableFuture.completedFuture(
+ ResultOrError.error(
+ EXCEPTION_MAPPER.toErrorResponse(
+ new StatusCodeException(
+ Status.REQUEST_TIMEOUT,
+ "Streaming connection open for longer than allowed",
+ "Connection will be closed.")))));
+ }
+
+ private void triggerDelayedClose(
+ ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+ if (executorService == null) {
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ executorService.schedule(
+ () -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
+ }
}
+ abstract void closeAll(AsyncResponseQueue responseQueue);
+
private ResultOrError handleNext(T result, @Nullable Throwable error) {
if (error == null) {
return ResultOrError.result(result);
@@ -241,12 +310,21 @@ private InputStreamingResponse(
ChunkedOutputFactory chunkedOutputFactory,
Duration maxDuration,
Duration gracePeriod,
+ Integer throttleDepth,
+ Integer disconnectDepth,
Clock clock) {
- super(chunkedOutputFactory, maxDuration, gracePeriod, clock);
+ super(chunkedOutputFactory, maxDuration, gracePeriod, throttleDepth, disconnectDepth, clock);
this.inputStream = requireNonNull(inputStream);
}
+ public void closeAll(AsyncResponseQueue responseQueue) {
+ System.out.println("!!! do nothing");
+ }
+
public void close() {
+ System.out.println(
+ "CLOSE could be either from THREAD or FINALLY -> calls through to inputStreaming close"
+ + this.getClass());
try {
inputStream.close();
} catch (IOException e) {
@@ -288,12 +366,37 @@ private ComposingStreamingResponse(
Function super I, ? extends CompletableFuture> transform,
ChunkedOutputFactory chunkedOutputFactory,
Duration maxDuration,
- Duration gracePeriod) {
- super(chunkedOutputFactory, maxDuration, gracePeriod, streamingResponseInput.clock);
+ Duration gracePeriod,
+ Integer throttleDepth,
+ Integer disconnectDepth) {
+ super(
+ chunkedOutputFactory,
+ maxDuration,
+ gracePeriod,
+ throttleDepth,
+ disconnectDepth,
+ streamingResponseInput.clock);
this.streamingResponseInput = requireNonNull(streamingResponseInput);
this.transform = requireNonNull(transform);
}
+ @Override
+ protected void closeAll(AsyncResponseQueue responseQueue) {
+ System.out.println("THREAD EXECUTOR TRIGGERS" + this.getClass());
+ streamingResponseInput.closingStarted = true;
+ closingStarted = true;
+ System.out.println(
+ "THREAD EXECUTOR closeAll from thread executor call through to a sub close "
+ + this.getClass());
+ close();
+ System.out.println(
+ "THREAD EXECUTOR closeAll from thread executor calling responsequeue.close");
+ responseQueue.close();
+ System.out.println("THREAD EXECUTOR closeAll from thread executor after responsequeue.close");
+ streamingResponseInput.closingFinished = true;
+ closingFinished = true;
+ }
+
@Override
public boolean hasNext() {
try {
@@ -317,6 +420,9 @@ public CompletableFuture next() {
}
public void close() {
+ System.out.println(
+ "CLOSE could be from either THREAD or FINALLY -> calls through to inputStreaming close"
+ + this.getClass());
streamingResponseInput.close();
}
}
@@ -347,26 +453,30 @@ private void asyncResume(AsyncResponse asyncResponse) {
asyncResponse.resume(Response.ok(sink).build());
}
- private volatile boolean sinkClosed = false;
+ volatile boolean sinkClosed = false;
+
+ private volatile AtomicInteger queueDepth = new AtomicInteger(0);
- private boolean isClosed() {
- return sinkClosed;
+ private AtomicInteger getTailLength() {
+ return queueDepth;
}
private void push(CompletableFuture result) {
- log.debug("Pushing to response queue");
+ queueDepth.incrementAndGet();
tail =
CompletableFuture.allOf(tail, result)
.thenApply(
unused -> {
try {
if (sinkClosed || sink.isClosed()) {
+ System.out.println("closing sink " + this.getClass());
sinkClosed = true;
return null;
}
ResultOrError res = result.join();
log.debug("Writing to sink");
sink.write(res);
+ queueDepth.decrementAndGet();
} catch (IOException e) {
log.error("Error when writing streaming result to response channel.", e);
}
@@ -375,10 +485,14 @@ private void push(CompletableFuture result) {
}
private void close() {
+ System.out.println("CLOSE response queue from FINALLY or THREAD " + this.getClass());
tail.whenComplete(
(unused, throwable) -> {
try {
sinkClosed = true;
+ System.out.println(
+ "*** actually closing tail from FINALLY or TRHEAD (Is an Async call)"
+ + this.getClass());
sink.close();
} catch (IOException e) {
log.error("Error when closing response channel.", e);
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
index 7b5af17d73..a92e18e6e2 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
@@ -19,6 +19,8 @@
import io.confluent.kafkarest.config.ConfigModule.StreamingMaxConnectionDurationConfig;
import io.confluent.kafkarest.config.ConfigModule.StreamingMaxConnectionGracePeriod;
+import io.confluent.kafkarest.config.ConfigModule.getStreamingConnectionMaxQueueDepthBeforeDisconnect;
+import io.confluent.kafkarest.config.ConfigModule.getStreamingConnectionMaxQueueDepthBeforeThrottling;
import java.time.Duration;
import javax.inject.Inject;
@@ -27,18 +29,30 @@ public final class StreamingResponseFactory {
private final ChunkedOutputFactory chunkedOutputFactory;
private final Duration maxDuration;
private final Duration gracePeriod;
+ private final Integer throttleDepth;
+ private final Integer disconnectDepth;
@Inject
public StreamingResponseFactory(
ChunkedOutputFactory chunkedOutputFactory,
@StreamingMaxConnectionDurationConfig Duration maxDuration,
- @StreamingMaxConnectionGracePeriod Duration gracePeriod) {
+ @StreamingMaxConnectionGracePeriod Duration gracePeriod,
+ @getStreamingConnectionMaxQueueDepthBeforeThrottling Integer throttleDepth,
+ @getStreamingConnectionMaxQueueDepthBeforeDisconnect Integer disconnectDepth) {
this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory);
this.maxDuration = maxDuration;
this.gracePeriod = gracePeriod;
+ this.throttleDepth = throttleDepth;
+ this.disconnectDepth = disconnectDepth;
}
public StreamingResponse from(JsonStream inputStream) {
- return StreamingResponse.from(inputStream, chunkedOutputFactory, maxDuration, gracePeriod);
+ return StreamingResponse.from(
+ inputStream,
+ chunkedOutputFactory,
+ maxDuration,
+ gracePeriod,
+ throttleDepth,
+ disconnectDepth);
}
}
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
index 992d9b184f..802db21f23 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
@@ -77,6 +77,7 @@
public class ProduceActionTest {
private static final Duration FIVE_SECONDS_MS = Duration.ofMillis(5000);
+ private static final int DEPTH = 100;
@AfterAll
public static void cleanUp() {
@@ -839,7 +840,8 @@ private static ProduceAction getProduceAction(
replay(produceControllerProvider, produceController);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS);
+ new StreamingResponseFactory(
+ chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS, DEPTH, DEPTH);
// get the current thread so that the call counts can be seen by easy mock
ExecutorService executorService = MoreExecutors.newDirectExecutorService();
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
index 37ceef3814..e0d414104d 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
@@ -33,17 +33,21 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.eclipse.jetty.http.HttpStatus;
import org.glassfish.jersey.server.ChunkedOutput;
import org.junit.jupiter.api.Test;
+// import org.awaitility.Awaitility;
+
public class StreamingResponseTest {
private static final Duration DURATION = Duration.ofMillis(5000);
+ private static final int DEPTH = 100;
@Test
- public void testGracePeriodExceededExceptionThrown() throws IOException {
+ public void testGracePeriodExceededExceptionThrown() throws IOException, InterruptedException {
String key = "foo";
String value = "bar";
ProduceRequest request =
@@ -92,7 +96,7 @@ public void testGracePeriodExceededExceptionThrown() throws IOException {
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+ new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -109,7 +113,7 @@ public void testGracePeriodExceededExceptionThrown() throws IOException {
}
@Test
- public void testWriteToChunkedOutput() throws IOException {
+ public void testWriteToChunkedOutput() throws IOException, InterruptedException {
String key = "foo";
String value = "bar";
ProduceRequest request =
@@ -156,7 +160,7 @@ public void testWriteToChunkedOutput() throws IOException {
replay(mockedChunkedOutput, mockedChunkedOutputFactory);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+ new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requestsMappingIterator));
@@ -173,7 +177,7 @@ public void testWriteToChunkedOutput() throws IOException {
}
@Test
- public void testHasNextMappingException() throws IOException {
+ public void testHasNextMappingException() throws IOException, InterruptedException {
MappingIterator requests = mock(MappingIterator.class);
expect(requests.hasNext())
@@ -201,7 +205,7 @@ public void testHasNextMappingException() throws IOException {
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+ new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -215,7 +219,7 @@ public void testHasNextMappingException() throws IOException {
}
@Test
- public void testHasNextRuntimeException() throws IOException {
+ public void testHasNextRuntimeException() throws IOException, InterruptedException {
MappingIterator requests = mock(MappingIterator.class);
expect(requests.hasNext())
.andThrow(
@@ -241,7 +245,7 @@ public void testHasNextRuntimeException() throws IOException {
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+ new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -256,6 +260,7 @@ public void testHasNextRuntimeException() throws IOException {
@Test
public void testWriteToChunkedOutputAfterTimeout() throws IOException, InterruptedException {
+ Thread.sleep(1000);
String key = "foo";
String value = "bar";
ProduceRequest request =
@@ -300,25 +305,185 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
expect(clock.instant())
- .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response (check)
- expect(clock.instant())
- .andReturn(Instant.ofEpochMilli(0)); // stream start - composing response (check)
+ .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
+
+ //first message
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - OK
expect(requestsMappingIterator.nextValue()).andReturn(request);
expect(clock.instant())
.andReturn(Instant.ofEpochMilli(1)); // first comparison duration. within timeout
-
expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
- mockedChunkedOutput.write(sucessResult);
expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(sucessResult);
- expect(requestsMappingIterator.hasNext()).andReturn(true); // is another message
- expect(requestsMappingIterator.nextValue()).andReturn(request); // second message - bad gateway
+ //second message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(requestsMappingIterator.nextValue()).andReturn(request);
expect(clock.instant())
.andReturn(Instant.ofEpochMilli(timeout + 5)); // second message beyond timeout
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(error);
+
+ //no third message - the thread executor should kick in after 50 ms to close the mapping iterator
+ //and the chunkedOutput at this point:
+
+ requestsMappingIterator.close(); // call from thread executor
+ mockedChunkedOutput.close(); //call from thread executor
+
+
+ final CompletableFuture finished = new CompletableFuture<>();
+ //we don't let the hasNext fire until the "finished" future has completed
+ //This future is defined below, after the streaming response, because we need information from the streaming response to create it
+ //It completes when the two close calls from the "closeAll" called by the Thread executor have both fired
+
+ expect(requestsMappingIterator.hasNext())
+ .andAnswer(
+ () -> { // return hasnext true AFTER the thread executor has timed out and closed the
+ // connections. Then expect no other calls except the connection closing.
+ System.out.println("** has next delayed response");
+ while (!finished.isDone()) {
+ Thread.sleep(10);
+ }
+ return false;
+ });
+
+ requestsMappingIterator.close(); // call from finally
+ mockedChunkedOutput.close(); //call from finally
+
+ replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
+
+ final StreamingResponse streamingResponse =
+ StreamingResponse.fromWithClock(
+ new JsonStream<>(() -> requestsMappingIterator),
+ mockedChunkedOutputFactory,
+ Duration.ofMillis(timeout),
+ Duration.ofMillis(50),
+ DEPTH,
+ DEPTH,
+ clock);
+
+ //This finished future fires when the streamingResponse.closingFinished volatile boolean fires. This happens when the thread executor has finished doing its thing and doing the two closes
+ Executors.newCachedThreadPool()
+ .submit(
+ () -> {
+ try {
+ System.out.println(
+ "&&& TEST thread streaming response object" + streamingResponse.getClass());
+ while (!streamingResponse.closingFinished) {
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ System.out.println("&& test thread completing future");
+ finished.complete(true);
+ });
+
+ CompletableFuture produceResponseFuture = new CompletableFuture<>();
+ produceResponseFuture.complete(produceResponse);
+
+ FakeAsyncResponse response = new FakeAsyncResponse();
+ streamingResponse.compose(result -> produceResponseFuture).resume(response);
+
+ try {
+ EasyMock.verify(mockedChunkedOutput);
+ EasyMock.verify(mockedChunkedOutputFactory);
+ EasyMock.verify(requestsMappingIterator);
+ EasyMock.verify(clock);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testSlowWritesToKafka429ThenDisconnect() throws IOException, InterruptedException {
+
+ String key = "foo";
+ String value = "bar";
+ ProduceRequest request =
+ ProduceRequest.builder()
+ .setKey(
+ ProduceRequestData.builder()
+ .setFormat(EmbeddedFormat.AVRO)
+ .setRawSchema("{\"type\": \"string\"}")
+ .setData(TextNode.valueOf(key))
+ .build())
+ .setValue(
+ ProduceRequestData.builder()
+ .setFormat(EmbeddedFormat.AVRO)
+ .setRawSchema("{\"type\": \"string\"}")
+ .setData(TextNode.valueOf(value))
+ .build())
+ .setOriginalSize(0L)
+ .build();
+
+ MappingIterator requestsMappingIterator = mock(MappingIterator.class);
+
+ long timeout = 10;
+ Clock clock = mock(Clock.class);
+
+ CompletableFuture produceResponseFuture = new CompletableFuture<>();
+
+ ProduceResponse produceResponse =
+ ProduceResponse.builder()
+ .setClusterId("clusterId")
+ .setTopicName("topicName")
+ .setPartitionId(1)
+ .setOffset(1L)
+ .setErrorCode(HttpStatus.OK_200)
+ .build();
+ ResultOrError sucessResult = ResultOrError.result(produceResponse);
+
+ ResultOrError error =
+ ResultOrError.error(
+ ErrorResponse.create(
+ 429,
+ "Backlog of messages waiting to be sent to Kafka is too large: Not "
+ + "sending to Kafka."));
+
+ ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
+ ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
+
+ expect(clock.instant())
+ .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
+
+ expect(requestsMappingIterator.hasNext()).andReturn(true); // first message
+ expect(clock.instant())
+ .andReturn(Instant.ofEpochMilli(1)); // first comparison duration. before timeout
+ expect(requestsMappingIterator.nextValue()).andReturn(request);
+ expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
+ expect(mockedChunkedOutputFactory.getChunkedOutput())
+ .andReturn(mockedChunkedOutput); // todo see if we can remove this extra cration of an async
+ // response quueue due to the inheritance
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(sucessResult);
+ // second message
+ expect(requestsMappingIterator.hasNext()).andReturn(true); // second message
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+ expect(requestsMappingIterator.nextValue())
+ .andAnswer(
+ () -> {
+ // produceResponseFuture.complete(produceResponse);
+ return request;
+ });
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
+
+ // third message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+ expect(requestsMappingIterator.nextValue())
+ .andAnswer(
+ () -> {
+ produceResponseFuture.complete(produceResponse);
+ return request;
+ });
+
expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(error);
expect(requestsMappingIterator.hasNext())
.andAnswer(
@@ -335,28 +500,18 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
- StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(
- mockedChunkedOutputFactory, Duration.ofMillis(timeout), Duration.ofMillis(50));
-
StreamingResponse streamingResponse =
StreamingResponse.fromWithClock(
new JsonStream<>(() -> requestsMappingIterator),
mockedChunkedOutputFactory,
Duration.ofMillis(timeout),
Duration.ofMillis(50),
+ 1,
+ 2,
clock);
- CompletableFuture produceResponseFuture = new CompletableFuture<>();
- produceResponseFuture.complete(produceResponse);
-
FakeAsyncResponse response = new FakeAsyncResponse();
- streamingResponse
- .compose(
- result -> {
- return produceResponseFuture;
- })
- .resume(response);
+ streamingResponse.compose(result -> produceResponseFuture).resume(response);
EasyMock.verify(mockedChunkedOutput);
EasyMock.verify(mockedChunkedOutputFactory);
diff --git a/pom.xml b/pom.xml
index 51a6b81355..897eb5a7ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,6 +122,12 @@
$(hamcrest.version)
test
+
+ org.awaitility
+ awaitility
+ 4.0.2
+ test
+