Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KREST-2746 Reflect slow responses from Kafka back to the http client #1043

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,29 @@ public class KafkaRestConfig extends RestConfig {
"streaming.connection.max.duration.grace.period.ms";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC =
"How long after a streaming connection reaches its maximum duration outstanding "
+ "requests will be processed for before the connection is closed.";
+ "requests will be processed for before the connection is closed. "
+ "Maximum value is 10 seconds.";
private static final String STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT = "500";
public static final ConfigDef.Range STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR =
ConfigDef.Range.between(1, 10000);

public static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH =
    "streaming.response.queue.throttle.depth";
// Fix: the original concatenation was missing a trailing space after "until the",
// rendering as "until thequeue depth" in generated config docs.
private static final String STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC =
    "The maximum depth of the response queue that is used to hold requests as they are being "
        + "processed by Kafka. If this queue grows too long it indicates that Kafka is "
        + "processing messages more slowly than the user is sending them. After this "
        + "queue depth is reached, then all requests will receive a 429 response until the "
        + "queue depth returns under the limit.";
private static final Integer STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT = 100;

public static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH =
    "streaming.response.queue.disconnect.depth";
// Fix: the original concatenation was missing spaces after "being" and "connection",
// rendering as "beingprocessed" and "connectionis closed" in generated config docs.
private static final String STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC =
    "The maximum depth of the response queue that is used to hold requests as they are being "
        + "processed by Kafka. If the queue depth grows beyond this limit then the connection "
        + "is closed.";
private static final Integer STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT = 200;

private static final ConfigDef config;
private volatile Metrics metrics;
Expand Down Expand Up @@ -846,8 +867,21 @@ protected static ConfigDef baseKafkaRestConfigDef() {
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS,
Type.LONG,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DEFAULT,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_VALIDATOR,
Importance.LOW,
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC);
STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS_DOC)
.define(
STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH,
Type.INT,
STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DEFAULT,
Importance.LOW,
STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH_DOC)
.define(
STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH,
Type.INT,
STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DEFAULT,
Importance.LOW,
STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH_DOC);
}

private static Properties getPropsFromFile(String propsFile) throws RestConfigException {
Expand Down Expand Up @@ -1088,6 +1122,14 @@ public final Duration getStreamingConnectionMaxDurationGracePeriod() {
return Duration.ofMillis(getLong(STREAMING_CONNECTION_MAX_DURATION_GRACE_PERIOD_MS));
}

/**
 * Returns the configured {@code streaming.response.queue.throttle.depth}: the response-queue
 * depth at which streaming requests start being rejected with 429 responses.
 */
public final Integer getStreamingConnectionMaxQueueDepthBeforeThrottling() {
  return getInt(STREAMING_RESPONSE_QUEUE_THROTTLE_DEPTH);
}

/**
 * Returns the configured {@code streaming.response.queue.disconnect.depth}: the response-queue
 * depth beyond which the streaming connection is closed.
 */
public final Integer getStreamingConnectionMaxQueueDepthBeforeDisconnect() {
  return getInt(STREAMING_RESPONSE_QUEUE_DISCONNECT_DEPTH);
}

/** Returns the configured default cost applied to rate-limited requests. */
public final int getRateLimitDefaultCost() {
  return getInt(RATE_LIMIT_DEFAULT_COST_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ protected void configure() {
.qualifiedBy(new StreamingConnectionMaxDurationGracePeriodImpl())
.to(Duration.class);

bind(config.getStreamingConnectionMaxQueueDepthBeforeThrottling())
.qualifiedBy(new getStreamingConnectionMaxQueueDepthBeforeThrottlingImpl())
.to(Integer.class);

bind(config.getStreamingConnectionMaxQueueDepthBeforeDisconnect())
.qualifiedBy(new getStreamingConnectionMaxQueueDepthBeforeDisconnectImpl())
.to(Integer.class);

bind(config.getSchemaRegistryConfigs())
.qualifiedBy(new SchemaRegistryConfigsImpl())
.to(new TypeLiteral<Map<String, Object>>() {});
Expand Down Expand Up @@ -441,5 +449,23 @@ private static final class StreamingConnectionMaxDurationConfigImpl
private static final class StreamingConnectionMaxDurationGracePeriodImpl
extends AnnotationLiteral<StreamingMaxConnectionGracePeriod>
implements StreamingMaxConnectionGracePeriod {}

// NOTE(review): annotation types should be UpperCamelCase nouns (compare the sibling
// StreamingMaxConnectionGracePeriod above), not getter-style lowerCamelCase. Not renamed here
// because injection points in other files reference this exact name — TODO rename in a
// follow-up that updates all usages.
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface getStreamingConnectionMaxQueueDepthBeforeThrottling {}

// Literal instance used as the qualifier when binding the throttle-depth Integer in configure().
private static final class getStreamingConnectionMaxQueueDepthBeforeThrottlingImpl
    extends AnnotationLiteral<getStreamingConnectionMaxQueueDepthBeforeThrottling>
    implements getStreamingConnectionMaxQueueDepthBeforeThrottling {}

// NOTE(review): same naming-convention violation as the throttle qualifier — annotation types
// should be UpperCamelCase. Not renamed here because injection points elsewhere reference this
// exact name — TODO rename in a follow-up that updates all usages.
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER})
public @interface getStreamingConnectionMaxQueueDepthBeforeDisconnect {}

// Literal instance used as the qualifier when binding the disconnect-depth Integer in configure().
private static final class getStreamingConnectionMaxQueueDepthBeforeDisconnectImpl
    extends AnnotationLiteral<getStreamingConnectionMaxQueueDepthBeforeDisconnect>
    implements getStreamingConnectionMaxQueueDepthBeforeDisconnect {}
}
// CHECKSTYLE:ON:ClassDataAbstractionCoupling
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -72,6 +73,7 @@ public abstract class StreamingResponse<T> {

private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);
private static final int ONE_SECOND_MS = 1000;
private static final int MAX_CLOSE_RETRIES = 2;

private static final CompositeErrorMapper EXCEPTION_MAPPER =
new CompositeErrorMapper.Builder()
Expand Down Expand Up @@ -109,30 +111,45 @@ public abstract class StreamingResponse<T> {
private final ChunkedOutputFactory chunkedOutputFactory;
private final Duration maxDuration;
private final Duration gracePeriod;
// Response-queue depth at which requests are answered with 429 (see resume()).
private final Integer throttleDepth;
// Response-queue depth at which the connection is scheduled for close (see
// handleResponseQueueDepthTooLarge).
private final Integer disconnectDepth;
private final Instant streamStartTime;
private final Clock clock;

// Package-visible (not private) so ComposingStreamingResponse.closeAll can also flip the flags
// on the wrapped input response; volatile because they are written by the scheduled close task
// and read by the resume loop on another thread.
volatile boolean closingStarted = false;
volatile boolean closingFinished = false;

/**
 * Base constructor for streaming responses.
 *
 * @param chunkedOutputFactory factory for the chunked HTTP output sink; must not be null
 * @param maxDuration maximum time the streaming connection may stay open
 * @param gracePeriod extra time outstanding requests may complete after maxDuration is hit
 * @param throttleDepth queue depth at which requests start receiving 429 responses
 * @param disconnectDepth queue depth at which the connection is scheduled for close
 * @param clock time source; injectable for tests (see fromWithClock)
 */
StreamingResponse(
    ChunkedOutputFactory chunkedOutputFactory,
    Duration maxDuration,
    Duration gracePeriod,
    Integer throttleDepth,
    Integer disconnectDepth,
    Clock clock) {
  this.clock = clock;
  // Capture the start instant up front so maxDuration is measured from construction.
  this.streamStartTime = clock.instant();
  this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory);
  this.maxDuration = maxDuration;
  this.gracePeriod = gracePeriod;
  this.throttleDepth = throttleDepth;
  this.disconnectDepth = disconnectDepth;
}

public static <T> StreamingResponse<T> from(
JsonStream<T> inputStream,
ChunkedOutputFactory chunkedOutputFactory,
Duration maxDuration,
Duration gracePeriod) {
Duration gracePeriod,
Integer throttleDepth,
Integer disconnectDepth) {
return new InputStreamingResponse<>(
inputStream, chunkedOutputFactory, maxDuration, gracePeriod, Clock.systemUTC());
inputStream,
chunkedOutputFactory,
maxDuration,
gracePeriod,
throttleDepth,
disconnectDepth,
Clock.systemUTC());
}

@VisibleForTesting
Expand All @@ -141,15 +158,29 @@ static <T> StreamingResponse<T> fromWithClock(
ChunkedOutputFactory chunkedOutputFactory,
Duration maxDuration,
Duration gracePeriod,
Integer throttleDepth,
Integer disconnectDepth,
Clock clock) {
return new InputStreamingResponse<>(
inputStream, chunkedOutputFactory, maxDuration, gracePeriod, clock);
inputStream,
chunkedOutputFactory,
maxDuration,
gracePeriod,
throttleDepth,
disconnectDepth,
clock);
}

public final <O> StreamingResponse<O> compose(
Function<? super T, ? extends CompletableFuture<O>> transform) {
return new ComposingStreamingResponse<>(
this, transform, chunkedOutputFactory, maxDuration, gracePeriod);
this,
transform,
chunkedOutputFactory,
maxDuration,
gracePeriod,
throttleDepth,
disconnectDepth);
}

/**
Expand All @@ -170,20 +201,9 @@ public final void resume(AsyncResponse asyncResponse) {
// need to recheck closingStarted because hasNext can take time to respond
if (!closingStarted
&& Duration.between(streamStartTime, clock.instant()).compareTo(maxDuration) > 0) {
if (executorService == null) {
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.schedule(
() -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
}
next();
responseQueue.push(
CompletableFuture.completedFuture(
ResultOrError.error(
EXCEPTION_MAPPER.toErrorResponse(
new StatusCodeException(
Status.REQUEST_TIMEOUT,
"Streaming connection open for longer than allowed",
"Connection will be closed.")))));
handleOverMaxStreamDuration(executorService, responseQueue);
} else if (!closingStarted && responseQueue.getTailLength().get() >= throttleDepth) {
handleResponseQueueDepthTooLarge(executorService, responseQueue);
} else if (!closingStarted) {
responseQueue.push(next().handle(this::handleNext));
} else {
Expand All @@ -196,12 +216,24 @@ public final void resume(AsyncResponse asyncResponse) {
CompletableFuture.completedFuture(
ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
} finally {
// if there are still outstanding response to send back, for example hasNext has returned
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the MAX_CLOSE_RETRIES

// false, but the executorService hasn't triggered yet, give everything a chance to
// complete within the grace period plus a little bit.
int retries = 0;
while (!closingFinished && retries < MAX_CLOSE_RETRIES) {
retries++;
try {
Thread.sleep(gracePeriod.toMillis() / retries);
} catch (InterruptedException e) {
// do nothing
}
}
close();
responseQueue.close();
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) {
if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
Expand All @@ -211,12 +243,47 @@ public final void resume(AsyncResponse asyncResponse) {
}
}

private void closeAll(AsyncResponseQueue responseQueue) {
closingStarted = true;
close();
responseQueue.close();
// Called from resume() when the stream has been open longer than maxDuration: schedules a
// delayed close and answers the current request with a 408.
// NOTE(review): per the author's own review comment, the executorService argument appears to
// be null here; because Java passes references by value, the executor created inside
// triggerDelayedClose is not visible to the caller — confirm whether a new executor is created
// (and leaked) on every iteration past the deadline.
private void handleOverMaxStreamDuration(
    ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
  triggerDelayedClose(executorService, responseQueue);
  // Consume (and discard) the next input item so the resume() loop makes progress.
  next();
  responseQueue.push(
      CompletableFuture.completedFuture(
          ResultOrError.error(
              EXCEPTION_MAPPER.toErrorResponse(
                  new StatusCodeException(
                      Status.REQUEST_TIMEOUT,
                      "Streaming connection open for longer than allowed",
                      "Connection will be closed.")))));
}

// Called from resume() when the response-queue depth has reached throttleDepth: every request
// is answered with a 429 instead of being sent to Kafka. If the depth has additionally reached
// disconnectDepth, a delayed close of the whole connection is scheduled.
// NOTE(review): shares the executorService pass-by-value issue flagged on
// triggerDelayedClose — the caller's reference is never updated.
private void handleResponseQueueDepthTooLarge(
    ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
  if (responseQueue.getTailLength().get() >= disconnectDepth) {
    triggerDelayedClose(executorService, responseQueue);
  }
  // Consume (and discard) the next input item so the resume() loop makes progress.
  next();
  responseQueue.push(
      CompletableFuture.completedFuture(
          ResultOrError.error(
              EXCEPTION_MAPPER.toErrorResponse(
                  new StatusCodeException(
                      Status.TOO_MANY_REQUESTS,
                      "Backlog of messages waiting to be sent to Kafka is too large",
                      "Not sending to Kafka.")))));
}

private void triggerDelayedClose(
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) {
if (executorService == null) {
executorService = Executors.newSingleThreadScheduledExecutor();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is null, so it's not an object reference that we can overwrite/edit at this point.

Need to eg return the executor service and set the original object to that value

executorService.schedule(
() -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
}
}

// Closes this response and the given queue, setting the closingStarted/closingFinished flags.
// ComposingStreamingResponse provides the real implementation; InputStreamingResponse's is a
// no-op (marked "Never called" by the author).
abstract void closeAll(AsyncResponseQueue responseQueue);

private ResultOrError handleNext(T result, @Nullable Throwable error) {
if (error == null) {
return ResultOrError.result(result);
Expand All @@ -241,11 +308,17 @@ private InputStreamingResponse(
ChunkedOutputFactory chunkedOutputFactory,
Duration maxDuration,
Duration gracePeriod,
Integer throttleDepth,
Integer disconnectDepth,
Clock clock) {
super(chunkedOutputFactory, maxDuration, gracePeriod, clock);
super(chunkedOutputFactory, maxDuration, gracePeriod, throttleDepth, disconnectDepth, clock);
this.inputStream = requireNonNull(inputStream);
}

// No-op implementation kept only to satisfy the abstract contract; per the original author's
// comment this is never invoked (the composing response performs the real close).
@Override
public void closeAll(AsyncResponseQueue responseQueue) {
  // Never called
}

public void close() {
try {
inputStream.close();
Expand Down Expand Up @@ -288,12 +361,30 @@ private ComposingStreamingResponse(
Function<? super I, ? extends CompletableFuture<O>> transform,
ChunkedOutputFactory chunkedOutputFactory,
Duration maxDuration,
Duration gracePeriod) {
super(chunkedOutputFactory, maxDuration, gracePeriod, streamingResponseInput.clock);
Duration gracePeriod,
Integer throttleDepth,
Integer disconnectDepth) {
super(
chunkedOutputFactory,
maxDuration,
gracePeriod,
throttleDepth,
disconnectDepth,
streamingResponseInput.clock);
this.streamingResponseInput = requireNonNull(streamingResponseInput);
this.transform = requireNonNull(transform);
}

// Closes both the wrapped input response and this composing response.
// Write order is deliberate for the volatile flags: closingStarted is set first so the
// resume() loop stops pushing new work, then the sinks are closed, and only then is
// closingFinished set to release the wait loop in resume()'s finally block.
@Override
protected void closeAll(AsyncResponseQueue responseQueue) {
  streamingResponseInput.closingStarted = true;
  closingStarted = true;
  close();
  responseQueue.close();
  streamingResponseInput.closingFinished = true;
  closingFinished = true;
}

@Override
public boolean hasNext() {
try {
Expand Down Expand Up @@ -347,14 +438,16 @@ private void asyncResume(AsyncResponse asyncResponse) {
asyncResponse.resume(Response.ok(sink).build());
}

private volatile boolean sinkClosed = false;
volatile boolean sinkClosed = false;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this no longer private?


private volatile AtomicInteger queueDepth = new AtomicInteger(0);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be volatile


private boolean isClosed() {
return sinkClosed;
private AtomicInteger getTailLength() {
return queueDepth;
}

private void push(CompletableFuture<ResultOrError> result) {
log.debug("Pushing to response queue");
queueDepth.incrementAndGet();
tail =
CompletableFuture.allOf(tail, result)
.thenApply(
Expand All @@ -367,6 +460,7 @@ private void push(CompletableFuture<ResultOrError> result) {
ResultOrError res = result.join();
log.debug("Writing to sink");
sink.write(res);
queueDepth.decrementAndGet();
} catch (IOException e) {
log.error("Error when writing streaming result to response channel.", e);
}
Expand Down
Loading