Develop to Main #85

Merged: 11 commits from develop into main on Jun 20, 2024
2 changes: 2 additions & 0 deletions .github/workflows/build_and_deploy.yaml
@@ -31,6 +31,8 @@ jobs:
target: "merged-image"
- image: "master-data-processor"
target: "master-data-processor-image"
- image: "lakehouse-connector"
target: "lakehouse-connector-image"
steps:
- uses: actions/checkout@v4
with:
5 changes: 1 addition & 4 deletions Dockerfile
@@ -36,10 +36,7 @@ FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as mast
USER flink
COPY --from=build-pipeline /app/pipeline/master-data-processor/target/master-data-processor-1.0.0.jar $FLINK_HOME/lib

FROM --platform=linux/x86_64 flink:1.15.0-scala_2.12-java11 as hudi-connector-image
FROM --platform=linux/x86_64 sanketikahub/flink:1.15.0-scala_2.12-lakehouse as lakehouse-connector-image
USER flink
COPY ./pipeline/hudi-connector/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
COPY ./pipeline/hudi-connector/flink-s3-fs-hadoop-1.15.2.jar $FLINK_HOME/lib
COPY ./pipeline/hudi-connector/hbase-server-2.4.13.jar $FLINK_HOME/lib
RUN mkdir $FLINK_HOME/custom-lib
COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custom-lib
@@ -71,7 +71,7 @@ object DatasetModels {
@JsonProperty("status") status: String, @JsonProperty("connector_stats") connectorStats: Option[ConnectorStats] = None)

case class DataSource(@JsonProperty("id") id: String, @JsonProperty("datasource") datasource: String, @JsonProperty("dataset_id") datasetId: String,
@JsonProperty("type") `type`: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)
@JsonProperty("type") `type`: String, @JsonProperty("status") status: String, @JsonProperty("ingestion_spec") ingestionSpec: String, @JsonProperty("datasource_ref") datasourceRef: String)

}
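A minimal sketch (simplified model, illustrative values only — not the registry's full DataSource definition) showing where the new `status` field sits in the constructor order: between `type` and `ingestion_spec`.

object DataSourceStatusSketch {
  final case class DataSource(id: String, datasource: String, datasetId: String,
                              `type`: String, status: String, ingestionSpec: String, datasourceRef: String)

  def main(args: Array[String]): Unit = {
    // Illustrative values; real instances are built from the dataset registry tables.
    val ds = DataSource("ds-1", "sample-datasource", "sample-dataset", "datalake", "Live", "{}", "sample-datasource-ref")
    println(ds.status) // Live
  }
}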

@@ -202,10 +202,11 @@ object DatasetRegistryService {
val datasource = rs.getString("datasource")
val datasetId = rs.getString("dataset_id")
val datasourceType = rs.getString("type")
val datasourceStatus = rs.getString("status")
val ingestionSpec = rs.getString("ingestion_spec")
val datasourceRef = rs.getString("datasource_ref")

DataSource(id, datasource, datasetId, datasourceType, ingestionSpec, datasourceRef)
DataSource(id, datasource, datasetId, datasourceType, datasourceStatus, ingestionSpec, datasourceRef)
}

private def parseDatasetTransformation(rs: ResultSet): DatasetTransformation = {
@@ -5,6 +5,7 @@ object Constants {
val EVENT = "event"
val INVALID_JSON = "invalid_json"
val OBSRV_META = "obsrv_meta"
val PROCESSING_START_TIME = "processingStartTime"
val SRC = "src"
val ERROR_CODE = "error_code"
val ERROR_MSG = "error_msg"
@@ -15,5 +16,5 @@
val TOPIC = "topic"
val MESSAGE = "message"
val DATALAKE_TYPE = "datalake"

val MASTER_DATASET_TYPE = "master-dataset"
}
@@ -44,7 +44,7 @@ class DynamicRouterFunction(config: DruidRouterConfig) extends BaseDatasetProces
event.put(config.CONST_OBSRV_META, msg(config.CONST_OBSRV_META).asInstanceOf[Map[String, AnyRef]])
val tsKeyData = TimestampKeyParser.parseTimestampKey(dataset.datasetConfig, event)
event.put("indexTS", tsKeyData.value)
if (tsKeyData.isValid) {
if (tsKeyData.isValid || dataset.datasetType.equalsIgnoreCase(Constants.MASTER_DATASET_TYPE)) {
val routerConfig = dataset.routerConfig
val topicEventMap = mutable.Map(Constants.TOPIC -> routerConfig.topic, Constants.MESSAGE -> event)
ctx.output(config.routerOutputTag, topicEventMap)
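A minimal sketch of the routing rule this hunk changes, with stand-in values instead of the pipeline's Dataset and TimestampKeyParser types: events from master datasets are now routed even when no valid timestamp key is present, while regular datasets still require one.

object RoutingPredicateSketch {
  val MASTER_DATASET_TYPE = "master-dataset" // mirrors Constants.MASTER_DATASET_TYPE added in this PR

  def shouldRoute(tsKeyIsValid: Boolean, datasetType: String): Boolean =
    tsKeyIsValid || datasetType.equalsIgnoreCase(MASTER_DATASET_TYPE)

  def main(args: Array[String]): Unit = {
    println(shouldRoute(tsKeyIsValid = false, datasetType = "master-dataset")) // true
    println(shouldRoute(tsKeyIsValid = false, datasetType = "dataset"))        // false
  }
}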
@@ -46,6 +46,7 @@ class ExtractionFunction(config: ExtractorConfig)
context.output(config.systemEventsOutputTag, failedSystemEvent(Some(config.defaultDatasetID), ErrorConstants.ERR_INVALID_EVENT, FunctionalError.InvalidJsonData))
return
}
addStartProcessingTimeIfMissing(batchEvent)
val eventAsText = JSONUtil.serialize(batchEvent)
val datasetIdOpt = batchEvent.get(config.CONST_DATASET)
if (datasetIdOpt.isEmpty) {
@@ -79,6 +80,13 @@
}
}

private def addStartProcessingTimeIfMissing(batchEvent: mutable.Map[String, AnyRef]): Unit = {
val obsrvMeta = batchEvent(Constants.OBSRV_META).asInstanceOf[Map[String, AnyRef]]
if(!obsrvMeta.contains(Constants.PROCESSING_START_TIME)) {
batchEvent.put(Constants.OBSRV_META, obsrvMeta ++ Map("processingStartTime" -> System.currentTimeMillis()))
}
}

private def isDuplicate(dataset: Dataset, dedupKey: Option[String], event: String,
context: ProcessFunction[mutable.Map[String, AnyRef], mutable.Map[String, AnyRef]]#Context): Boolean = {
try {
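A self-contained sketch of the behaviour the new helper adds (simplified event shape, same field names as the diff): processingStartTime is stamped into obsrv_meta only when it is absent, so events that already carry a start time keep it.

import scala.collection.mutable

object ProcessingStartTimeSketch {
  val OBSRV_META = "obsrv_meta"
  val PROCESSING_START_TIME = "processingStartTime"

  def addStartProcessingTimeIfMissing(batchEvent: mutable.Map[String, AnyRef]): Unit = {
    val obsrvMeta = batchEvent(OBSRV_META).asInstanceOf[Map[String, AnyRef]]
    if (!obsrvMeta.contains(PROCESSING_START_TIME)) {
      batchEvent.put(OBSRV_META, obsrvMeta ++ Map(PROCESSING_START_TIME -> Long.box(System.currentTimeMillis())))
    }
  }

  def main(args: Array[String]): Unit = {
    val event = mutable.Map[String, AnyRef](OBSRV_META -> Map.empty[String, AnyRef])
    addStartProcessingTimeIfMissing(event)
    val firstStamp = event(OBSRV_META).asInstanceOf[Map[String, AnyRef]](PROCESSING_START_TIME)
    addStartProcessingTimeIfMissing(event) // second call leaves the existing stamp untouched
    assert(event(OBSRV_META).asInstanceOf[Map[String, AnyRef]](PROCESSING_START_TIME) == firstStamp)
  }
}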
@@ -44,7 +44,7 @@ class HudiConnectorStreamTask(config: HudiConnectorConfig, kafkaConnector: Flink

def process(env: StreamExecutionEnvironment): Unit = {
val schemaParser = new HudiSchemaParser()
val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE))
val dataSourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
dataSourceConfig.map{ dataSource =>
val datasetId = dataSource.datasetId
val dataStream = getMapDataStream(env, config, List(datasetId), config.kafkaConsumerProperties(), consumerSourceName = s"kafka-${datasetId}", kafkaConnector)
@@ -43,7 +43,7 @@ class HudiSchemaParser {
readSchema()

def readSchema(): Unit = {
val datasourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE))
val datasourceConfig = DatasetRegistry.getAllDatasources().filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(Constants.DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))
datasourceConfig.map{f =>
val hudiSchemaSpec = JSONUtil.deserialize[HudiSchemaSpec](f.ingestionSpec)
val dataset = hudiSchemaSpec.dataset
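A minimal sketch of the selection rule now used in both HudiConnectorStreamTask and HudiSchemaParser, with a stand-in row type instead of the registry's DataSource model: only datalake-type datasources whose status is "Live" are picked up.

object LiveDatasourceFilterSketch {
  final case class DatasourceRow(datasetId: String, `type`: String, status: String) // simplified stand-in

  val DATALAKE_TYPE = "datalake" // mirrors Constants.DATALAKE_TYPE

  def selectLiveLakehouseSources(all: List[DatasourceRow]): List[DatasourceRow] =
    all.filter(f => f.`type`.nonEmpty && f.`type`.equalsIgnoreCase(DATALAKE_TYPE) && f.status.equalsIgnoreCase("Live"))

  def main(args: Array[String]): Unit = {
    val rows = List(
      DatasourceRow("d1", "datalake", "Live"),
      DatasourceRow("d2", "datalake", "Retired"),
      DatasourceRow("d3", "druid", "Live"))
    println(selectLiveLakehouseSources(rows).map(_.datasetId)) // List(d1)
  }
}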
5 changes: 5 additions & 0 deletions pipeline/master-data-processor/pom.xml
@@ -62,6 +62,11 @@
<artifactId>transformer</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.sunbird.obsrv.pipeline</groupId>
<artifactId>druid-router</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.github.java-json-tools</groupId>
<artifactId>json-schema-validator</artifactId>
@@ -12,6 +12,7 @@ import org.sunbird.obsrv.extractor.task.{ExtractorConfig, ExtractorStreamTask}
import org.sunbird.obsrv.pipeline.function.MasterDataProcessorFunction
import org.sunbird.obsrv.preprocessor.task.{PipelinePreprocessorConfig, PipelinePreprocessorStreamTask}
import org.sunbird.obsrv.transformer.task.{TransformerConfig, TransformerStreamTask}
import org.sunbird.obsrv.router.task.{DruidRouterConfig, DynamicRouterStreamTask}

import java.io.File
import scala.collection.mutable
@@ -50,6 +51,7 @@ class MasterDataProcessorStreamTask(config: Config, masterDataConfig: MasterData
val preprocessorTask = new PipelinePreprocessorStreamTask(new PipelinePreprocessorConfig(config), kafkaConnector)
val denormalizerTask = new DenormalizerStreamTask(new DenormalizerConfig(config), kafkaConnector)
val transformerTask = new TransformerStreamTask(new TransformerConfig(config), kafkaConnector)
val routerTask = new DynamicRouterStreamTask(new DruidRouterConfig(config), kafkaConnector)

val transformedStream = transformerTask.processStream(
denormalizerTask.processStream(
@@ -67,6 +69,7 @@

addDefaultSinks(processedStream, masterDataConfig, kafkaConnector)
processedStream.getSideOutput(masterDataConfig.successTag())
routerTask.processStream(transformedStream)
}
}
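A conceptual sketch (plain functions standing in for the Flink stream tasks, not the actual wiring) of the data flow after this change: the transformed stream still feeds the master-data processing path, and is now additionally handed to the dynamic router so master-dataset events reach their router topics.

object MasterPipelineWiringSketch {
  // Each "stage" just tags the event so the two branches are visible in the output.
  type Stage = List[String] => List[String]
  def stage(name: String): Stage = events => events.map(e => s"$e->$name")

  val preprocess: Stage = stage("preprocess")
  val denorm: Stage     = stage("denorm")
  val transform: Stage  = stage("transform")
  val masterData: Stage = stage("master-data")
  val route: Stage      = stage("route")

  def main(args: Array[String]): Unit = {
    val transformed = transform(denorm(preprocess(List("event1"))))
    println(masterData(transformed)) // existing path: List(event1->preprocess->denorm->transform->master-data)
    println(route(transformed))      // new branch:    List(event1->preprocess->denorm->transform->route)
  }
}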
