Merge pull request #85 from Sanketika-Obsrv/develop
Develop to Main
ravismula authored Jun 20, 2024
2 parents 90090d2 + db99123 commit 3aafbd2
Showing 11 changed files with 27 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build_and_deploy.yaml
@@ -31,6 +31,8 @@ jobs:
target: "merged-image"
- image: "master-data-processor"
target: "master-data-processor-image"
+ - image: "lakehouse-connector"
+   target: "lakehouse-connector-image"
steps:
- uses: actions/checkout@v4
with:
5 changes: 1 addition & 4 deletions Dockerfile
@@ -36,10 +36,7 @@ FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as mast
USER flink
COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/lib

- FROM --platform=linux/x86_64 flink:1.15.0-scala_2.12-java11 as hudi-connector-image
+ FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as lakehouse-connector-image
USER flink
- COPY ./pipeline/hudi-connector/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
- COPY ./pipeline/hudi-connector/flink-s3-fs-hadoop-1.15.2.jar $FLINK_HOME/lib
- COPY ./pipeline/hudi-connector/hbase-server-2.4.13.jar $FLINK_HOME/lib
RUN mkdir $FLINK_HOME/custom-lib
COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib
@@ -71,7 +71,7 @@ object DatasetModels {
@JsonProperty("status") status: String, @JsonProperty("connector_stats") connectorStats: Option[ConnectorStats] = None)

case class DataSource(@JsonProperty("id") id: String, @JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String,
- @JsonProperty("type") `type`: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)
+ @JsonProperty("type") `type`: String, @JsonProperty("status") status: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)

}

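The DataSource model now carries a status field between `type` and ingestion_spec. A minimal construction sketch (the literal values below are illustrative assumptions, not taken from this commit):

    // Illustrative only: constructing a DataSource with the new "status" field.
    // All literal values here are hypothetical examples.
    val ds = DataSource(
      id = "ds-001",
      datasource = "telemetry-events",
      datasetId = "telemetry",
      `type` = "datalake",
      status = "Live", // newly added field
      ingestionSpec = """{"dataset":"telemetry"}""",
      datasourceRef = "telemetry-events-ref"
    )

Downstream, this commit filters on the new field; the HudiConnectorStreamTask and HudiSchemaParser hunks below keep only datasources whose status is "Live".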
@@ -202,10 +202,11 @@ object DatasetRegistryService {
val datasource = rs.getString("datasource")
val datasetId = rs.getString("dataset_id")
val datasourceType = rs.getString("type")
+ val datasourceStatus = rs.getString("status")
val ingestionSpec = rs.getString("ingestion_spec")
val datasourceRef = rs.getString("datasource_ref")

- DataSource(id, datasource, datasetId, datasourceType, ingestionSpec, datasourceRef)
+ DataSource(id, datasource, datasetId, datasourceType, datasourceStatus, ingestionSpec, datasourceRef)
}

private def parseDatasetTransformation(rs: ResultSet): DatasetTransformation = {
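parseDatasource now reads the status column directly with rs.getString("status"). If pre-existing rows could have a NULL status (an assumption; this commit does not show the corresponding migration), a defensive variant would be:

    // Hedged sketch: tolerate a NULL status column by falling back to a default.
    // Whether NULLs occur, and "Live" being the right default, are assumptions.
    val datasourceStatus = Option(rs.getString("status")).getOrElse("Live")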
@@ -5,6 +5,7 @@ object Constants {
val EVENT = "event"
val INVALID_JSON = "invalid_json"
val OBSRV_META = "obsrv_meta"
+ val PROCESSING_START_TIME = "processingStartTime"
val SRC = "src"
val ERROR_CODE = "error_code"
val ERROR_MSG = "error_msg"
@@ -15,5 +16,5 @@ object Constants {
val TOPIC = "topic"
val MESSAGE = "message"
val DATALAKE_TYPE = "datalake"

+ val MASTER_DATASET_TYPE = "master-dataset"
}
@@ -44,7 +44,7 @@ class DynamicRouterFunction(config: DruidRouterConfig) extends BaseDatasetProces
event.put(config.CONST_OBSRV_META, msg(config.CONST_OBSRV_META).asInstanceOf[Map[String, AnyRef]])
val tsKeyData = TimestampKeyParser.parseTimestampKey(dataset.datasetConfig, event)
event.put("indexTS", tsKeyData.value)
- if (tsKeyData.isValid) {
+ if (tsKeyData.isValid || dataset.datasetType.equalsIgnoreCase(Constants.MASTER_DATASET_TYPE)) {
val routerConfig = dataset.routerConfig
val topicEventMap = mutable.Map(Constants.TOPIC -> routerConfig.topic, Constants.MESSAGE -> event)
ctx.output(config.routerOutputTag, topicEventMap)
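The changed condition routes an event either when its timestamp key is valid or when the dataset is a master dataset. The decision can be read as a small predicate (standalone sketch, not the actual class):

    // Sketch of the routing predicate implied by the change above.
    def shouldRoute(tsKeyValid: Boolean, datasetType: String): Boolean =
      tsKeyValid || datasetType.equalsIgnoreCase("master-dataset") // Constants.MASTER_DATASET_TYPE

    // A master dataset event with an invalid timestamp key is still routed.
    assert(shouldRoute(tsKeyValid = false, datasetType = "master-dataset"))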
@@ -46,6 +46,7 @@ class ExtractionFunction(config: ExtractorConfig)
context.output(config.systemEventsOutputTag, failedSystemEvent(Some(config.defaultDatasetID), ErrorConstants.ERR_INVALID_EVENT, FunctionalError.InvalidJsonData))
return
}
+ addStartProcessingTimeIfMissing(batchEvent)
val eventAsText = JSONUtil.serialize(batchEvent)
val datasetIdOpt = batchEvent.get(config.CONST_DATASET)
if (datasetIdOpt.isEmpty) {
@@ -79,6 +80,13 @@ }
}
}

+ private def addStartProcessingTimeIfMissing(batchEvent: mutable.Map[String, AnyRef]): Unit = {
+   val obsrvMeta = batchEvent(Constants.OBSRV_META).asInstanceOf[Map[String, AnyRef]]
+   if(!obsrvMeta.contains(Constants.PROCESSING_START_TIME)) {
+     batchEvent.put(Constants.OBSRV_META, obsrvMeta ++ Map("processingStartTime" -> System.currentTimeMillis()))
+   }
+ }

private def isDuplicate(dataset: Dataset, dedupKey: Option[String], event: String,
context: ProcessFunction[mutable.Map[String, AnyRef], mutable.Map[String, AnyRef]]#Context): Boolean = {
try {
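addStartProcessingTimeIfMissing stamps obsrv_meta.processingStartTime only when the key is absent, so replayed or retried batches keep their original start time. A minimal standalone sketch of the same behaviour (the event shape here is an assumption):

    import scala.collection.mutable

    // Assumed minimal event: an obsrv_meta map without processingStartTime.
    val batchEvent = mutable.Map[String, AnyRef]("obsrv_meta" -> Map[String, AnyRef]())
    val obsrvMeta = batchEvent("obsrv_meta").asInstanceOf[Map[String, AnyRef]]
    if (!obsrvMeta.contains("processingStartTime")) {
      // First pass: the timestamp is added. A second pass would leave it untouched.
      batchEvent.put("obsrv_meta", obsrvMeta ++ Map("processingStartTime" -> System.currentTimeMillis()))
    }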
@@ -44,7 +44,7 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink

def process(env: StreamExecutionEnvironment): Unit = {
val schemaParser = new HudiSchemaParser()
- val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE))
+ val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
dataSourceConfig.map{ dataSource =>
val datasetId = dataSource.datasetId
val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector)
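Both this stream task and HudiSchemaParser (next hunk) now apply the same selection: only datasources of datalake type whose status is "Live" enter the lakehouse flow. As a standalone sketch (assuming the DataSource model from this commit and a List result type):

    // Sketch of the shared selection logic; the literal "datalake" corresponds to Constants.DATALAKE_TYPE.
    def liveDatalakeSources(all: List[DataSource]): List[DataSource] =
      all.filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase("datalake") && f.status.equalsIgnoreCase("Live"))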
@@ -43,7 +43,7 @@ class HudiSchemaParser {
readSchema()

def readSchema(): Unit = {
- val datasourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE))
+ val datasourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
datasourceConfig.map{f =>
val hudiSchemaSpec = JSONUtil.deserialize[HudiSchemaSpec](f.ingestionSpec)
val dataset = hudiSchemaSpec.dataset
5 changes: 5 additions & 0 deletions pipeline/master-data-processor/pom.xml
@@ -62,6 +62,11 @@
<artifactId>transformer</artifactId>
<version>1.0.0</version>
</dependency>
+ <dependency>
+   <groupId>org.sunbird.obsrv.pipeline</groupId>
+   <artifactId>druid-router</artifactId>
+   <version>1.0.0</version>
+ </dependency>
<dependency>
<groupId>com.github.java-json-tools</groupId>
<artifactId>json-schema-validator</artifactId>
@@ -12,6 +12,7 @@ import org.sunbird.obsrv.extractor.task.{ExtractorConfig, ExtractorStreamTask}
import org.sunbird.obsrv.pipeline.function.MasterDataProcessorFunction
import org.sunbird.obsrv.preprocessor.task.{PipelinePreprocessorConfig, PipelinePreprocessorStreamTask}
import org.sunbird.obsrv.transformer.task.{TransformerConfig, TransformerStreamTask}
+ import org.sunbird.obsrv.router.task.{DruidRouterConfig, DynamicRouterStreamTask}

import java.io.File
import scala.collection.mutable
@@ -50,6 +51,7 @@ class MasterDataProcessorStreamTask(config: Config, masterDataConfig: MasterData
val preprocessorTask = new PipelinePreprocessorStreamTask(new PipelinePreprocessorConfig(config), kafkaConnector)
val denormalizerTask = new DenormalizerStreamTask(new DenormalizerConfig(config), kafkaConnector)
val transformerTask = new TransformerStreamTask(new TransformerConfig(config), kafkaConnector)
+ val routerTask = new DynamicRouterStreamTask(new DruidRouterConfig(config), kafkaConnector)

val transformedStream = transformerTask.processStream(
denormalizerTask.processStream(
@@ -67,6 +69,7 @@

addDefaultSinks(processedStream, masterDataConfig, kafkaConnector)
processedStream.getSideOutput(masterDataConfig.successTag())
+ routerTask.processStream(transformedStream)
}
}

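Taken together with the new druid-router dependency in pipeline/master-data-processor/pom.xml, this wires the master-data pipeline's transformed stream into the dynamic router as an additional output path. The essence of the added wiring (the full stream construction is elided in the hunks above):

    // Sketch of the added wiring; transformedStream is the transformer output shown above.
    val routerTask = new DynamicRouterStreamTask(new DruidRouterConfig(config), kafkaConnector)
    routerTask.processStream(transformedStream)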
