Skip to content

Commit

Permalink
add metrics for tracking last committed watermark for each topic part…
Browse files Browse the repository at this point in the history
…ition
  • Loading branch information
sauliusvl committed Sep 18, 2024
1 parent b8632ce commit 0a7806d
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
package com.adform.streamloader.sink.batch

import com.adform.streamloader.model._
import com.adform.streamloader.util.Retry._
import com.adform.streamloader.util._
import com.adform.streamloader.sink.PartitionGroupSinker
import com.adform.streamloader.sink.batch.storage.RecordBatchStorage
import com.adform.streamloader.source.KafkaContext
import com.adform.streamloader.util.Retry._
import com.adform.streamloader.util._
import io.micrometer.core.instrument.{Counter, Gauge, Meter, Timer}
import org.apache.kafka.common.TopicPartition

Expand Down Expand Up @@ -71,10 +71,14 @@ class RecordBatchingSinker[B <: RecordBatch](
batchStorage.commitBatch(batch)
}
)

batch.recordRanges.foreach(range => {
Metrics.committedWatermarks(range.topicPartition).set(range.end.watermark.millis.toDouble)
})

if (!batch.discard()) {
log.warn("Failed discarding batch")
}

} catch {
case e if isInterruptionException(e) =>
log.debug("Batch commit thread interrupted")
Expand Down Expand Up @@ -175,14 +179,19 @@ class RecordBatchingSinker[B <: RecordBatch](
val recordsWritten: Map[TopicPartition, Counter] =
groupPartitions.map(tp => tp -> createCounter("records.written", commonTags ++ partitionTags(tp))).toMap

val committedWatermarks: Map[TopicPartition, AssignableGauge] =
groupPartitions
.map(tp => tp -> createAssignableGauge("committed.watermark.ms", commonTags ++ partitionTags(tp)))
.toMap

val recordsBatched: Map[TopicPartition, Counter] =
groupPartitions.map(tp => tp -> createCounter("records.batched", commonTags ++ partitionTags(tp))).toMap

val commitDuration: Timer = createTimer("commit.duration", commonTags, maxDuration = Duration.ofMinutes(5))
val commitQueueSize: Gauge =
createGauge("commit.queue.size", self, (_: RecordBatchingSinker[B]) => self.commitQueue.size(), commonTags)

val allMeters: Seq[Meter] =
Seq(commitDuration, commitQueueSize) ++ recordsWritten.values
val allMeters: Seq[Meter] = Seq(commitDuration, commitQueueSize) ++
recordsWritten.values ++ committedWatermarks.values.map(_.underlying) ++ recordsBatched.values
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

package com.adform.streamloader.util

import java.time.Duration
import java.util.function.ToDoubleFunction

import io.micrometer.core.instrument._
import io.micrometer.core.instrument.composite.CompositeMeterRegistry

import java.time.Duration
import java.util.function.ToDoubleFunction
import scala.collection.concurrent
import scala.collection.concurrent.TrieMap

Expand Down Expand Up @@ -65,6 +64,22 @@ trait Metrics {
.register(Metrics.registry)
}

class AssignableGauge(name: String, tags: Seq[MetricTag]) {
private var currentValue: Double = 0.0
lazy val underlying: Gauge = createGauge(name, this, (_: AssignableGauge) => currentValue, tags)

def set(value: Double): Double = {
currentValue = value
underlying.value()
}

def close(): Unit = underlying.close()
}

protected def createAssignableGauge(name: String, tags: Seq[MetricTag] = Seq()): AssignableGauge = {
new AssignableGauge(name, tags)
}

protected def createDistribution(name: String, tags: Seq[MetricTag] = Seq()): DistributionSummary =
DistributionSummary
.builder(joinPrefixes(Seq(metricsRoot, name)))
Expand Down

0 comments on commit 0a7806d

Please sign in to comment.