compose(
* This method will block until all requests are read in. The responses are computed and
* written to {@code asyncResponse} asynchronously.
*/
- public final void resume(AsyncResponse asyncResponse) throws InterruptedException {
- log.error("Resuming StreamingResponse - about to call new async response queue");
+ public final void resume(AsyncResponse asyncResponse) {
+ log.debug("Resuming StreamingResponse");
AsyncResponseQueue responseQueue = new AsyncResponseQueue(chunkedOutputFactory);
responseQueue.asyncResume(asyncResponse);
ScheduledExecutorService executorService = null;
@@ -210,32 +211,29 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio
}
}
} catch (Exception e) {
- log.error("Exception thrown when processing stream ", e);
+ log.debug("Exception thrown when processing stream ", e);
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
} finally {
- System.out.println(
- "why isfinally triggered closing started"
- + closingStarted); // can't call hasnext because of the mocking
// if there are still outstanding response to send back, for example hasNext has returned
// false, but the executorService hasn't triggered yet, give everything a chance to
- // complete within the grace period if there are still outstanding responses.
- System.out.println("FINALLY before tail length wait");
- if (responseQueue.getTailLength().get() > 0) {
- System.out.println("in tail length wait");
- Thread.sleep(gracePeriod.toMillis()); // TODO make this smarter, grace period could be HUGE
+ // complete within the grace period plus a little bit.
+ int retries = 0;
+ while (!closingFinished && retries < MAX_CLOSE_RETRIES) {
+ retries++;
+ try {
+ Thread.sleep(gracePeriod.toMillis() / retries);
+ } catch (InterruptedException e) {
+ // do nothing
+ }
}
- System.out.println("FINALL calling a close " + this.getClass());
- //if (!closingFinished) {
- close();
- System.out.println("FINALLY calling responseQueue close" + this.getClass());
- responseQueue.close();
- //}
+ close();
+ responseQueue.close();
if (executorService != null) {
executorService.shutdown();
try {
- if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) {
+ if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
@@ -245,34 +243,34 @@ public final void resume(AsyncResponse asyncResponse) throws InterruptedExceptio
}
}
- private void handleResponseQueueDepthTooLarge(
+ private void handleOverMaxStreamDuration(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
- if (responseQueue.getTailLength().get() >= disconnectDepth) {
- triggerDelayedClose(executorService, responseQueue);
- }
+ triggerDelayedClose(executorService, responseQueue);
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
- Status.TOO_MANY_REQUESTS,
- "Backlog of messages waiting to be sent to Kafka is too large",
- "Not sending to Kafka.")))));
+ Status.REQUEST_TIMEOUT,
+ "Streaming connection open for longer than allowed",
+ "Connection will be closed.")))));
}
- private void handleOverMaxStreamDuration(
+ private void handleResponseQueueDepthTooLarge(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
- triggerDelayedClose(executorService, responseQueue);
+ if (responseQueue.getTailLength().get() >= disconnectDepth) {
+ triggerDelayedClose(executorService, responseQueue);
+ }
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
- Status.REQUEST_TIMEOUT,
- "Streaming connection open for longer than allowed",
- "Connection will be closed.")))));
+ Status.TOO_MANY_REQUESTS,
+ "Backlog of messages waiting to be sent to Kafka is too large",
+ "Not sending to Kafka.")))));
}
private void triggerDelayedClose(
@@ -318,13 +316,10 @@ private InputStreamingResponse(
}
public void closeAll(AsyncResponseQueue responseQueue) {
- System.out.println("!!! do nothing");
+ // Never called
}
public void close() {
- System.out.println(
- "CLOSE could be either from THREAD or FINALLY -> calls through to inputStreaming close"
- + this.getClass());
try {
inputStream.close();
} catch (IOException e) {
@@ -382,17 +377,10 @@ private ComposingStreamingResponse(
@Override
protected void closeAll(AsyncResponseQueue responseQueue) {
- System.out.println("THREAD EXECUTOR TRIGGERS" + this.getClass());
streamingResponseInput.closingStarted = true;
closingStarted = true;
- System.out.println(
- "THREAD EXECUTOR closeAll from thread executor call through to a sub close "
- + this.getClass());
close();
- System.out.println(
- "THREAD EXECUTOR closeAll from thread executor calling responsequeue.close");
responseQueue.close();
- System.out.println("THREAD EXECUTOR closeAll from thread executor after responsequeue.close");
streamingResponseInput.closingFinished = true;
closingFinished = true;
}
@@ -420,9 +408,6 @@ public CompletableFuture next() {
}
public void close() {
- System.out.println(
- "CLOSE could be from either THREAD or FINALLY -> calls through to inputStreaming close"
- + this.getClass());
streamingResponseInput.close();
}
}
@@ -469,7 +454,6 @@ private void push(CompletableFuture result) {
unused -> {
try {
if (sinkClosed || sink.isClosed()) {
- System.out.println("closing sink " + this.getClass());
sinkClosed = true;
return null;
}
@@ -485,14 +469,10 @@ private void push(CompletableFuture result) {
}
private void close() {
- System.out.println("CLOSE response queue from FINALLY or THREAD " + this.getClass());
tail.whenComplete(
(unused, throwable) -> {
try {
sinkClosed = true;
- System.out.println(
- "*** actually closing tail from FINALLY or TRHEAD (Is an Async call)"
- + this.getClass());
sink.close();
} catch (IOException e) {
log.error("Error when closing response channel.", e);
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
index 802db21f23..f9252a320b 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
@@ -77,6 +77,7 @@
public class ProduceActionTest {
private static final Duration FIVE_SECONDS_MS = Duration.ofMillis(5000);
+ private static final Duration FIVE_MS = Duration.ofMillis(5);
private static final int DEPTH = 100;
@AfterAll
@@ -840,8 +841,7 @@ private static ProduceAction getProduceAction(
replay(produceControllerProvider, produceController);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(
- chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS, DEPTH, DEPTH);
+ new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_MS, DEPTH, DEPTH);
// get the current thread so that the call counts can be seen by easy mock
ExecutorService executorService = MoreExecutors.newDirectExecutorService();
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
index e0d414104d..c986cd6816 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
@@ -33,21 +33,19 @@
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
import org.easymock.EasyMock;
import org.eclipse.jetty.http.HttpStatus;
import org.glassfish.jersey.server.ChunkedOutput;
import org.junit.jupiter.api.Test;
-// import org.awaitility.Awaitility;
-
public class StreamingResponseTest {
private static final Duration DURATION = Duration.ofMillis(5000);
+ private static final Duration GRACE_DURATION = Duration.ofMillis(1);
private static final int DEPTH = 100;
@Test
- public void testGracePeriodExceededExceptionThrown() throws IOException, InterruptedException {
+ public void testGracePeriodExceededExceptionThrown() throws IOException {
String key = "foo";
String value = "bar";
ProduceRequest request =
@@ -96,7 +94,8 @@ public void testGracePeriodExceededExceptionThrown() throws IOException, Interru
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -113,7 +112,7 @@ public void testGracePeriodExceededExceptionThrown() throws IOException, Interru
}
@Test
- public void testWriteToChunkedOutput() throws IOException, InterruptedException {
+ public void testWriteToChunkedOutput() throws IOException {
String key = "foo";
String value = "bar";
ProduceRequest request =
@@ -160,7 +159,8 @@ public void testWriteToChunkedOutput() throws IOException, InterruptedException
replay(mockedChunkedOutput, mockedChunkedOutputFactory);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requestsMappingIterator));
@@ -177,7 +177,7 @@ public void testWriteToChunkedOutput() throws IOException, InterruptedException
}
@Test
- public void testHasNextMappingException() throws IOException, InterruptedException {
+ public void testHasNextMappingException() throws IOException {
MappingIterator requests = mock(MappingIterator.class);
expect(requests.hasNext())
@@ -205,7 +205,8 @@ public void testHasNextMappingException() throws IOException, InterruptedExcepti
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -219,7 +220,7 @@ public void testHasNextMappingException() throws IOException, InterruptedExcepti
}
@Test
- public void testHasNextRuntimeException() throws IOException, InterruptedException {
+ public void testHasNextRuntimeException() throws IOException {
MappingIterator requests = mock(MappingIterator.class);
expect(requests.hasNext())
.andThrow(
@@ -245,7 +246,8 @@ public void testHasNextRuntimeException() throws IOException, InterruptedExcepti
replay(mockedChunkedOutput);
StreamingResponseFactory streamingResponseFactory =
- new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION, DEPTH, DEPTH);
+ new StreamingResponseFactory(
+ mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);
StreamingResponse streamingResponse =
streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -259,8 +261,7 @@ public void testHasNextRuntimeException() throws IOException, InterruptedExcepti
}
@Test
- public void testWriteToChunkedOutputAfterTimeout() throws IOException, InterruptedException {
- Thread.sleep(1000);
+ public void testWriteToChunkedOutputAfterTimeout() throws IOException {
String key = "foo";
String value = "bar";
ProduceRequest request =
@@ -299,7 +300,8 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
ResultOrError.error(
ErrorResponse.create(
408,
- "Streaming connection open for longer than allowed: Connection will be closed."));
+ "Streaming connection open for longer than allowed: "
+ + "Connection will be closed."));
ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
@@ -307,9 +309,9 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
expect(clock.instant())
.andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
- //first message
- expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
- expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - OK
+ // first message
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(requestsMappingIterator.nextValue()).andReturn(request);
expect(clock.instant())
.andReturn(Instant.ofEpochMilli(1)); // first comparison duration. within timeout
@@ -317,7 +319,7 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(sucessResult);
- //second message
+ // second message
expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(requestsMappingIterator.nextValue()).andReturn(request);
expect(clock.instant())
@@ -325,31 +327,14 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
- //no third message - the thread executor should kick in after 50 ms to close the mapping iterator
- //and the chunkedOutput at this point:
+ // no third message
+ expect(requestsMappingIterator.hasNext()).andReturn(false);
requestsMappingIterator.close(); // call from thread executor
- mockedChunkedOutput.close(); //call from thread executor
-
-
- final CompletableFuture finished = new CompletableFuture<>();
- //we don't let the hasNext fire until the "finished" future has completed
- //This future is defined below, after the streaming response, because we need information from the streaming response to create it
- //It completes when the two close calls from the "closeAll" called by the Thread executor have both fired
-
- expect(requestsMappingIterator.hasNext())
- .andAnswer(
- () -> { // return hasnext true AFTER the thread executor has timed out and closed the
- // connections. Then expect no other calls except the connection closing.
- System.out.println("** has next delayed response");
- while (!finished.isDone()) {
- Thread.sleep(10);
- }
- return false;
- });
+ mockedChunkedOutput.close(); // call from thread executor
requestsMappingIterator.close(); // call from finally
- mockedChunkedOutput.close(); //call from finally
+ mockedChunkedOutput.close(); // call from finally
replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
@@ -363,23 +348,6 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
DEPTH,
clock);
- //This finished future fires when the streamingResponse.closingFinished volatile boolean fires. This happens when the thread executor has finished doing its thing and doing the two closes
- Executors.newCachedThreadPool()
- .submit(
- () -> {
- try {
- System.out.println(
- "&&& TEST thread streaming response object" + streamingResponse.getClass());
- while (!streamingResponse.closingFinished) {
- Thread.sleep(100);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("&& test thread completing future");
- finished.complete(true);
- });
-
CompletableFuture produceResponseFuture = new CompletableFuture<>();
produceResponseFuture.complete(produceResponse);
@@ -397,7 +365,7 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
}
@Test
- public void testSlowWritesToKafka429ThenDisconnect() throws IOException, InterruptedException {
+ public void testSlowWritesToKafka429() throws IOException {
String key = "foo";
String value = "bar";
@@ -445,57 +413,143 @@ public void testSlowWritesToKafka429ThenDisconnect() throws IOException, Interru
ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
- expect(clock.instant())
- .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
- expect(clock.instant()).andReturn(Instant.ofEpochMilli(0)); // stream start - composing response
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
- expect(requestsMappingIterator.hasNext()).andReturn(true); // first message
- expect(clock.instant())
- .andReturn(Instant.ofEpochMilli(1)); // first comparison duration. before timeout
+ // first message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
expect(requestsMappingIterator.nextValue()).andReturn(request);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(1));
expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
- expect(mockedChunkedOutputFactory.getChunkedOutput())
- .andReturn(mockedChunkedOutput); // todo see if we can remove this extra cration of an async
- // response quueue due to the inheritance
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(sucessResult);
// second message
- expect(requestsMappingIterator.hasNext()).andReturn(true); // second message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
expect(requestsMappingIterator.nextValue())
.andAnswer(
() -> {
- // produceResponseFuture.complete(produceResponse);
+ produceResponseFuture.complete(produceResponse);
return request;
});
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
- // third message
+ expect(requestsMappingIterator.hasNext()).andReturn(false);
+
+ requestsMappingIterator.close(); // closes from the finally
+ mockedChunkedOutput.close();
+
+ replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
+
+ StreamingResponse streamingResponse =
+ StreamingResponse.fromWithClock(
+ new JsonStream<>(() -> requestsMappingIterator),
+ mockedChunkedOutputFactory,
+ Duration.ofMillis(timeout),
+ Duration.ofMillis(50),
+ 1,
+ 2,
+ clock);
+
+ FakeAsyncResponse response = new FakeAsyncResponse();
+ streamingResponse.compose(result -> produceResponseFuture).resume(response);
+
+ EasyMock.verify(mockedChunkedOutput);
+ EasyMock.verify(mockedChunkedOutputFactory);
+ EasyMock.verify(requestsMappingIterator);
+ EasyMock.verify(clock);
+ }
+
+ @Test
+ public void testSlowWritesToKafkaDisconnects() throws IOException {
+
+ String key = "foo";
+ String value = "bar";
+ ProduceRequest request =
+ ProduceRequest.builder()
+ .setKey(
+ ProduceRequestData.builder()
+ .setFormat(EmbeddedFormat.AVRO)
+ .setRawSchema("{\"type\": \"string\"}")
+ .setData(TextNode.valueOf(key))
+ .build())
+ .setValue(
+ ProduceRequestData.builder()
+ .setFormat(EmbeddedFormat.AVRO)
+ .setRawSchema("{\"type\": \"string\"}")
+ .setData(TextNode.valueOf(value))
+ .build())
+ .setOriginalSize(0L)
+ .build();
+
+ MappingIterator requestsMappingIterator = mock(MappingIterator.class);
+
+ long timeout = 10;
+ Clock clock = mock(Clock.class);
+
+ CompletableFuture produceResponseFuture = new CompletableFuture<>();
+
+ ProduceResponse produceResponse =
+ ProduceResponse.builder()
+ .setClusterId("clusterId")
+ .setTopicName("topicName")
+ .setPartitionId(1)
+ .setOffset(1L)
+ .setErrorCode(HttpStatus.OK_200)
+ .build();
+ ResultOrError sucessResult = ResultOrError.result(produceResponse);
+
+ ResultOrError error =
+ ResultOrError.error(
+ ErrorResponse.create(
+ 429,
+ "Backlog of messages waiting to be sent to Kafka is too large: Not "
+ + "sending to Kafka."));
+
+ ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
+ ChunkedOutput mockedChunkedOutput = mock(ChunkedOutput.class);
+
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+
+ // first message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+ expect(requestsMappingIterator.nextValue()).andReturn(request);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(1));
+ expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(sucessResult);
+
+ // second message
expect(requestsMappingIterator.hasNext()).andReturn(true);
expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
expect(requestsMappingIterator.nextValue())
.andAnswer(
() -> {
- produceResponseFuture.complete(produceResponse);
return request;
});
-
expect(mockedChunkedOutput.isClosed()).andReturn(false);
mockedChunkedOutput.write(error);
- expect(requestsMappingIterator.hasNext())
+ // third message
+ expect(requestsMappingIterator.hasNext()).andReturn(true);
+ expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+ expect(requestsMappingIterator.nextValue())
.andAnswer(
- () -> { // return hasnext true AFTER the thread executor has timed out and closed the
- // connections. Then expect no other calls except the connection closing.
- Thread.sleep(500);
- return true;
+ () -> {
+ produceResponseFuture.complete(produceResponse);
+ return request;
});
+ expect(mockedChunkedOutput.isClosed()).andReturn(false);
+ mockedChunkedOutput.write(error);
+
+ expect(requestsMappingIterator.hasNext()).andReturn(false);
- requestsMappingIterator.close(); // this ensures the closes have been called
+ requestsMappingIterator.close(); // closes from thread executor
mockedChunkedOutput.close();
- requestsMappingIterator.close(); // expect twice - one from the thread and one from the finally
+ requestsMappingIterator.close(); // closes from finally
mockedChunkedOutput.close();
replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
diff --git a/pom.xml b/pom.xml
index 897eb5a7ce..51a6b81355 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,12 +122,6 @@
$(hamcrest.version)
test
-
- org.awaitility
- awaitility
- 4.0.2
- test
-