diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
index 094d9fbdef..554cba2664 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/KafkaRestConfig.java
@@ -466,8 +466,29 @@ public class KafkaRestConfig extends RestConfig {
       "streaming.connection.max.duration.grace.period.ms";
   private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC =
       "How long after a streaming connection reaches its maximum duration outstanding "
-          + "requests will be processed for before the connection is closed.";
+          + "requests will be processed for before the connection is closed. "
+          + "Maximum value is 10 seconds.";
   private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";
+  public static final ConfigDef.Range STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR =
+      ConfigDef.Range.between(1, 10000);
+
+  public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH =
+      "streaming.response.queue.throttle.depth";
+  private static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC =
+      "The maximum depth of the response queue that is used to hold requests as they are being "
+          + "processed by Kafka. If this queue grows too long it indicates that Kafka is "
+          + "processing messages more slowly than the user is sending them. After this "
+          + "queue depth is reached, all requests will receive a 429 response until the "
+          + "queue depth returns under the limit.";
+  private static final Integer STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT = 100;
+
+  public static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH =
+      "streaming.response.queue.disconnect.depth";
+  private static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC =
+      "The maximum depth of the response queue that is used to hold requests as they are being "
+          + "processed by Kafka. If the queue depth grows beyond this limit then the "
+          + "connection is closed.";
+  private static final Integer STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT = 200;

   private static final ConfigDef config;
   private volatile Metrics metrics;
@@ -846,8 +867,21 @@ protected static ConfigDef baseKafkaRestConfigDef() {
             STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS,
             Type.LONG,
             STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
+            STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR,
             Importance.LOW,
-            STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC);
+            STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
+        .define(
+            STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH,
+            Type.INT,
+            STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT,
+            Importance.LOW,
+            STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC)
+        .define(
+            STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH,
+            Type.INT,
+            STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT,
+            Importance.LOW,
+            STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC);
   }

   private static Properties getPropsFromFile(String propsFile) throws RestConfigException {
@@ -1088,6 +1122,14 @@ public final Duration getStreamingConnectionMaxDurationGracePeriod() {
     return Duration.ofMillis(getLong(STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS));
   }

+  public final Integer getStreamingConnectionMaxQueueDepthBeforeThrottling() {
+    return getInt(STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH);
+  }
+
+  public final Integer getStreamingConnectionMaxQueueDepthBeforeDisconnect() {
+    return getInt(STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH);
+  }
+
   public final int getRateLimitDefaultCost() {
     return getInt(RATE_LIMIT_DEFAULT_COST_CONFIG);
   }
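Note for anyone trying this out: the two new knobs sit alongside the existing streaming settings in the REST Proxy properties file. A sketch of how they would be set (the values shown are the defaults defined above; pick thresholds to suit your deployment):

```properties
# Answer streaming produce requests with 429 once this many responses are queued ahead of Kafka.
streaming.response.queue.throttle.depth=100
# Close the streaming connection outright once the backlog reaches this depth.
streaming.response.queue.disconnect.depth=200
# Grace period is now validated against the range [1, 10000] ms.
streaming.connection.max.duration.grace.period.ms=500
```
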
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java b/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java
index 0718caa89d..2c45a6089d 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/config/ConfigModule.java
@@ -169,6 +169,14 @@ protected void configure() {
         .qualifiedBy(new StreamingConnectionMaxDurationGracePeriodImpl())
         .to(Duration.class);

+    bind(config.getStreamingConnectionMaxQueueDepthBeforeThrottling())
+        .qualifiedBy(new getStreamingConnectionMaxQueueDepthBeforeThrottlingImpl())
+        .to(Integer.class);
+
+    bind(config.getStreamingConnectionMaxQueueDepthBeforeDisconnect())
+        .qualifiedBy(new getStreamingConnectionMaxQueueDepthBeforeDisconnectImpl())
+        .to(Integer.class);
+
     bind(config.getSchemaRegistryConfigs())
         .qualifiedBy(new SchemaRegistryConfigsImpl())
         .to(new TypeLiteral<Map<String, Object>>() {});
@@ -441,5 +449,23 @@ private static final class StreamingConnectionMaxDurationConfigImpl
   private static final class StreamingConnectionMaxDurationGracePeriodImpl
       extends AnnotationLiteral<StreamingMaxConnectionGracePeriod>
       implements StreamingMaxConnectionGracePeriod {}
+
+  @Qualifier
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
+  public @interface getStreamingConnectionMaxQueueDepthBeforeThrottling {}
+
+  private static final class getStreamingConnectionMaxQueueDepthBeforeThrottlingImpl
+      extends AnnotationLiteral<getStreamingConnectionMaxQueueDepthBeforeThrottling>
+      implements getStreamingConnectionMaxQueueDepthBeforeThrottling {}
+
+  @Qualifier
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
+  public @interface getStreamingConnectionMaxQueueDepthBeforeDisconnect {}
+
+  private static final class getStreamingConnectionMaxQueueDepthBeforeDisconnectImpl
+      extends AnnotationLiteral<getStreamingConnectionMaxQueueDepthBeforeDisconnect>
+      implements getStreamingConnectionMaxQueueDepthBeforeDisconnect {}
 }
 // CHECKSTYLE:ON:ClassDataAbstractionCoupling
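The two bindings above follow the module's existing qualifier pattern: `configure()` binds each Integer under a custom `@Qualifier` annotation, and any HK2-managed class can then receive the value through constructor injection, exactly as `StreamingResponseFactory` does later in this diff. A minimal sketch, assuming a hypothetical consumer class (`QueueDepthLimits` is illustrative only):

```java
import io.confluent.kafkarest.config.ConfigModule.getStreamingConnectionMaxQueueDepthBeforeDisconnect;
import io.confluent.kafkarest.config.ConfigModule.getStreamingConnectionMaxQueueDepthBeforeThrottling;
import javax.inject.Inject;

// Hypothetical consumer, shown only to illustrate the qualifier-based injection.
public final class QueueDepthLimits {
  private final Integer throttleDepth;
  private final Integer disconnectDepth;

  @Inject
  QueueDepthLimits(
      @getStreamingConnectionMaxQueueDepthBeforeThrottling Integer throttleDepth,
      @getStreamingConnectionMaxQueueDepthBeforeDisconnect Integer disconnectDepth) {
    this.throttleDepth = throttleDepth; // streaming.response.queue.throttle.depth
    this.disconnectDepth = disconnectDepth; // streaming.response.queue.disconnect.depth
  }
}
```
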
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
index d0eb455b87..9412c03ade 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponse.java
@@ -45,6 +45,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import javax.annotation.Nullable;
 import javax.ws.rs.WebApplicationException;
@@ -72,6 +73,7 @@ public abstract class StreamingResponse<T> {

   private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);
   private static final int ONE_SECOND_MS = 1000;
+  private static final int MAX_CLOSE_RETRIES = 2;

   private static final CompositeErrorMapper EXCEPTION_MAPPER =
       new CompositeErrorMapper.Builder()
@@ -109,30 +111,45 @@ public abstract class StreamingResponse<T> {
   private final ChunkedOutputFactory chunkedOutputFactory;
   private final Duration maxDuration;
   private final Duration gracePeriod;
+  private final Integer throttleDepth;
+  private final Integer disconnectDepth;
   private final Instant streamStartTime;
   private final Clock clock;

   volatile boolean closingStarted = false;
+  volatile boolean closingFinished = false;

   StreamingResponse(
       ChunkedOutputFactory chunkedOutputFactory,
       Duration maxDuration,
       Duration gracePeriod,
+      Integer throttleDepth,
+      Integer disconnectDepth,
       Clock clock) {
     this.clock = clock;
     this.streamStartTime = clock.instant();
     this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory);
     this.maxDuration = maxDuration;
     this.gracePeriod = gracePeriod;
+    this.throttleDepth = throttleDepth;
+    this.disconnectDepth = disconnectDepth;
   }

   public static <T> StreamingResponse<T> from(
       JsonStream<T> inputStream,
       ChunkedOutputFactory chunkedOutputFactory,
       Duration maxDuration,
-      Duration gracePeriod) {
+      Duration gracePeriod,
+      Integer throttleDepth,
+      Integer disconnectDepth) {
     return new InputStreamingResponse<>(
-        inputStream, chunkedOutputFactory, maxDuration, gracePeriod, Clock.systemUTC());
+        inputStream,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth,
+        Clock.systemUTC());
   }

   @VisibleForTesting
@@ -141,15 +158,29 @@ static <T> StreamingResponse<T> fromWithClock(
       ChunkedOutputFactory chunkedOutputFactory,
       Duration maxDuration,
       Duration gracePeriod,
+      Integer throttleDepth,
+      Integer disconnectDepth,
       Clock clock) {
     return new InputStreamingResponse<>(
-        inputStream, chunkedOutputFactory, maxDuration, gracePeriod, clock);
+        inputStream,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth,
+        clock);
   }

   public final <O> StreamingResponse<O> compose(
       Function<? super T, ? extends CompletableFuture<O>> transform) {
     return new ComposingStreamingResponse<>(
-        this, transform, chunkedOutputFactory, maxDuration, gracePeriod);
+        this,
+        transform,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth);
   }

   /**
@@ -170,20 +201,9 @@ public final void resume(AsyncResponse asyncResponse) {
       // need to recheck closingStarted because hasNext can take time to respond
       if (!closingStarted
           && Duration.between(streamStartTime, clock.instant()).compareTo(maxDuration) > 0) {
-        if (executorService == null) {
-          executorService = Executors.newSingleThreadScheduledExecutor();
-          executorService.schedule(
-              () -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
-        }
-        next();
-        responseQueue.push(
-            CompletableFuture.completedFuture(
-                ResultOrError.error(
-                    EXCEPTION_MAPPER.toErrorResponse(
-                        new StatusCodeException(
-                            Status.REQUEST_TIMEOUT,
-                            "Streaming connection open for longer than allowed",
-                            "Connection will be closed.")))));
+        handleOverMaxStreamDuration(executorService, responseQueue);
+      } else if (!closingStarted && responseQueue.getTailLength().get() >= throttleDepth) {
+        handleResponseQueueDepthTooLarge(executorService, responseQueue);
       } else if (!closingStarted) {
         responseQueue.push(next().handle(this::handleNext));
       } else {
@@ -196,12 +216,24 @@
       responseQueue.push(
           CompletableFuture.completedFuture(
               ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
     } finally {
+      // If there are still outstanding responses to send back (for example, hasNext has
+      // returned false but the executorService hasn't triggered yet), give everything a
+      // chance to complete within the grace period plus a little bit.
+      int retries = 0;
+      while (!closingFinished && retries < MAX_CLOSE_RETRIES) {
+        retries++;
+        try {
+          Thread.sleep(gracePeriod.toMillis() / retries);
+        } catch (InterruptedException e) {
+          // do nothing
+        }
+      }
       close();
       responseQueue.close();
       if (executorService != null) {
         executorService.shutdown();
         try {
-          if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) {
+          if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
             executorService.shutdownNow();
           }
         } catch (InterruptedException e) {
@@ -211,12 +243,47 @@
     }
   }

-  private void closeAll(AsyncResponseQueue responseQueue) {
-    closingStarted = true;
-    close();
-    responseQueue.close();
+  private void handleOverMaxStreamDuration(
+      ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+    triggerDelayedClose(executorService, responseQueue);
+    next();
+    responseQueue.push(
+        CompletableFuture.completedFuture(
+            ResultOrError.error(
+                EXCEPTION_MAPPER.toErrorResponse(
+                    new StatusCodeException(
+                        Status.REQUEST_TIMEOUT,
+                        "Streaming connection open for longer than allowed",
+                        "Connection will be closed.")))));
+  }
+
+  private void handleResponseQueueDepthTooLarge(
+      ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+    if (responseQueue.getTailLength().get() >= disconnectDepth) {
+      triggerDelayedClose(executorService, responseQueue);
+    }
+    next();
+    responseQueue.push(
+        CompletableFuture.completedFuture(
+            ResultOrError.error(
+                EXCEPTION_MAPPER.toErrorResponse(
+                    new StatusCodeException(
+                        Status.TOO_MANY_REQUESTS,
+                        "Backlog of messages waiting to be sent to Kafka is too large",
+                        "Not sending to Kafka.")))));
+  }
+
+  private void triggerDelayedClose(
+      ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
+    if (executorService == null) {
+      executorService = Executors.newSingleThreadScheduledExecutor();
+      executorService.schedule(
+          () -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
+    }
   }

+  abstract void closeAll(AsyncResponseQueue responseQueue);
+
   private ResultOrError handleNext(T result, @Nullable Throwable error) {
     if (error == null) {
       return ResultOrError.result(result);
@@ -241,11 +308,17 @@ private InputStreamingResponse(
         ChunkedOutputFactory chunkedOutputFactory,
         Duration maxDuration,
         Duration gracePeriod,
+        Integer throttleDepth,
+        Integer disconnectDepth,
         Clock clock) {
-      super(chunkedOutputFactory, maxDuration, gracePeriod, clock);
+      super(chunkedOutputFactory, maxDuration, gracePeriod, throttleDepth, disconnectDepth, clock);
       this.inputStream = requireNonNull(inputStream);
     }

+    public void closeAll(AsyncResponseQueue responseQueue) {
+      // Never called
+    }
+
     public void close() {
       try {
         inputStream.close();
@@ -288,12 +361,30 @@ private ComposingStreamingResponse(
         Function<? super I, ? extends CompletableFuture<O>> transform,
         ChunkedOutputFactory chunkedOutputFactory,
         Duration maxDuration,
-        Duration gracePeriod) {
-      super(chunkedOutputFactory, maxDuration, gracePeriod, streamingResponseInput.clock);
+        Duration gracePeriod,
+        Integer throttleDepth,
+        Integer disconnectDepth) {
+      super(
+          chunkedOutputFactory,
+          maxDuration,
+          gracePeriod,
+          throttleDepth,
+          disconnectDepth,
+          streamingResponseInput.clock);
       this.streamingResponseInput = requireNonNull(streamingResponseInput);
       this.transform = requireNonNull(transform);
     }

+    @Override
+    protected void closeAll(AsyncResponseQueue responseQueue) {
+      streamingResponseInput.closingStarted = true;
+      closingStarted = true;
+      close();
+      responseQueue.close();
+      streamingResponseInput.closingFinished = true;
+      closingFinished = true;
+    }
+
     @Override
     public boolean hasNext() {
       try {
@@ -347,14 +438,16 @@ private void asyncResume(AsyncResponse asyncResponse) {
       asyncResponse.resume(Response.ok(sink).build());
     }

-    private volatile boolean sinkClosed = false;
+    volatile boolean sinkClosed = false;
+
+    private volatile AtomicInteger queueDepth = new AtomicInteger(0);

-    private boolean isClosed() {
-      return sinkClosed;
+    private AtomicInteger getTailLength() {
+      return queueDepth;
     }

     private void push(CompletableFuture<ResultOrError> result) {
-      log.debug("Pushing to response queue");
+      queueDepth.incrementAndGet();
       tail =
           CompletableFuture.allOf(tail, result)
               .thenApply(
@@ -367,6 +460,7 @@ private void push(CompletableFuture<ResultOrError> result) {
                       ResultOrError res = result.join();
                       log.debug("Writing to sink");
                       sink.write(res);
+                      queueDepth.decrementAndGet();
                     } catch (IOException e) {
                       log.error("Error when writing streaming result to response channel.", e);
                     }
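Summarizing the mechanism added above: `AsyncResponseQueue` now tracks its depth with an `AtomicInteger` (incremented in `push`, decremented only after the result is written to the sink), and `resume` consults that depth before accepting each record: at `throttleDepth` it pushes a 429 `ResultOrError` instead of producing, and at `disconnectDepth` it additionally schedules `closeAll` after the grace period. A self-contained sketch of the same two-threshold idea (`DepthTrackedQueue` is illustrative, not part of this change):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for AsyncResponseQueue's new depth tracking.
final class DepthTrackedQueue<T> {
  private final AtomicInteger depth = new AtomicInteger(0);
  private final int throttleDepth; // at or above: answer 429, keep the connection open
  private final int disconnectDepth; // at or above: schedule a close after the grace period

  DepthTrackedQueue(int throttleDepth, int disconnectDepth) {
    this.throttleDepth = throttleDepth;
    this.disconnectDepth = disconnectDepth;
  }

  /** Mirrors push(): count up on enqueue, count down once the result has been flushed. */
  void push(CompletableFuture<T> result) {
    depth.incrementAndGet();
    result.whenComplete((r, t) -> depth.decrementAndGet());
  }

  boolean shouldThrottle() {
    return depth.get() >= throttleDepth;
  }

  boolean shouldDisconnect() {
    return depth.get() >= disconnectDepth;
  }
}
```

One subtlety worth noting in review: in the real class the decrement happens inside the tail-chain write callback, so the depth measures responses that have been queued but not yet written to the HTTP response, i.e. exactly the backlog the new configs are meant to bound.
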
diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
index 7b5af17d73..a92e18e6e2 100644
--- a/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
+++ b/kafka-rest/src/main/java/io/confluent/kafkarest/response/StreamingResponseFactory.java
@@ -19,6 +19,8 @@

 import io.confluent.kafkarest.config.ConfigModule.StreamingMaxConnectionDurationConfig;
 import io.confluent.kafkarest.config.ConfigModule.StreamingMaxConnectionGracePeriod;
+import io.confluent.kafkarest.config.ConfigModule.getStreamingConnectionMaxQueueDepthBeforeDisconnect;
+import io.confluent.kafkarest.config.ConfigModule.getStreamingConnectionMaxQueueDepthBeforeThrottling;
 import java.time.Duration;
 import javax.inject.Inject;

@@ -27,18 +29,30 @@ public final class StreamingResponseFactory {
   private final ChunkedOutputFactory chunkedOutputFactory;
   private final Duration maxDuration;
   private final Duration gracePeriod;
+  private final Integer throttleDepth;
+  private final Integer disconnectDepth;

   @Inject
   public StreamingResponseFactory(
       ChunkedOutputFactory chunkedOutputFactory,
       @StreamingMaxConnectionDurationConfig Duration maxDuration,
-      @StreamingMaxConnectionGracePeriod Duration gracePeriod) {
+      @StreamingMaxConnectionGracePeriod Duration gracePeriod,
+      @getStreamingConnectionMaxQueueDepthBeforeThrottling Integer throttleDepth,
+      @getStreamingConnectionMaxQueueDepthBeforeDisconnect Integer disconnectDepth) {
     this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory);
     this.maxDuration = maxDuration;
     this.gracePeriod = gracePeriod;
+    this.throttleDepth = throttleDepth;
+    this.disconnectDepth = disconnectDepth;
   }

   public <T> StreamingResponse<T> from(JsonStream<T> inputStream) {
-    return StreamingResponse.from(inputStream, chunkedOutputFactory, maxDuration, gracePeriod);
+    return StreamingResponse.from(
+        inputStream,
+        chunkedOutputFactory,
+        maxDuration,
+        gracePeriod,
+        throttleDepth,
+        disconnectDepth);
   }
 }
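For end-to-end context, the produce resource consumes this factory roughly as below; `produceToKafka` is a placeholder for the per-record produce logic, which this PR does not change:

```java
// Paraphrased wiring, not literal ProduceAction code.
void produceStreaming(JsonStream<ProduceRequest> requests, AsyncResponse asyncResponse) {
  streamingResponseFactory
      .from(requests) // each streamed request now counts against the queue depth
      .compose(request -> produceToKafka(request)) // CompletableFuture<ProduceResponse>
      .resume(asyncResponse);
}
```

Beyond `throttleDepth` the stream answers each record with a 429; beyond `disconnectDepth` the connection is scheduled to close after the grace period, which the tests below exercise.
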
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
index 992d9b184f..f9252a320b 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/resources/v3/ProduceActionTest.java
@@ -77,6 +77,8 @@ public class ProduceActionTest {

   private static final Duration FIVE_SECONDS_MS = Duration.ofMillis(5000);
+  private static final Duration FIVE_MS = Duration.ofMillis(5);
+  private static final int DEPTH = 100;

   @AfterAll
   public static void cleanUp() {
@@ -839,7 +841,7 @@ private static ProduceAction getProduceAction(
     replay(produceControllerProvider, produceController);

     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_SECONDS_MS);
+        new StreamingResponseFactory(chunkedOutputFactory, FIVE_SECONDS_MS, FIVE_MS, DEPTH, DEPTH);

     // get the current thread so that the call counts can be seen by easy mock
     ExecutorService executorService = MoreExecutors.newDirectExecutorService();
diff --git a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
index 37ceef3814..c986cd6816 100644
--- a/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
+++ b/kafka-rest/src/test/java/io/confluent/kafkarest/response/StreamingResponseTest.java
@@ -41,6 +41,8 @@ public class StreamingResponseTest {

   private static final Duration DURATION = Duration.ofMillis(5000);
+  private static final Duration GRACE_DURATION = Duration.ofMillis(1);
+  private static final int DEPTH = 100;

   @Test
   public void testGracePeriodExceededExceptionThrown() throws IOException {
@@ -92,7 +94,8 @@ public void testGracePeriodExceededExceptionThrown() throws IOException {
     replay(mockedChunkedOutput);

     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(
+            mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);

     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -156,7 +159,8 @@ public void testWriteToChunkedOutput() throws IOException {
     replay(mockedChunkedOutput, mockedChunkedOutputFactory);

     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(
+            mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);

     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requestsMappingIterator));
@@ -201,7 +205,8 @@ public void testHasNextMappingException() throws IOException {
     replay(mockedChunkedOutput);

     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(
+            mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);

     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -241,7 +246,8 @@ public void testHasNextRuntimeException() throws IOException {
     replay(mockedChunkedOutput);

     StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(mockedChunkedOutputFactory, DURATION, DURATION);
+        new StreamingResponseFactory(
+            mockedChunkedOutputFactory, DURATION, GRACE_DURATION, DEPTH, DEPTH);

     StreamingResponse<ProduceRequest> streamingResponse =
         streamingResponseFactory.from(new JsonStream<>(() -> requests));
@@ -255,7 +261,7 @@ public void testHasNextRuntimeException() throws IOException {
   }

   @Test
-  public void testWriteToChunkedOutputAfterTimeout() throws IOException, InterruptedException {
+  public void testWriteToChunkedOutputAfterTimeout() throws IOException {
     String key = "foo";
     String value = "bar";
     ProduceRequest request =
@@ -294,69 +300,272 @@ public void testWriteToChunkedOutputAfterTimeout() throws IOException, Interrupt
             ResultOrError.error(
                 ErrorResponse.create(
                     408,
-                    "Streaming connection open for longer than allowed: Connection will be closed."));
+                    "Streaming connection open for longer than allowed: "
+                        + "Connection will be closed."));

     ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
     ChunkedOutput<ResultOrError> mockedChunkedOutput = mock(ChunkedOutput.class);

     expect(clock.instant())
-        .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response (check)
-    expect(clock.instant())
-        .andReturn(Instant.ofEpochMilli(0)); // stream start - composing response (check)
-    expect(requestsMappingIterator.hasNext()).andReturn(true); // first message - OK
+        .andReturn(Instant.ofEpochMilli(0)); // stream start - input stream response
+
+    // first message
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
     expect(requestsMappingIterator.nextValue()).andReturn(request);
     expect(clock.instant())
         .andReturn(Instant.ofEpochMilli(1)); // first comparison duration. within timeout
-    expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
-    mockedChunkedOutput.write(sucessResult);
     expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(sucessResult);

-    expect(requestsMappingIterator.hasNext()).andReturn(true); // is another message
-    expect(requestsMappingIterator.nextValue()).andReturn(request); // second message - bad gateway
+    // second message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(requestsMappingIterator.nextValue()).andReturn(request);
     expect(clock.instant())
         .andReturn(Instant.ofEpochMilli(timeout + 5)); // second message beyond timeout
-
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
     mockedChunkedOutput.write(error);
+
+    // no third message
+    expect(requestsMappingIterator.hasNext()).andReturn(false);
+
+    requestsMappingIterator.close(); // call from thread executor
+    mockedChunkedOutput.close(); // call from thread executor
+
+    requestsMappingIterator.close(); // call from finally
+    mockedChunkedOutput.close(); // call from finally
+
+    replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
+
+    final StreamingResponse<ProduceRequest> streamingResponse =
+        StreamingResponse.fromWithClock(
+            new JsonStream<>(() -> requestsMappingIterator),
+            mockedChunkedOutputFactory,
+            Duration.ofMillis(timeout),
+            Duration.ofMillis(50),
+            DEPTH,
+            DEPTH,
+            clock);
+
+    CompletableFuture<ProduceResponse> produceResponseFuture = new CompletableFuture<>();
+    produceResponseFuture.complete(produceResponse);
+
+    FakeAsyncResponse response = new FakeAsyncResponse();
+    streamingResponse.compose(result -> produceResponseFuture).resume(response);
+
+    EasyMock.verify(mockedChunkedOutput);
+    EasyMock.verify(mockedChunkedOutputFactory);
+    EasyMock.verify(requestsMappingIterator);
+    EasyMock.verify(clock);
+  }
+
+  @Test
+  public void testSlowWritesToKafka429() throws IOException {
+
+    String key = "foo";
+    String value = "bar";
+    ProduceRequest request =
+        ProduceRequest.builder()
+            .setKey(
+                ProduceRequestData.builder()
+                    .setFormat(EmbeddedFormat.AVRO)
+                    .setRawSchema("{\"type\": \"string\"}")
+                    .setData(TextNode.valueOf(key))
+                    .build())
+            .setValue(
+                ProduceRequestData.builder()
+                    .setFormat(EmbeddedFormat.AVRO)
+                    .setRawSchema("{\"type\": \"string\"}")
+                    .setData(TextNode.valueOf(value))
+                    .build())
+            .setOriginalSize(0L)
+            .build();
+
+    MappingIterator<ProduceRequest> requestsMappingIterator = mock(MappingIterator.class);
+
+    long timeout = 10;
+    Clock clock = mock(Clock.class);
+
+    CompletableFuture<ProduceResponse> produceResponseFuture = new CompletableFuture<>();
+
+    ProduceResponse produceResponse =
+        ProduceResponse.builder()
+            .setClusterId("clusterId")
+            .setTopicName("topicName")
+            .setPartitionId(1)
+            .setOffset(1L)
+            .setErrorCode(HttpStatus.OK_200)
+            .build();
+    ResultOrError successResult = ResultOrError.result(produceResponse);
+
+    ResultOrError error =
+        ResultOrError.error(
+            ErrorResponse.create(
+                429,
+                "Backlog of messages waiting to be sent to Kafka is too large: Not "
+                    + "sending to Kafka."));
+
+    ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
+    ChunkedOutput<ResultOrError> mockedChunkedOutput = mock(ChunkedOutput.class);
+
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+
+    // first message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+    expect(requestsMappingIterator.nextValue()).andReturn(request);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(1));
+    expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
     expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(successResult);

-    expect(requestsMappingIterator.hasNext())
+    // second message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+    expect(requestsMappingIterator.nextValue())
         .andAnswer(
-            () -> { // return hasnext true AFTER the thread executor has timed out and closed the
-              // connections. Then expect no other calls except the connection closing.
-              Thread.sleep(500);
-              return true;
+            () -> {
+              produceResponseFuture.complete(produceResponse);
+              return request;
             });
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(error);

-    requestsMappingIterator.close(); // this ensures the closes have been called
-    mockedChunkedOutput.close();
-    requestsMappingIterator.close(); // expect twice - one from the thread and one from the finally
+    expect(requestsMappingIterator.hasNext()).andReturn(false);
+
+    requestsMappingIterator.close(); // closes from the finally
     mockedChunkedOutput.close();

     replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);

-    StreamingResponseFactory streamingResponseFactory =
-        new StreamingResponseFactory(
-            mockedChunkedOutputFactory, Duration.ofMillis(timeout), Duration.ofMillis(50));
     StreamingResponse<ProduceRequest> streamingResponse =
         StreamingResponse.fromWithClock(
             new JsonStream<>(() -> requestsMappingIterator),
             mockedChunkedOutputFactory,
             Duration.ofMillis(timeout),
             Duration.ofMillis(50),
+            1,
+            2,
             clock);

+    FakeAsyncResponse response = new FakeAsyncResponse();
+    streamingResponse.compose(result -> produceResponseFuture).resume(response);
+
+    EasyMock.verify(mockedChunkedOutput);
+    EasyMock.verify(mockedChunkedOutputFactory);
+    EasyMock.verify(requestsMappingIterator);
+    EasyMock.verify(clock);
+  }
+
+  @Test
+  public void testSlowWritesToKafkaDisconnects() throws IOException {
+
+    String key = "foo";
+    String value = "bar";
+    ProduceRequest request =
+        ProduceRequest.builder()
+            .setKey(
+                ProduceRequestData.builder()
+                    .setFormat(EmbeddedFormat.AVRO)
+                    .setRawSchema("{\"type\": \"string\"}")
+                    .setData(TextNode.valueOf(key))
+                    .build())
+            .setValue(
+                ProduceRequestData.builder()
+                    .setFormat(EmbeddedFormat.AVRO)
+                    .setRawSchema("{\"type\": \"string\"}")
+                    .setData(TextNode.valueOf(value))
+                    .build())
+            .setOriginalSize(0L)
+            .build();
+
+    MappingIterator<ProduceRequest> requestsMappingIterator = mock(MappingIterator.class);
+
+    long timeout = 10;
+    Clock clock = mock(Clock.class);
+
     CompletableFuture<ProduceResponse> produceResponseFuture = new CompletableFuture<>();
-    produceResponseFuture.complete(produceResponse);
+
+    ProduceResponse produceResponse =
+        ProduceResponse.builder()
+            .setClusterId("clusterId")
+            .setTopicName("topicName")
+            .setPartitionId(1)
+            .setOffset(1L)
+            .setErrorCode(HttpStatus.OK_200)
+            .build();
+    ResultOrError successResult = ResultOrError.result(produceResponse);
+
+    ResultOrError error =
+        ResultOrError.error(
+            ErrorResponse.create(
+                429,
+                "Backlog of messages waiting to be sent to Kafka is too large: Not "
+                    + "sending to Kafka."));
+
+    ChunkedOutputFactory mockedChunkedOutputFactory = mock(ChunkedOutputFactory.class);
+    ChunkedOutput<ResultOrError> mockedChunkedOutput = mock(ChunkedOutput.class);
+
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+
+    // first message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(0));
+    expect(requestsMappingIterator.nextValue()).andReturn(request);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(1));
+    expect(mockedChunkedOutputFactory.getChunkedOutput()).andReturn(mockedChunkedOutput);
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(successResult);
+
+    // second message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+    expect(requestsMappingIterator.nextValue())
+        .andAnswer(
+            () -> {
+              return request;
+            });
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(error);
+
+    // third message
+    expect(requestsMappingIterator.hasNext()).andReturn(true);
+    expect(clock.instant()).andReturn(Instant.ofEpochMilli(2));
+    expect(requestsMappingIterator.nextValue())
+        .andAnswer(
+            () -> {
+              produceResponseFuture.complete(produceResponse);
+              return request;
+            });
+    expect(mockedChunkedOutput.isClosed()).andReturn(false);
+    mockedChunkedOutput.write(error);
+
+    expect(requestsMappingIterator.hasNext()).andReturn(false);
+
+    requestsMappingIterator.close(); // closes from thread executor
+    mockedChunkedOutput.close();
+    requestsMappingIterator.close(); // closes from finally
+    mockedChunkedOutput.close();
+
+    replay(mockedChunkedOutput, mockedChunkedOutputFactory, requestsMappingIterator, clock);
+
+    StreamingResponse<ProduceRequest> streamingResponse =
+        StreamingResponse.fromWithClock(
+            new JsonStream<>(() -> requestsMappingIterator),
+            mockedChunkedOutputFactory,
+            Duration.ofMillis(timeout),
+            Duration.ofMillis(50),
+            1,
+            2,
+            clock);

     FakeAsyncResponse response = new FakeAsyncResponse();
-    streamingResponse
-        .compose(
-            result -> {
-              return produceResponseFuture;
-            })
-        .resume(response);
+    streamingResponse.compose(result -> produceResponseFuture).resume(response);

     EasyMock.verify(mockedChunkedOutput);
     EasyMock.verify(mockedChunkedOutputFactory);