From 16dc983ad67767ee8debd125a3f8b150a91c7acf Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Thu, 31 Aug 2023 22:21:01 +0200 Subject: [PATCH] Kafka Streams Threading: Timeout behavior (#14171) Implement setting and clearing task timeouts, as well as changing the output on exceptions to make it similar to the existing code path. Reviewer: Walker Carlson --- .../internals/TaskExecutionMetadata.java | 2 +- .../internals/tasks/DefaultTaskExecutor.java | 51 +++++++++++++++++-- .../tasks/DefaultTaskExecutorTest.java | 26 ++++++++++ 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java index bd1515c7bce0..9f6667305c7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java @@ -97,7 +97,7 @@ Collection successfullyProcessed() { return successfullyProcessed; } - void addToSuccessfullyProcessed(final Task task) { + public void addToSuccessfullyProcessed(final Task task) { successfullyProcessed.add(task); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index f15631fa207b..baddc6286a2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -16,13 +16,18 @@ */ package org.apache.kafka.streams.processor.internals.tasks; +import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2; + import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.internals.ReadOnlyTask; import org.apache.kafka.streams.processor.internals.StreamTask; +import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata; import org.slf4j.Logger; @@ -93,9 +98,9 @@ private void runOnce(final long nowMs) { boolean progressed = false; if (taskExecutionMetadata.canProcessTask(currentTask, nowMs) && currentTask.isProcessable(nowMs)) { - log.trace("processing record for task {}", currentTask.id()); - currentTask.process(nowMs); - progressed = true; + if (processTask(currentTask, nowMs, time)) { + progressed = true; + } } if (taskExecutionMetadata.canPunctuateTask(currentTask)) { @@ -115,6 +120,44 @@ private void runOnce(final long nowMs) { } } + private boolean processTask(final Task task, final long now, final Time time) { + boolean processed = false; + try { + processed = task.process(now); + if (processed) { + task.clearTaskTimeout(); + // TODO: enable regardless of whether using named topologies + if (taskExecutionMetadata.hasNamedTopologies() && taskExecutionMetadata.processingMode() != EXACTLY_ONCE_V2) { + log.trace("Successfully processed task {}", task.id()); + taskExecutionMetadata.addToSuccessfullyProcessed(task); + } + } + } catch (final TimeoutException timeoutException) { + // TODO consolidate TimeoutException retries with general error handling + task.maybeInitTaskTimeoutOrThrow(now, timeoutException); + log.error( + String.format( + "Could not complete processing records for %s due to the following exception; will move to next task and retry later", + task.id()), + timeoutException + ); + } catch (final TaskMigratedException e) { + log.info("Failed to process stream task {} since it got migrated to another thread already. " + + "Will trigger a new rebalance and close all tasks as zombies together.", task.id()); + throw e; + } catch (final StreamsException e) { + log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e); + e.setTaskId(task.id()); + throw e; + } catch (final RuntimeException e) { + log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e); + throw new StreamsException(e, task.id()); + } finally { + task.recordProcessBatchTime(time.milliseconds() - now); + } + return processed; + } + private StreamTask unassignCurrentTask() { if (currentTask == null) throw new IllegalStateException("Does not own any task while being ask to unassign from task manager"); @@ -133,11 +176,11 @@ private StreamTask unassignCurrentTask() { private final Time time; private final String name; private final TaskManager taskManager; + private final TaskExecutionMetadata taskExecutionMetadata; private StreamTask currentTask = null; private TaskExecutorThread taskExecutorThread = null; private CountDownLatch shutdownGate; - private TaskExecutionMetadata taskExecutionMetadata; public DefaultTaskExecutor(final TaskManager taskManager, final String name, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java index 52eaad62194e..cd400bef415e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.tasks; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; @@ -105,6 +106,31 @@ public void shouldProcessTasks() { taskExecutor.start(); verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(2)).process(anyLong()); + verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).recordProcessBatchTime(anyLong()); + } + + + @Test + public void shouldClearTaskTimeoutOnProcessed() { + when(taskExecutionMetadata.canProcessTask(any(), anyLong())).thenReturn(true); + when(task.isProcessable(anyLong())).thenReturn(true); + when(task.process(anyLong())).thenReturn(true); + + taskExecutor.start(); + + verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).clearTaskTimeout(); + } + + @Test + public void shouldSetTaskTimeoutOnTimeoutException() { + final TimeoutException e = new TimeoutException(); + when(taskExecutionMetadata.canProcessTask(any(), anyLong())).thenReturn(true); + when(task.isProcessable(anyLong())).thenReturn(true); + when(task.process(anyLong())).thenReturn(true).thenThrow(e); + + taskExecutor.start(); + verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).process(anyLong()); + verify(task, timeout(VERIFICATION_TIMEOUT).atLeastOnce()).maybeInitTaskTimeoutOrThrow(anyLong(), eq(e)); } @Test