Skip to content

Commit

Permalink
Kafka Streams Threading: Timeout behavior (apache#14171)
Browse files Browse the repository at this point in the history
Implement setting and clearing task timeouts, and change the output on exceptions to make
it similar to the existing code path.

Reviewer: Walker Carlson <[email protected]>
  • Loading branch information
lucasbru authored Aug 31, 2023
1 parent 43fe133 commit 16dc983
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Collection<Task> successfullyProcessed() {
return successfullyProcessed;
}

void addToSuccessfullyProcessed(final Task task) {
public void addToSuccessfullyProcessed(final Task task) {
successfullyProcessed.add(task);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
*/
package org.apache.kafka.streams.processor.internals.tasks;

import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.slf4j.Logger;

Expand Down Expand Up @@ -93,9 +98,9 @@ private void runOnce(final long nowMs) {
boolean progressed = false;

if (taskExecutionMetadata.canProcessTask(currentTask, nowMs) && currentTask.isProcessable(nowMs)) {
log.trace("processing record for task {}", currentTask.id());
currentTask.process(nowMs);
progressed = true;
if (processTask(currentTask, nowMs, time)) {
progressed = true;
}
}

if (taskExecutionMetadata.canPunctuateTask(currentTask)) {
Expand All @@ -115,6 +120,44 @@ private void runOnce(final long nowMs) {
}
}

/**
 * Runs a single processing step on {@code task} and reports whether the task processed anything.
 *
 * <p>When {@code task.process(now)} returns {@code true}, the task timeout is cleared and, if named
 * topologies are in use and the processing mode is not {@code EXACTLY_ONCE_V2}, the task is added to
 * the successfully-processed set of the execution metadata.
 *
 * <p>Error handling:
 * <ul>
 *   <li>{@link TimeoutException}: handed to {@code task.maybeInitTaskTimeoutOrThrow(now, ...)}; if that
 *       call returns normally the failure is logged and this method returns without rethrowing.
 *       NOTE(review): judging by its name, that call may itself throw — confirm against the Task API.</li>
 *   <li>{@link TaskMigratedException}: logged at info level and rethrown unchanged.</li>
 *   <li>{@link StreamsException}: logged, tagged with the task id, and rethrown.</li>
 *   <li>Any other {@link RuntimeException}: logged and wrapped in a {@link StreamsException}
 *       carrying the task id.</li>
 * </ul>
 *
 * <p>The elapsed time ({@code time.milliseconds() - now}) is always reported via
 * {@code task.recordProcessBatchTime}, whether or not processing succeeded.
 *
 * @param task the task to process
 * @param now  the timestamp passed to {@code task.process} and used as the start of the measured interval
 * @param time clock used to read the end of the measured interval
 * @return the value returned by {@code task.process(now)}, or {@code false} if it did not return normally
 * @throws TaskMigratedException if processing threw it
 * @throws StreamsException      if processing threw it, or wrapping any other runtime failure
 */
private boolean processTask(final Task task, final long now, final Time time) {
    boolean didProcess = false;
    try {
        didProcess = task.process(now);
        if (didProcess) {
            task.clearTaskTimeout();
            // TODO: enable regardless of whether using named topologies
            if (taskExecutionMetadata.hasNamedTopologies()) {
                if (taskExecutionMetadata.processingMode() != EXACTLY_ONCE_V2) {
                    log.trace("Successfully processed task {}", task.id());
                    taskExecutionMetadata.addToSuccessfullyProcessed(task);
                }
            }
        }
    } catch (final TimeoutException timeoutException) {
        // TODO consolidate TimeoutException retries with general error handling
        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
        // Only reached when the call above returned normally.
        final String retryLaterMessage = String.format(
            "Could not complete processing records for %s due to the following exception; will move to next task and retry later",
            task.id());
        log.error(retryLaterMessage, timeoutException);
    } catch (final TaskMigratedException migrated) {
        log.info("Failed to process stream task {} since it got migrated to another thread already. " +
            "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
        throw migrated;
    } catch (final StreamsException streamsFailure) {
        final String failureMessage =
            String.format("Failed to process stream task %s due to the following error:", task.id());
        log.error(failureMessage, streamsFailure);
        // Attach the task id before propagating the exception.
        streamsFailure.setTaskId(task.id());
        throw streamsFailure;
    } catch (final RuntimeException unexpected) {
        final String failureMessage =
            String.format("Failed to process stream task %s due to the following error:", task.id());
        log.error(failureMessage, unexpected);
        throw new StreamsException(unexpected, task.id());
    } finally {
        // Record the elapsed time on every exit path, including failures.
        final long elapsedMs = time.milliseconds() - now;
        task.recordProcessBatchTime(elapsedMs);
    }
    return didProcess;
}

private StreamTask unassignCurrentTask() {
if (currentTask == null)
throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
Expand All @@ -133,11 +176,11 @@ private StreamTask unassignCurrentTask() {
private final Time time;
private final String name;
private final TaskManager taskManager;
private final TaskExecutionMetadata taskExecutionMetadata;

private StreamTask currentTask = null;
private TaskExecutorThread taskExecutorThread = null;
private CountDownLatch shutdownGate;
private TaskExecutionMetadata taskExecutionMetadata;

public DefaultTaskExecutor(final TaskManager taskManager,
final String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.tasks;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
Expand Down Expand Up @@ -105,6 +106,31 @@ public void shouldProcessTasks() {
taskExecutor.start();

verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(2)).process(anyLong());
verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).recordProcessBatchTime(anyLong());
}


/**
 * Checks that, once the executor is started with a task whose {@code process} call returns
 * {@code true}, {@code clearTaskTimeout()} is invoked on that task at least once.
 */
@Test
public void shouldClearTaskTimeoutOnProcessed() {
    // Arrange: every processing attempt is allowed and reports success.
    when(task.process(anyLong())).thenReturn(true);
    when(task.isProcessable(anyLong())).thenReturn(true);
    when(taskExecutionMetadata.canProcessTask(any(), anyLong())).thenReturn(true);

    // Act
    taskExecutor.start();

    // Assert: wait up to VERIFICATION_TIMEOUT for the call to be observed.
    verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).clearTaskTimeout();
}

/**
 * Checks that a {@link TimeoutException} thrown by {@code process} is passed to
 * {@code maybeInitTaskTimeoutOrThrow} on the same task.
 */
@Test
public void shouldSetTaskTimeoutOnTimeoutException() {
    final TimeoutException processingFailure = new TimeoutException();

    // Arrange: the first process call succeeds, every later call throws the timeout.
    when(task.process(anyLong())).thenReturn(true).thenThrow(processingFailure);
    when(task.isProcessable(anyLong())).thenReturn(true);
    when(taskExecutionMetadata.canProcessTask(any(), anyLong())).thenReturn(true);

    // Act
    taskExecutor.start();

    // Assert: processing was attempted and the exact exception instance was handed over.
    verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).process(anyLong());
    verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce())
        .maybeInitTaskTimeoutOrThrow(anyLong(), eq(processingFailure));
}

@Test
Expand Down

0 comments on commit 16dc983

Please sign in to comment.