From 0a7806dc49d8b286e688c27c6f32b432ccc9d93d Mon Sep 17 00:00:00 2001
From: Saulius Valatka
Date: Wed, 18 Sep 2024 23:07:40 +0300
Subject: [PATCH] add metrics for tracking last committed watermark for each
 topic partition

---
 .../sink/batch/RecordBatchingSinker.scala  | 19 ++++++++++++++-----
 .../adform/streamloader/util/Metrics.scala | 21 ++++++++++++++++---
 2 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala
index dc8d825a..3b08a8f9 100644
--- a/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala
+++ b/stream-loader-core/src/main/scala/com/adform/streamloader/sink/batch/RecordBatchingSinker.scala
@@ -9,11 +9,11 @@
 package com.adform.streamloader.sink.batch
 
 import com.adform.streamloader.model._
-import com.adform.streamloader.util.Retry._
-import com.adform.streamloader.util._
 import com.adform.streamloader.sink.PartitionGroupSinker
 import com.adform.streamloader.sink.batch.storage.RecordBatchStorage
 import com.adform.streamloader.source.KafkaContext
+import com.adform.streamloader.util.Retry._
+import com.adform.streamloader.util._
 import io.micrometer.core.instrument.{Counter, Gauge, Meter, Timer}
 import org.apache.kafka.common.TopicPartition
 
@@ -71,10 +71,14 @@ class RecordBatchingSinker[B <: RecordBatch](
             batchStorage.commitBatch(batch)
           }
         )
+
+        batch.recordRanges.foreach(range => {
+          Metrics.committedWatermarks(range.topicPartition).set(range.end.watermark.millis.toDouble)
+        })
+
         if (!batch.discard()) {
           log.warn("Failed discarding batch")
         }
-
       } catch {
         case e if isInterruptionException(e) =>
           log.debug("Batch commit thread interrupted")
@@ -175,6 +179,11 @@ class RecordBatchingSinker[B <: RecordBatch](
     val recordsWritten: Map[TopicPartition, Counter] =
       groupPartitions.map(tp => tp -> createCounter("records.written", commonTags ++ partitionTags(tp))).toMap
 
+    val committedWatermarks: Map[TopicPartition, AssignableGauge] =
+      groupPartitions
+        .map(tp => tp -> createAssignableGauge("committed.watermark.ms", commonTags ++ partitionTags(tp)))
+        .toMap
+
     val recordsBatched: Map[TopicPartition, Counter] =
       groupPartitions.map(tp => tp -> createCounter("records.batched", commonTags ++ partitionTags(tp))).toMap
 
@@ -182,7 +191,7 @@
     val commitQueueSize: Gauge =
       createGauge("commit.queue.size", self, (_: RecordBatchingSinker[B]) => self.commitQueue.size(), commonTags)
 
-    val allMeters: Seq[Meter] =
-      Seq(commitDuration, commitQueueSize) ++ recordsWritten.values
+    val allMeters: Seq[Meter] = Seq(commitDuration, commitQueueSize) ++
+      recordsWritten.values ++ committedWatermarks.values.map(_.underlying) ++ recordsBatched.values
   }
 }
diff --git a/stream-loader-core/src/main/scala/com/adform/streamloader/util/Metrics.scala b/stream-loader-core/src/main/scala/com/adform/streamloader/util/Metrics.scala
index 33bd790e..af05fab9 100644
--- a/stream-loader-core/src/main/scala/com/adform/streamloader/util/Metrics.scala
+++ b/stream-loader-core/src/main/scala/com/adform/streamloader/util/Metrics.scala
@@ -8,12 +8,11 @@
 
 package com.adform.streamloader.util
 
-import java.time.Duration
-import java.util.function.ToDoubleFunction
-
 import io.micrometer.core.instrument._
 import io.micrometer.core.instrument.composite.CompositeMeterRegistry
 
+import java.time.Duration
+import java.util.function.ToDoubleFunction
 import scala.collection.concurrent
 import scala.collection.concurrent.TrieMap
 
@@ -65,6 +64,22 @@ trait Metrics {
       .register(Metrics.registry)
   }
 
+  class AssignableGauge(name: String, tags: Seq[MetricTag]) {
+    private var currentValue: Double = 0.0
+    lazy val underlying: Gauge = createGauge(name, this, (_: AssignableGauge) => currentValue, tags)
+
+    def set(value: Double): Double = {
+      currentValue = value
+      underlying.value()
+    }
+
+    def close(): Unit = underlying.close()
+  }
+
+  protected def createAssignableGauge(name: String, tags: Seq[MetricTag] = Seq()): AssignableGauge = {
+    new AssignableGauge(name, tags)
+  }
+
   protected def createDistribution(name: String, tags: Seq[MetricTag] = Seq()): DistributionSummary =
     DistributionSummary
       .builder(joinPrefixes(Seq(metricsRoot, name)))
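
For illustration, a minimal usage sketch of the new createAssignableGauge helper from code mixing in the Metrics trait. The WatermarkReporter class, the "events" topic tag, the MetricTag(name, value) constructor shape, and the metricsRoot override are assumptions made for the example; only the helper signatures and joinPrefixes(Seq(metricsRoot, name)) are visible in the diff itself.

// Hypothetical usage sketch, not part of the patch above.
class WatermarkReporter extends Metrics {
  // Assumed: metricsRoot is the overridable prefix joined into meter names.
  override protected def metricsRoot: String = "stream.loader"

  // The wrapper is created eagerly here, but its underlying Micrometer
  // meter is a lazy val, so it is only registered on the first set() call.
  private val watermark: AssignableGauge =
    createAssignableGauge("committed.watermark.ms", Seq(MetricTag("topic", "events")))

  def onBatchCommitted(watermarkMillis: Long): Unit = {
    // set() stores the value and returns it as read back from the gauge;
    // the gauge keeps reporting this value until the next set().
    watermark.set(watermarkMillis.toDouble)
  }
}

Note how the RecordBatchingSinker change registers committedWatermarks.values.map(_.underlying) in allMeters: touching underlying is what materializes the lazy Micrometer gauge, matching the lazy registration behavior sketched above.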