Merge pull request #106 from Sanketika-Obsrv/release-1.4.0
Release 1.4.0
ravismula authored Jan 3, 2025
2 parents 46256e8 + b71821c commit 7e92b40
Showing 17 changed files with 176 additions and 95 deletions.
4 changes: 2 additions & 2 deletions framework/src/main/resources/baseconfig.conf
@@ -7,8 +7,8 @@ kafka {
linger.ms = 10
compression = "snappy"
}
- output.system.event.topic = ${job.env}".system.events"
- output.failed.topic = ${job.env}".failed"
+ output.system.event.topic = "system.events"
+ output.failed.topic = "failed"
}

job {
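
Note: the baseconfig.conf change above, and the remaining *.conf diffs below, all follow the same pattern: Kafka topic names and consumer group IDs are no longer built from the ${job.env} substitution and become fixed strings. The snippet below is a minimal sketch (not part of this commit) of the effect, using the Typesafe Config (HOCON) loader these configs are written for; the "dev" value for job.env is purely illustrative.

```scala
// Minimal sketch (not part of this commit): how an env-prefixed topic name resolved
// before this release versus the fixed name used after it. "dev" is illustrative.
import com.typesafe.config.ConfigFactory

object TopicNamingSketch extends App {
  // Pre-1.4.0 style: the topic is built by concatenating ${job.env} with a suffix.
  val before = ConfigFactory.parseString(
    """
      |job.env = dev
      |kafka.output.system.event.topic = ${job.env}".system.events"
      |""".stripMargin).resolve()

  // 1.4.0 style: the topic name is a plain string, independent of job.env.
  val after = ConfigFactory.parseString(
    """
      |kafka.output.system.event.topic = "system.events"
      |""".stripMargin)

  println(before.getString("kafka.output.system.event.topic")) // dev.system.events
  println(after.getString("kafka.output.system.event.topic"))  // system.events
}
```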
4 changes: 2 additions & 2 deletions pipeline/cache-indexer/src/main/resources/cache-indexer.conf
@@ -1,8 +1,8 @@
include "baseconfig.conf"

kafka {
- output.failed.topic = ${job.env}".masterdata.failed"
- groupId = ${job.env}"-cache-indexer-group"
+ output.failed.topic = "masterdata.failed"
+ groupId = "cache-indexer-group"
producer {
max-request-size = 5242880
}
2 changes: 1 addition & 1 deletion pipeline/cache-indexer/src/test/resources/test.conf
@@ -2,7 +2,7 @@ include "base-test.conf"

kafka {

- output.failed.topic = ${job.env}".masterdata.failed"
+ output.failed.topic = "masterdata.failed"
groupId = ${job.env}"-cache-indexer-group"
producer {
max-request-size = 5242880
@@ -1,9 +1,9 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".transform"
- stats.topic = ${job.env}".stats"
- groupId = ${job.env}"-druid-router-group"
+ input.topic = "transform"
+ stats.topic = "stats"
+ groupId = "druid-router-group"
}

task {
@@ -1,10 +1,10 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".unique"
- output.denorm.topic = ${job.env}".denorm"
- output.denorm.failed.topic = ${job.env}".failed"
- groupId = ${job.env}"-denormalizer-group"
+ input.topic = "unique"
+ output.denorm.topic = "denorm"
+ output.denorm.failed.topic = "failed"
+ groupId = "denormalizer-group"
}

task {
10 changes: 5 additions & 5 deletions pipeline/extractor/src/main/resources/extractor.conf
@@ -1,12 +1,12 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".ingest"
- output.raw.topic = ${job.env}".raw"
- output.extractor.duplicate.topic = ${job.env}".failed"
- output.batch.failed.topic = ${job.env}".failed"
+ input.topic = "ingest"
+ output.raw.topic = "raw"
+ output.extractor.duplicate.topic = "failed"
+ output.batch.failed.topic = "failed"
event.max.size = "1048576" # Max is only 1MB
- groupId = ${job.env}"-extractor-group"
+ groupId = "extractor-group"
producer {
max-request-size = 5242880
}
8 changes: 4 additions & 4 deletions pipeline/hudi-connector/src/main/resources/hudi-writer.conf
@@ -1,11 +1,11 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".hudi.connector.in"
- output.topic = ${job.env}".hudi.connector.out"
- output.invalid.topic = ${job.env}".failed"
+ input.topic = "hudi.connector.in"
+ output.topic = "hudi.connector.out"
+ output.invalid.topic = "failed"
event.max.size = "1048576" # Max is only 1MB
- groupId = ${job.env}"-hudi-writer-group"
+ groupId = "hudi-writer-group"
producer {
max-request-size = 5242880
}
@@ -5,39 +5,68 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.common.TimestampFormat
import org.apache.flink.formats.json.JsonToRowDataConverters
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
- import org.sunbird.obsrv.util.{HudiSchemaParser, HudiSchemaSpec}
import org.apache.flink.table.data.RowData
import org.slf4j.LoggerFactory
- import org.sunbird.obsrv.core.util.{JSONUtil, Util}
+ import org.sunbird.obsrv.core.util.JSONUtil
import org.sunbird.obsrv.streaming.HudiConnectorConfig
- import scala.collection.mutable.{Map => MMap}
+ import org.sunbird.obsrv.util.{HMetrics, HudiSchemaParser, ScalaGauge}

- class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String) extends RichMapFunction[MMap[String, AnyRef], RowData] {
+ import scala.collection.mutable.{Map => MMap}

- var jsonToRowDataConverters: JsonToRowDataConverters = _
- var objectMapper: ObjectMapper = _
- var hudiSchemaParser: HudiSchemaParser = _
+ class RowDataConverterFunction(config: HudiConnectorConfig, datasetId: String)
+   extends RichMapFunction[MMap[String, AnyRef], RowData] {

private val logger = LoggerFactory.getLogger(classOf[RowDataConverterFunction])

+ private var metrics: HMetrics = _
+ private var jsonToRowDataConverters: JsonToRowDataConverters = _
+ private var objectMapper: ObjectMapper = _
+ private var hudiSchemaParser: HudiSchemaParser = _

override def open(parameters: Configuration): Unit = {
super.open(parameters)

+ metrics = new HMetrics()
jsonToRowDataConverters = new JsonToRowDataConverters(false, true, TimestampFormat.SQL)
objectMapper = new ObjectMapper()
hudiSchemaParser = new HudiSchemaParser()

+ getRuntimeContext.getMetricGroup
+ .addGroup(config.jobName)
+ .addGroup(datasetId)
+ .gauge[Long, ScalaGauge[Long]](config.inputEventCountMetric, ScalaGauge[Long](() =>
+ metrics.getAndReset(datasetId, config.inputEventCountMetric)
+ ))
+
+ getRuntimeContext.getMetricGroup
+ .addGroup(config.jobName)
+ .addGroup(datasetId)
+ .gauge[Long, ScalaGauge[Long]](config.failedEventCountMetric, ScalaGauge[Long](() =>
+ metrics.getAndReset(datasetId, config.failedEventCountMetric)
+ ))
}

override def map(event: MMap[String, AnyRef]): RowData = {
- convertToRowData(event)
+ try {
+ if (event.nonEmpty) {
+ metrics.increment(datasetId, config.inputEventCountMetric, 1)
+ }
+ val rowData = convertToRowData(event)
+ rowData
+ } catch {
+ case ex: Exception =>
+ metrics.increment(datasetId, config.failedEventCountMetric, 1)
+ logger.error("Failed to process record", ex)
+ throw ex
+ }
}

def convertToRowData(data: MMap[String, AnyRef]): RowData = {
val eventJson = JSONUtil.serialize(data)
val flattenedData = hudiSchemaParser.parseJson(datasetId, eventJson)
val rowType = hudiSchemaParser.rowTypeMap(datasetId)
- val converter: JsonToRowDataConverters.JsonToRowDataConverter = jsonToRowDataConverters.createRowConverter(rowType)
- val rowData = converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
- rowData
+ val converter: JsonToRowDataConverters.JsonToRowDataConverter =
+ jsonToRowDataConverters.createRowConverter(rowType)
+ converter.convert(objectMapper.readTree(JSONUtil.serialize(flattenedData))).asInstanceOf[RowData]
}

- }
+ }
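
The reworked map() above wraps conversion in a try/catch and feeds two per-dataset counters (input-event-count and failed-event-count) that open() exposes to Flink as gauges. The snippet below is a standalone sketch (not part of this commit) of that accounting pattern, using the HMetrics helper added later in this diff; the dataset id, metric names as literals, and the payload lookup standing in for convertToRowData are illustrative.

```scala
// Standalone sketch (not part of this commit) of the map() accounting pattern:
// count every non-empty input, count failures, and rethrow the original exception.
import org.sunbird.obsrv.util.HMetrics

object MapAccountingSketch extends App {
  val metrics = new HMetrics()
  val datasetId = "d1" // illustrative dataset id

  def process(event: Map[String, AnyRef]): String = {
    try {
      if (event.nonEmpty) metrics.increment(datasetId, "input-event-count", 1)
      event("payload").toString // stands in for convertToRowData; throws if the key is missing
    } catch {
      case ex: Exception =>
        metrics.increment(datasetId, "failed-event-count", 1)
        throw ex
    }
  }

  process(Map("payload" -> "ok"))
  try process(Map("other" -> "x")) catch { case _: Exception => () } // simulated failure

  println(metrics.getAndReset(datasetId, "input-event-count"))  // 2
  println(metrics.getAndReset(datasetId, "failed-event-count")) // 1
}
```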
@@ -61,4 +61,15 @@ class HudiConnectorConfig(override val config: Config) extends BaseJobConfig[mut
val hudiCompactionTaskMemory: Int = config.getInt("hudi.write.compaction.max.memory")
val hudiFsAtomicCreationSupport: String = config.getString("hudi.fs.atomic_creation.support")

+ // Metrics
+
+ val inputEventCountMetric = "input-event-count"
+ val failedEventCountMetric = "failed-event-count"
+
+ // Metrics Exporter
+ val metricsReportType: String = config.getString("metrics.reporter.type")
+ val metricsReporterHost: String = config.getString("metrics.reporter.host")
+ val metricsReporterPort: String = config.getString("metrics.reporter.port")
+
+
}
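
The three new metrics.reporter.* keys above are read unconditionally via config.getString, so they must be present in the connector configuration. Below is a minimal sketch (not part of this commit) of the expected keys; the reporter type matches one of the values checked in HudiConnectorStreamTask further down, while the host and port values are purely illustrative.

```scala
// Minimal sketch (not part of this commit) of the configuration keys that
// HudiConnectorConfig now reads; values shown here are illustrative assumptions.
import com.typesafe.config.ConfigFactory

object MetricsReporterConfigSketch extends App {
  val conf = ConfigFactory.parseString(
    """
      |metrics.reporter.type = "PROMETHEUS_PUSHGATEWAY"
      |metrics.reporter.host = "localhost"
      |metrics.reporter.port = "9091"
      |""".stripMargin)

  // The same lookups the new vals perform:
  println(conf.getString("metrics.reporter.type"))
  println(conf.getString("metrics.reporter.host"))
  println(conf.getString("metrics.reporter.port"))
}
```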
@@ -89,7 +89,17 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink
conf.setString(FlinkOptions.PRECOMBINE_FIELD.key, datasetSchema.schema.timestampColumn)
conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn)
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key, avroSchema.toString)

+ conf.setBoolean("hoodie.metrics.on", true)
+ if (config.metricsReportType.equalsIgnoreCase("PROMETHEUS_PUSHGATEWAY")) {
+ conf.setString("hoodie.metrics.reporter.type", config.metricsReportType)
+ conf.setString("hoodie.metrics.pushgateway.host", config.metricsReporterHost)
+ conf.setString("hoodie.metrics.pushgateway.port", config.metricsReporterPort)
+ }
+ if (config.metricsReportType.equalsIgnoreCase("JMX")) {
+ conf.setString("hoodie.metrics.reporter.type", config.metricsReportType)
+ conf.setString("hoodie.metrics.jmx.host", config.metricsReporterHost)
+ conf.setString("hoodie.metrics.jmx.port", config.metricsReporterPort)
+ }
val partitionField = datasetSchema.schema.columnSpec.filter(f => f.name.equalsIgnoreCase(datasetSchema.schema.partitionColumn)).head
if(partitionField.`type`.equalsIgnoreCase("timestamp") || partitionField.`type`.equalsIgnoreCase("epoch")) {
conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key, datasetSchema.schema.partitionColumn + "_partition")
@@ -0,0 +1,31 @@
package org.sunbird.obsrv.util


import org.apache.flink.metrics.Gauge

import scala.collection.concurrent.TrieMap

class HMetrics {
private val metricStore = TrieMap[(String, String), Long]()

def increment(dataset: String, metric: String, value: Long): Unit = {
metricStore.synchronized {
val key = (dataset, metric)
val current = metricStore.getOrElse(key, 0L)
metricStore.put(key, current + value)
}
}

def getAndReset(dataset: String, metric: String): Long = {
metricStore.synchronized {
val key = (dataset, metric)
val current = metricStore.getOrElse(key, 0L)
metricStore.remove(key)
current
}
}
}

case class ScalaGauge[T](getValueFn: () => T) extends Gauge[T] {
override def getValue: T = getValueFn()
}
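
A small sketch (not part of this commit) of how the new HMetrics and ScalaGauge pair behaves when a Flink metrics reporter polls a gauge: because getValue delegates to getAndReset, each poll reports the count accumulated since the previous poll and then clears it, so the exported value is a per-reporting-interval delta rather than a running total. The dataset id and metric name below are illustrative.

```scala
// Sketch (not part of this commit): poll-and-reset semantics of the new gauges.
import org.sunbird.obsrv.util.{HMetrics, ScalaGauge}

object GaugeSemanticsSketch extends App {
  val metrics = new HMetrics()
  val gauge = ScalaGauge[Long](() => metrics.getAndReset("d1", "input-event-count"))

  metrics.increment("d1", "input-event-count", 3)
  println(gauge.getValue) // 3  (first poll drains the counter)
  println(gauge.getValue) // 0  (nothing new since the last poll)
}
```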
@@ -1,20 +1,20 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".masterdata.ingest"
- output.raw.topic = ${job.env}".masterdata.raw"
- output.extractor.duplicate.topic = ${job.env}".masterdata.failed"
- output.failed.topic = ${job.env}".masterdata.failed"
- output.batch.failed.topic = ${job.env}".masterdata.failed"
+ input.topic = "masterdata.ingest"
+ output.raw.topic = "masterdata.raw"
+ output.extractor.duplicate.topic = "masterdata.failed"
+ output.failed.topic = "masterdata.failed"
+ output.batch.failed.topic = "masterdata.failed"
event.max.size = "1048576" # Max is only 1MB
- output.invalid.topic = ${job.env}".masterdata.failed"
- output.unique.topic = ${job.env}".masterdata.unique"
- output.duplicate.topic = ${job.env}".masterdata.failed"
- output.denorm.topic = ${job.env}".masterdata.denorm"
- output.transform.topic = ${job.env}".masterdata.transform"
- output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
- stats.topic = ${job.env}".masterdata.stats"
- groupId = ${job.env}"-masterdata-pipeline-group"
+ output.invalid.topic = "masterdata.failed"
+ output.unique.topic = "masterdata.unique"
+ output.duplicate.topic = "masterdata.failed"
+ output.denorm.topic = "masterdata.denorm"
+ output.transform.topic = "masterdata.transform"
+ output.transform.failed.topic = "masterdata.transform.failed"
+ stats.topic = "masterdata.stats"
+ groupId = "masterdata-pipeline-group"
producer {
max-request-size = 5242880
}
24 changes: 12 additions & 12 deletions pipeline/master-data-processor/src/test/resources/test.conf
@@ -6,19 +6,19 @@ job {

kafka {

- input.topic = ${job.env}".masterdata.ingest"
- output.raw.topic = ${job.env}".masterdata.raw"
- output.extractor.duplicate.topic = ${job.env}".masterdata.failed"
- output.failed.topic = ${job.env}".masterdata.failed"
- output.batch.failed.topic = ${job.env}".masterdata.failed"
+ input.topic = "masterdata.ingest"
+ output.raw.topic = "masterdata.raw"
+ output.extractor.duplicate.topic = "masterdata.failed"
+ output.failed.topic = "masterdata.failed"
+ output.batch.failed.topic = "masterdata.failed"
event.max.size = "1048576" # Max is only 1MB
- output.invalid.topic = ${job.env}".masterdata.failed"
- output.unique.topic = ${job.env}".masterdata.unique"
- output.duplicate.topic = ${job.env}".masterdata.failed"
- output.denorm.topic = ${job.env}".masterdata.denorm"
- output.transform.topic = ${job.env}".masterdata.transform"
- output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
- stats.topic = ${job.env}".masterdata.stats"
+ output.invalid.topic = "masterdata.failed"
+ output.unique.topic = "masterdata.unique"
+ output.duplicate.topic = "masterdata.failed"
+ output.denorm.topic = "masterdata.denorm"
+ output.transform.topic = "masterdata.transform"
+ output.transform.failed.topic = "masterdata.transform.failed"
+ stats.topic = "masterdata.stats"
groupId = ${job.env}"-masterdata-pipeline-group"
producer {
max-request-size = 5242880
@@ -1,11 +1,11 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".raw"
- output.invalid.topic = ${job.env}".failed"
- output.unique.topic = ${job.env}".unique"
- output.duplicate.topic = ${job.env}".failed"
- groupId = ${job.env}"-pipeline-preprocessor-group"
+ input.topic = "raw"
+ output.invalid.topic = "failed"
+ output.unique.topic = "unique"
+ output.duplicate.topic = "failed"
+ groupId = "pipeline-preprocessor-group"
}

task {
8 changes: 4 additions & 4 deletions pipeline/transformer/src/main/resources/transformer.conf
@@ -1,10 +1,10 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".denorm"
- output.transform.topic = ${job.env}".transform"
- output.transform.failed.topic = ${job.env}".transform.failed"
- groupId = ${job.env}"-transformer-group"
+ input.topic = "denorm"
+ output.transform.topic = "transform"
+ output.transform.failed.topic = "transform.failed"
+ groupId = "transformer-group"
producer {
max-request-size = 5242880
}
26 changes: 13 additions & 13 deletions pipeline/unified-pipeline/src/main/resources/unified-pipeline.conf
@@ -1,20 +1,20 @@
include "baseconfig.conf"

kafka {
- input.topic = ${job.env}".ingest"
- output.raw.topic = ${job.env}".raw"
- output.extractor.duplicate.topic = ${job.env}".failed"
- output.batch.failed.topic = ${job.env}".failed"
+ input.topic = "ingest"
+ output.raw.topic = "raw"
+ output.extractor.duplicate.topic = "failed"
+ output.batch.failed.topic = "failed"
event.max.size = "1048576" # Max is only 1MB
- output.invalid.topic = ${job.env}".failed"
- output.unique.topic = ${job.env}".unique"
- output.duplicate.topic = ${job.env}".failed"
- output.denorm.topic = ${job.env}".denorm"
- output.denorm.failed.topic = ${job.env}".failed"
- output.transform.topic = ${job.env}".transform"
- output.transform.failed.topic = ${job.env}".failed"
- stats.topic = ${job.env}".stats"
- groupId = ${job.env}"-unified-pipeline-group"
+ output.invalid.topic = "failed"
+ output.unique.topic = "unique"
+ output.duplicate.topic = "failed"
+ output.denorm.topic = "denorm"
+ output.denorm.failed.topic = "failed"
+ output.transform.topic = "transform"
+ output.transform.failed.topic = "failed"
+ stats.topic = "stats"
+ groupId = "unified-pipeline-group"
producer {
max-request-size = 5242880
}
