-
Notifications
You must be signed in to change notification settings - Fork 654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KREST-2746 Reflect slow responses from Kafka back to the http client #1043
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ | |
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Function; | ||
import javax.annotation.Nullable; | ||
import javax.ws.rs.WebApplicationException; | ||
|
@@ -72,6 +73,7 @@ public abstract class StreamingResponse<T> { | |
|
||
private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class); | ||
private static final int ONE_SECOND_MS = 1000; | ||
private static final int MAX_CLOSE_RETRIES = 2; | ||
|
||
private static final CompositeErrorMapper EXCEPTION_MAPPER = | ||
new CompositeErrorMapper.Builder() | ||
|
@@ -109,30 +111,45 @@ public abstract class StreamingResponse<T> { | |
private final ChunkedOutputFactory chunkedOutputFactory; | ||
private final Duration maxDuration; | ||
private final Duration gracePeriod; | ||
private final Integer throttleDepth; | ||
private final Integer disconnectDepth; | ||
private final Instant streamStartTime; | ||
private final Clock clock; | ||
|
||
volatile boolean closingStarted = false; | ||
volatile boolean closingFinished = false; | ||
|
||
// Base constructor: records the configuration shared by every StreamingResponse variant. | ||
// The stream start time is captured from the supplied clock at construction time; resume() | ||
// later compares the elapsed time since that instant against maxDuration. | ||
// Only chunkedOutputFactory is null-checked here; all other arguments are stored as given. | ||
// throttleDepth: queue depth at or above which resume() answers with an error instead of | ||
// pushing the next result. | ||
// disconnectDepth: queue depth at or above which a delayed close is also scheduled. | ||
// NOTE(review): both depths are boxed Integers that are unboxed when compared with the | ||
// queue depth, so a null value would fail at that comparison - confirm callers never pass null. | ||
StreamingResponse( | ||
ChunkedOutputFactory chunkedOutputFactory, | ||
Duration maxDuration, | ||
Duration gracePeriod, | ||
Integer throttleDepth, | ||
Integer disconnectDepth, | ||
Clock clock) { | ||
this.clock = clock; | ||
this.streamStartTime = clock.instant(); | ||
this.chunkedOutputFactory = requireNonNull(chunkedOutputFactory); | ||
this.maxDuration = maxDuration; | ||
this.gracePeriod = gracePeriod; | ||
this.throttleDepth = throttleDepth; | ||
this.disconnectDepth = disconnectDepth; | ||
} | ||
|
||
public static <T> StreamingResponse<T> from( | ||
JsonStream<T> inputStream, | ||
ChunkedOutputFactory chunkedOutputFactory, | ||
Duration maxDuration, | ||
Duration gracePeriod) { | ||
Duration gracePeriod, | ||
Integer throttleDepth, | ||
Integer disconnectDepth) { | ||
return new InputStreamingResponse<>( | ||
inputStream, chunkedOutputFactory, maxDuration, gracePeriod, Clock.systemUTC()); | ||
inputStream, | ||
chunkedOutputFactory, | ||
maxDuration, | ||
gracePeriod, | ||
throttleDepth, | ||
disconnectDepth, | ||
Clock.systemUTC()); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -141,15 +158,29 @@ static <T> StreamingResponse<T> fromWithClock( | |
ChunkedOutputFactory chunkedOutputFactory, | ||
Duration maxDuration, | ||
Duration gracePeriod, | ||
Integer throttleDepth, | ||
Integer disconnectDepth, | ||
Clock clock) { | ||
return new InputStreamingResponse<>( | ||
inputStream, chunkedOutputFactory, maxDuration, gracePeriod, clock); | ||
inputStream, | ||
chunkedOutputFactory, | ||
maxDuration, | ||
gracePeriod, | ||
throttleDepth, | ||
disconnectDepth, | ||
clock); | ||
} | ||
|
||
public final <O> StreamingResponse<O> compose( | ||
Function<? super T, ? extends CompletableFuture<O>> transform) { | ||
return new ComposingStreamingResponse<>( | ||
this, transform, chunkedOutputFactory, maxDuration, gracePeriod); | ||
this, | ||
transform, | ||
chunkedOutputFactory, | ||
maxDuration, | ||
gracePeriod, | ||
throttleDepth, | ||
disconnectDepth); | ||
} | ||
|
||
/** | ||
|
@@ -170,20 +201,9 @@ public final void resume(AsyncResponse asyncResponse) { | |
// need to recheck closingStarted because hasNext can take time to respond | ||
if (!closingStarted | ||
&& Duration.between(streamStartTime, clock.instant()).compareTo(maxDuration) > 0) { | ||
if (executorService == null) { | ||
executorService = Executors.newSingleThreadScheduledExecutor(); | ||
executorService.schedule( | ||
() -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
next(); | ||
responseQueue.push( | ||
CompletableFuture.completedFuture( | ||
ResultOrError.error( | ||
EXCEPTION_MAPPER.toErrorResponse( | ||
new StatusCodeException( | ||
Status.REQUEST_TIMEOUT, | ||
"Streaming connection open for longer than allowed", | ||
"Connection will be closed."))))); | ||
handleOverMaxStreamDuration(executorService, responseQueue); | ||
} else if (!closingStarted && responseQueue.getTailLength().get() >= throttleDepth) { | ||
handleResponseQueueDepthTooLarge(executorService, responseQueue); | ||
} else if (!closingStarted) { | ||
responseQueue.push(next().handle(this::handleNext)); | ||
} else { | ||
|
@@ -196,12 +216,24 @@ public final void resume(AsyncResponse asyncResponse) { | |
CompletableFuture.completedFuture( | ||
ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e)))); | ||
} finally { | ||
// if there are still outstanding response to send back, for example hasNext has returned | ||
// false, but the executorService hasn't triggered yet, give everything a chance to | ||
// complete within the grace period plus a little bit. | ||
int retries = 0; | ||
while (!closingFinished && retries < MAX_CLOSE_RETRIES) { | ||
retries++; | ||
try { | ||
Thread.sleep(gracePeriod.toMillis() / retries); | ||
} catch (InterruptedException e) { | ||
// do nothing | ||
} | ||
} | ||
close(); | ||
responseQueue.close(); | ||
if (executorService != null) { | ||
executorService.shutdown(); | ||
try { | ||
if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) { | ||
if (!executorService.awaitTermination(gracePeriod.toMillis(), TimeUnit.MILLISECONDS)) { | ||
executorService.shutdownNow(); | ||
} | ||
} catch (InterruptedException e) { | ||
|
@@ -211,12 +243,47 @@ public final void resume(AsyncResponse asyncResponse) { | |
} | ||
} | ||
|
||
private void closeAll(AsyncResponseQueue responseQueue) { | ||
closingStarted = true; | ||
close(); | ||
responseQueue.close(); | ||
// Invoked from resume() when the stream has been open longer than maxDuration and closing | ||
// has not started yet. Schedules the delayed close, then queues a REQUEST_TIMEOUT error | ||
// response for the client. | ||
private void handleOverMaxStreamDuration( | ||
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) { | ||
triggerDelayedClose(executorService, responseQueue); | ||
// The value returned by next() is deliberately discarded. | ||
// NOTE(review): presumably this consumes the pending input element so it is not processed - confirm. | ||
next(); | ||
// The error is pushed as an already-completed future so it takes its place in the queue | ||
// behind the responses pushed earlier. | ||
responseQueue.push( | ||
CompletableFuture.completedFuture( | ||
ResultOrError.error( | ||
EXCEPTION_MAPPER.toErrorResponse( | ||
new StatusCodeException( | ||
Status.REQUEST_TIMEOUT, | ||
"Streaming connection open for longer than allowed", | ||
"Connection will be closed."))))); | ||
} | ||
|
||
// Invoked from resume() when the response queue depth has reached throttleDepth and closing | ||
// has not started yet. Queues a TOO_MANY_REQUESTS error response instead of the next result; | ||
// if the depth has also reached disconnectDepth, a delayed close is scheduled first. | ||
private void handleResponseQueueDepthTooLarge( | ||
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) { | ||
// The depth is read again here rather than reusing the value resume() compared against | ||
// throttleDepth, so the two checks may observe different depths. | ||
if (responseQueue.getTailLength().get() >= disconnectDepth) { | ||
triggerDelayedClose(executorService, responseQueue); | ||
} | ||
// The value returned by next() is deliberately discarded. | ||
// NOTE(review): presumably this consumes the pending input element without sending it on - confirm. | ||
next(); | ||
responseQueue.push( | ||
CompletableFuture.completedFuture( | ||
ResultOrError.error( | ||
EXCEPTION_MAPPER.toErrorResponse( | ||
new StatusCodeException( | ||
Status.TOO_MANY_REQUESTS, | ||
"Backlog of messages waiting to be sent to Kafka is too large", | ||
"Not sending to Kafka."))))); | ||
} | ||
|
||
private void triggerDelayedClose( | ||
ScheduledExecutorService executorService, AsyncResponseQueue responseQueue) { | ||
if (executorService == null) { | ||
executorService = Executors.newSingleThreadScheduledExecutor(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The `executorService` parameter is null at this point, so assigning to it only rebinds the local copy; the caller's variable is not an object reference we can overwrite from here and stays null. We need to, e.g., return the new executor service and have the caller set its original variable to that value. |
||
executorService.schedule( | ||
() -> closeAll(responseQueue), gracePeriod.toMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
abstract void closeAll(AsyncResponseQueue responseQueue); | ||
|
||
private ResultOrError handleNext(T result, @Nullable Throwable error) { | ||
if (error == null) { | ||
return ResultOrError.result(result); | ||
|
@@ -241,11 +308,17 @@ private InputStreamingResponse( | |
ChunkedOutputFactory chunkedOutputFactory, | ||
Duration maxDuration, | ||
Duration gracePeriod, | ||
Integer throttleDepth, | ||
Integer disconnectDepth, | ||
Clock clock) { | ||
super(chunkedOutputFactory, maxDuration, gracePeriod, clock); | ||
super(chunkedOutputFactory, maxDuration, gracePeriod, throttleDepth, disconnectDepth, clock); | ||
this.inputStream = requireNonNull(inputStream); | ||
} | ||
|
||
// Intentionally empty implementation of the abstract closeAll for the input-backed variant. | ||
// NOTE(review): triggerDelayedClose schedules closeAll on whichever instance runs resume(); | ||
// confirm that resume() is never invoked directly on this variant, otherwise the scheduled | ||
// close would do nothing. | ||
public void closeAll(AsyncResponseQueue responseQueue) { | ||
// Never called | ||
} | ||
|
||
public void close() { | ||
try { | ||
inputStream.close(); | ||
|
@@ -288,12 +361,30 @@ private ComposingStreamingResponse( | |
Function<? super I, ? extends CompletableFuture<O>> transform, | ||
ChunkedOutputFactory chunkedOutputFactory, | ||
Duration maxDuration, | ||
Duration gracePeriod) { | ||
super(chunkedOutputFactory, maxDuration, gracePeriod, streamingResponseInput.clock); | ||
Duration gracePeriod, | ||
Integer throttleDepth, | ||
Integer disconnectDepth) { | ||
super( | ||
chunkedOutputFactory, | ||
maxDuration, | ||
gracePeriod, | ||
throttleDepth, | ||
disconnectDepth, | ||
streamingResponseInput.clock); | ||
this.streamingResponseInput = requireNonNull(streamingResponseInput); | ||
this.transform = requireNonNull(transform); | ||
} | ||
|
||
// Closes this composed response and its response queue, marking both this instance and the | ||
// wrapped input response as closing before the close and as finished after it. | ||
// resume() checks closingStarted before pushing further results and waits on closingFinished | ||
// in its finally block. | ||
@Override | ||
protected void closeAll(AsyncResponseQueue responseQueue) { | ||
// The started flags are set first so concurrent readers see that closing is in progress | ||
// before the resources are released. | ||
streamingResponseInput.closingStarted = true; | ||
closingStarted = true; | ||
close(); | ||
responseQueue.close(); | ||
// The finished flags are set only after both close calls have returned. | ||
streamingResponseInput.closingFinished = true; | ||
closingFinished = true; | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
try { | ||
|
@@ -347,14 +438,16 @@ private void asyncResume(AsyncResponse asyncResponse) { | |
asyncResponse.resume(Response.ok(sink).build()); | ||
} | ||
|
||
private volatile boolean sinkClosed = false; | ||
volatile boolean sinkClosed = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this field no longer private? |
||
|
||
private volatile AtomicInteger queueDepth = new AtomicInteger(0); | ||
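There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This field shouldn't be volatile: the AtomicInteger already makes its updates atomic and visible across threads, and the reference itself does not appear to be reassigned, so `final` would be the appropriate modifier. |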
||
|
||
private boolean isClosed() { | ||
return sinkClosed; | ||
private AtomicInteger getTailLength() { | ||
return queueDepth; | ||
} | ||
|
||
private void push(CompletableFuture<ResultOrError> result) { | ||
log.debug("Pushing to response queue"); | ||
queueDepth.incrementAndGet(); | ||
tail = | ||
CompletableFuture.allOf(tail, result) | ||
.thenApply( | ||
|
@@ -367,6 +460,7 @@ private void push(CompletableFuture<ResultOrError> result) { | |
ResultOrError res = result.join(); | ||
log.debug("Writing to sink"); | ||
sink.write(res); | ||
queueDepth.decrementAndGet(); | ||
} catch (IOException e) { | ||
log.error("Error when writing streaming result to response channel.", e); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the MAX_CLOSE_RETRIES