From c8b7d48ba98a715c8b3e8ae28129d9af71433885 Mon Sep 17 00:00:00 2001 From: Manjunath Davanam Date: Tue, 3 Sep 2024 10:59:27 +0530 Subject: [PATCH] Obsrv V2 Release (#19) * #OBS-I182: Fix the issue with cache indexer * #OBS-I182: Cache Indexer fix |Removed the kafka-client and casting the number to long value * #OBS-I182: Cache Indexer fix | Removing the obsrv_meta information before indexing into cache --------- Co-authored-by: Santhosh Vasabhaktula --- .../BaseDatasetProcessFunction.scala | 4 +-- .../sunbird/obsrv/core/serde/SerdeUtil.scala | 34 +++++++++++++++++++ .../obsrv/core/streaming/BaseStreamTask.scala | 7 ++++ .../core/streaming/FlinkKafkaConnector.scala | 9 +++++ pipeline/cache-indexer/pom.xml | 12 ------- .../streaming/CacheIndexerStreamTask.scala | 2 +- .../sunbird/obsrv/util/MasterDataCache.scala | 9 +++-- .../sunbird/obsrv/fixture/EventFixture.scala | 10 +++--- 8 files changed, 65 insertions(+), 22 deletions(-) diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala index fede2b54..9c454ec0 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/streaming/BaseDatasetProcessFunction.scala @@ -28,11 +28,11 @@ trait SystemEventHandler { } private def getTime(timespans: Map[String, AnyRef], producer: Producer): Option[Long] = { - timespans.get(producer.toString).map(f => f.asInstanceOf[Long]) + timespans.get(producer.toString).map(f => f.asInstanceOf[Number].longValue()) } private def getStat(obsrvMeta: Map[String, AnyRef], stat: Stats): Option[Long] = { - obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Long]) + obsrvMeta.get(stat.toString).map(f => f.asInstanceOf[Number].longValue()) } def getError(error: ErrorConstants.Error, producer: Producer, functionalError: FunctionalError): Option[ErrorLog] = { diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala b/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala index 370353c7..d68b924a 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/serde/SerdeUtil.scala @@ -46,6 +46,40 @@ class MapDeserializationSchema extends KafkaRecordDeserializationSchema[mutable. } +class TopicDeserializationSchema extends KafkaRecordDeserializationSchema[mutable.Map[String, AnyRef]] { + + private val serialVersionUID = -3224825136576915426L + + override def getProducedType: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]]) + + override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[mutable.Map[String, AnyRef]]): Unit = { + val msg = try { + val event = JSONUtil.deserialize[Map[String, AnyRef]](record.value()) + mutable.Map[String, AnyRef]( + "dataset" -> record.topic(), + "event" -> event + ) + } catch { + case _: Exception => + mutable.Map[String, AnyRef](Constants.INVALID_JSON -> new String(record.value, "UTF-8")) + } + initObsrvMeta(msg, record) + out.collect(msg) + } + + private def initObsrvMeta(msg: mutable.Map[String, AnyRef], record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = { + if (!msg.contains("obsrv_meta")) { + msg.put("obsrv_meta", Map( + "syncts" -> record.timestamp(), + "processingStartTime" -> System.currentTimeMillis(), + "flags" -> Map(), + "timespans" -> Map(), + "error" -> Map() + )) + } + } +} + class StringDeserializationSchema extends KafkaRecordDeserializationSchema[String] { private val serialVersionUID = -3224825136576915426L diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala index 8ebdb8a7..bdc897da 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseStreamTask.scala @@ -38,6 +38,13 @@ abstract class BaseStreamTask[T] extends BaseStreamTaskSink[T] { .rebalance() } + def getTopicMapDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaTopics: List[String], + consumerSourceName: String, kafkaConnector: FlinkKafkaConnector): DataStream[mutable.Map[String, AnyRef]] = { + env.fromSource(kafkaConnector.kafkaTopicMapSource(kafkaTopics), WatermarkStrategy.noWatermarks[mutable.Map[String, AnyRef]](), consumerSourceName) + .uid(consumerSourceName).setParallelism(config.kafkaConsumerParallelism) + .rebalance() + } + def getStringDataStream(env: StreamExecutionEnvironment, config: BaseJobConfig[T], kafkaConnector: FlinkKafkaConnector): DataStream[String] = { env.fromSource(kafkaConnector.kafkaStringSource(config.inputTopic()), WatermarkStrategy.noWatermarks[String](), config.inputConsumer()) .uid(config.inputConsumer()).setParallelism(config.kafkaConsumerParallelism) diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala index 508e1e7c..39552dd7 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/FlinkKafkaConnector.scala @@ -47,6 +47,15 @@ class FlinkKafkaConnector(config: BaseJobConfig[_]) extends Serializable { .build() } + def kafkaTopicMapSource(kafkaTopics: List[String]): KafkaSource[mutable.Map[String, AnyRef]] = { + KafkaSource.builder[mutable.Map[String, AnyRef]]() + .setTopics(kafkaTopics.asJava) + .setDeserializer(new TopicDeserializationSchema) + .setProperties(config.kafkaConsumerProperties()) + .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) + .build() + } + def kafkaMapDynamicSink(): KafkaSink[mutable.Map[String, AnyRef]] = { KafkaSink.builder[mutable.Map[String, AnyRef]]() .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) diff --git a/pipeline/cache-indexer/pom.xml b/pipeline/cache-indexer/pom.xml index 7d9ed5a8..36d76208 100644 --- a/pipeline/cache-indexer/pom.xml +++ b/pipeline/cache-indexer/pom.xml @@ -37,24 +37,12 @@ org.sunbird.obsrv dataset-registry 1.0.0 - - - org.apache.kafka - kafka-clients - - org.json4s json4s-native_${scala.maj.version} 4.0.6 - - org.apache.kafka - kafka-clients - ${kafka.version} - test - org.apache.kafka kafka_${scala.maj.version} diff --git a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala index 61b9ddec..735440b7 100644 --- a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala +++ b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/streaming/CacheIndexerStreamTask.scala @@ -32,7 +32,7 @@ class CacheIndexerStreamTask(config: CacheIndexerConfig, kafkaConnector: FlinkKa val datasets = DatasetRegistry.getAllDatasets(Some(DatasetType.master.toString)) val datasetIds = datasets.map(f => f.id) - val dataStream = getMapDataStream(env, config, datasetIds, config.kafkaConsumerProperties(), consumerSourceName = s"cache-indexer-consumer", kafkaConnector) + val dataStream = getTopicMapDataStream(env, config, datasetIds, consumerSourceName = s"cache-indexer-consumer", kafkaConnector) processStream(dataStream) } diff --git a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala index c3365255..c5f95f32 100644 --- a/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala +++ b/pipeline/cache-indexer/src/main/scala/org/sunbird/obsrv/util/MasterDataCache.scala @@ -1,9 +1,10 @@ package org.sunbird.obsrv.util import org.json4s.native.JsonMethods._ -import org.json4s.{JNothing, JValue} +import org.json4s.{JField, JNothing, JValue} import org.slf4j.LoggerFactory import org.sunbird.obsrv.core.cache.RedisConnect +import org.sunbird.obsrv.core.model.Constants.OBSRV_META import org.sunbird.obsrv.model.DatasetModels.Dataset import org.sunbird.obsrv.pipeline.task.CacheIndexerConfig import redis.clients.jedis.Jedis @@ -37,7 +38,11 @@ class MasterDataCache(val config: CacheIndexerConfig) { def process(dataset: Dataset, key: String, event: JValue): (Int, Int) = { val jedis = this.datasetPipelineMap(dataset.id) val dataFromCache = getDataFromCache(dataset, key, jedis) - updateCache(dataset, dataFromCache, key, event, jedis) + val updatedEvent = event.removeField { + case JField(OBSRV_META, _) => true + case _ => false + } + updateCache(dataset, dataFromCache, key, updatedEvent, jedis) (if (dataFromCache == null) 1 else 0, if (dataFromCache == null) 0 else 1) } diff --git a/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala b/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala index cf28aec5..078cde33 100644 --- a/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala +++ b/pipeline/cache-indexer/src/test/scala/org/sunbird/obsrv/fixture/EventFixture.scala @@ -2,9 +2,9 @@ package org.sunbird.obsrv.fixture object EventFixture { - val VALID_BATCH_EVENT_D3_INSERT = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"}}}""" - val VALID_BATCH_EVENT_D3_INSERT_2 = """{"dataset":"dataset3","event":{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"admin.hyun@gmail.com","locationId":"KUN134567"}}}""" - val VALID_BATCH_EVENT_D3_UPDATE = """{"dataset":"dataset3","event":{"code":"HYUN-CRE-D6","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}}""" - val VALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}""" - val INVALID_BATCH_EVENT_D4 = """{"dataset":"dataset4","event":{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}}""" + val VALID_BATCH_EVENT_D3_INSERT = """{"code":"HYUN-CRE-D6","manufacturer":"Hyundai","model":"Creta","variant":"SX(O)","modelYear":"2023","price":"2200000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"}}""" + val VALID_BATCH_EVENT_D3_INSERT_2 = """{"code":"HYUN-TUC-D6","manufacturer":"Hyundai","model":"Tucson","variant":"Signature","modelYear":"2023","price":"4000000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","dealer":{"email":"admin.hyun@gmail.com","locationId":"KUN134567"}}""" + val VALID_BATCH_EVENT_D3_UPDATE = """{"code":"HYUN-CRE-D6","dealer":{"email":"john.doe@example.com","locationId":"KUN12345"},"safety":"3 Star (Global NCAP)","seatingCapacity":5}""" + val VALID_BATCH_EVENT_D4 = """{"code":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}""" + val INVALID_BATCH_EVENT_D4 = """{"code1":"JEEP-CP-D3","manufacturer":"Jeep","model":"Compass","variant":"Model S (O) Diesel 4x4 AT","modelYear":"2023","price":"3800000","currencyCode":"INR","currency":"Indian Rupee","transmission":"automatic","fuel":"Diesel","safety":"5 Star (Euro NCAP)","seatingCapacity":5}""" }