Skip to content

Commit

Permalink
[SPARK-50301][SS] Make TransformWithState metrics reflect their intuitive meanings
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This PR makes the following changes to the metrics in TransformWithState (TWS):

- `allUpdatesTimeMs` now captures the time it takes to process all the new data with the user's stateful processor.
- `timerProcessingTimeMs` was added to capture the time it takes to process all the user's timers.
- `allRemovalsTimeMs` now captures the time it takes to do TTL cleanup at the end of a micro-batch.
- `commitTimeMs` now captures _only_ the time it takes to commit the state, not the TTL cleanup.

With these metrics, a user can have a fairly clear picture of where time is being spent in a micro-batch that uses TWS:

![image](https://github.com/user-attachments/assets/87a0dc9c-c71b-4d55-8623-8970ad83adf6)

### Why are the changes needed?

The metrics today misrepresent what they're actually measuring.

### Does this PR introduce _any_ user-facing change?

Yes. Metrics for TWS are changing. However, since TWS is `private[sql]`, this shouldn't impact any real users.

### How was this patch tested?

We don't have any way to test these metrics in _any_ stateful operator for streaming today.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48862 from neilramaswamy/spark-50301.

Authored-by: Neil Ramaswamy <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
neilramaswamy authored and HeartSaVioR committed Nov 22, 2024
1 parent ea222a3 commit f5bb11c
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,20 @@ case class TransformWithStateExec(
CompletionIterator[InternalRow, Iterator[InternalRow]] = {
val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
val commitTimeMs = longMetric("commitTimeMs")
val timeoutLatencyMs = longMetric("allRemovalsTimeMs")
val timerProcessingTimeMs = longMetric("timerProcessingTimeMs")
// In TWS, allRemovalsTimeMs is the time taken to remove state due to TTL.
// It does not measure any time taken by explicit calls from the user's state processor
// that clear()s state variables.
//
// allRemovalsTimeMs is not granular enough to distinguish between user-caused removals and
// TTL-caused removals. We could leave this empty and have two custom metrics, but leaving
// this as always 0 will be confusing for users. We could also time every call to clear(), but
// that could have performance penalties. So, we choose to capture TTL-only removals.
val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")

val currentTimeNs = System.nanoTime
val updatesStartTimeNs = currentTimeNs
var timeoutProcessingStartTimeNs = currentTimeNs
var timerProcessingStartTimeNs = currentTimeNs

// If timeout is based on event time, then filter late data based on watermark
val filteredIter = watermarkPredicateForDataForLateEvents match {
Expand All @@ -360,9 +369,13 @@ case class TransformWithStateExec(
val newDataProcessorIter =
CompletionIterator[InternalRow, Iterator[InternalRow]](
processNewData(filteredIter), {
// Once the input is processed, mark the start time for timeout processing to measure
// Note: Due to lazy iterator execution, this metric also captures the time taken
// by the downstream (consumer) operators in addition to the processing in this operator.
allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)

// Once the input is processed, mark the start time for timer processing to measure
// it separately from the overall processing time.
timeoutProcessingStartTimeNs = System.nanoTime
timerProcessingStartTimeNs = System.nanoTime
processorHandle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED)
})

Expand All @@ -376,9 +389,10 @@ case class TransformWithStateExec(
private def getIterator(): Iterator[InternalRow] =
CompletionIterator[InternalRow, Iterator[InternalRow]](
processTimers(timeMode, processorHandle), {
// Note: `timeoutLatencyMs` also includes the time the parent operator took for
// processing output returned through iterator.
timeoutLatencyMs += NANOSECONDS.toMillis(System.nanoTime - timeoutProcessingStartTimeNs)
// Note: `timerProcessingTimeMs` also includes the time the parent operators take for
// processing output returned from the timers that fire.
timerProcessingTimeMs +=
NANOSECONDS.toMillis(System.nanoTime - timerProcessingStartTimeNs)
processorHandle.setHandleState(StatefulProcessorHandleState.TIMER_PROCESSED)
})
}
Expand All @@ -387,13 +401,12 @@ case class TransformWithStateExec(
// Return an iterator of all the rows generated by all the keys, such that when fully
// consumed, all the state updates will be committed by the state store
CompletionIterator[InternalRow, Iterator[InternalRow]](outputIterator, {
// Note: Due to the iterator lazy execution, this metric also captures the time taken
// by the upstream (consumer) operators in addition to the processing in this operator.
allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - updatesStartTimeNs)
allRemovalsTimeMs += timeTakenMs {
processorHandle.doTtlCleanup()
}

commitTimeMs += timeTakenMs {
if (isStreaming) {
// clean up any expired user state
processorHandle.doTtlCleanup()
store.commit()
} else {
store.abort()
Expand All @@ -419,6 +432,8 @@ case class TransformWithStateExec(
StatefulOperatorCustomSumMetric("numMapStateVars", "Number of map state variables"),
StatefulOperatorCustomSumMetric("numDeletedStateVars", "Number of deleted state variables"),
// metrics around timers
StatefulOperatorCustomSumMetric("timerProcessingTimeMs",
"Number of milliseconds taken to process all timers"),
StatefulOperatorCustomSumMetric("numRegisteredTimers", "Number of registered timers"),
StatefulOperatorCustomSumMetric("numDeletedTimers", "Number of deleted timers"),
StatefulOperatorCustomSumMetric("numExpiredTimers", "Number of expired timers"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,30 @@ class StatefulProcessorWithCompositeTypes extends RunningCountStatefulProcessor
}
}

/**
 * Stateful processor used to exercise timer-related metrics.
 *
 * Each input row registers a timer at (current processing time + 10000 ms) and produces no
 * output. When a timer expires, the handler sleeps for 1000 ms and then emits the grouping key.
 */
class SleepingTimerProcessor extends StatefulProcessor[String, String, String] {
  // No state variables to set up.
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {}

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[String] = {
    // The returned iterator is lazy: a timer is registered for a row only when that row is
    // pulled through the iterator. No output rows are produced for input data.
    inputRows.flatMap { _ =>
      val fireAtMs = timerValues.getCurrentProcessingTimeInMs() + 10000
      getHandle.registerTimer(fireAtMs)
      Iterator.empty[String]
    }
  }

  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[String] = {
    // Deliberately slow timer handling so the elapsed time is visible in metrics.
    Thread.sleep(1000)
    Iterator(key)
  }
}

/**
* Class that adds tests for transformWithState stateful streaming operator
*/
Expand Down Expand Up @@ -708,6 +732,52 @@ class TransformWithStateSuite extends StateStoreMetricsTest
)
}

test("transformWithState - timer duration should be reflected in metrics") {
  // Each input row registers a timer 10 seconds out (see SleepingTimerProcessor), and every
  // timer that fires sleeps for 1 second. The manual clock drives the timeline below.
  val oneSecondMs = 1000L
  val manualClock = new StreamManualClock
  val source = MemoryStream[String]
  val timerOutput = source.toDS()
    .groupByKey(value => value)
    .transformWithState(
      new SleepingTimerProcessor,
      TimeMode.ProcessingTime(),
      OutputMode.Update())

  testStream(timerOutput, OutputMode.Update())(
    StartStream(Trigger.ProcessingTime("1 second"), triggerClock = manualClock),

    // Batch at t = 1: "a" schedules a timer for t = 1 + 10 = 11. No output is expected.
    AddData(source, "a"),
    AdvanceManualClock(oneSecondMs),
    CheckNewAnswer(),
    Execute { query =>
      val customMetrics = query.lastProgress.stateOperators(0).customMetrics
      assert(customMetrics.get("numRegisteredTimers") === 1)
      assert(customMetrics.get("timerProcessingTimeMs") < 2000)
    },

    // Batch at t = 2: "b" schedules a timer for t = 2 + 10 = 12. No output is expected.
    AddData(source, "b"),
    AdvanceManualClock(oneSecondMs),
    CheckNewAnswer(),
    Execute { query =>
      val customMetrics = query.lastProgress.stateOperators(0).customMetrics
      assert(customMetrics.get("numRegisteredTimers") === 1)
      assert(customMetrics.get("timerProcessingTimeMs") < 2000)
    },

    // The clock is at 2 and must move past 12 for both timers to expire, so advance by
    // 11 seconds. "c" itself registers one more timer in this batch.
    AddData(source, "c"),
    AdvanceManualClock(11 * oneSecondMs),
    CheckNewAnswer("a", "b"),
    Execute { query =>
      val customMetrics = query.lastProgress.stateOperators(0).customMetrics
      assert(customMetrics.get("numRegisteredTimers") === 1)

      // Two timers fired, and each one sleeps for 1 second while being handled.
      assert(customMetrics.get("timerProcessingTimeMs") >= 2000)
    },

    StopStream
  )
}

test("Use statefulProcessor without transformWithState - handle should be absent") {
val processor = new RunningCountStatefulProcessor()
val ex = intercept[Exception] {
Expand Down

0 comments on commit f5bb11c

Please sign in to comment.