From fae940ed41fb0094b5d5307606d7bb5d0e38e342 Mon Sep 17 00:00:00 2001
From: Sowmya N Dixit
Date: Tue, 19 Mar 2024 12:27:32 +0530
Subject: [PATCH] Sunbird opensource release 2.0.1 GA (#15)

---
 .github/workflows/build_and_deploy.yaml       | 113 +++----
 .github/workflows/pull_request.yaml           |  25 ++
 .github/workflows/upload_artifact.yaml        |  85 +++++
 data-products/pom.xml                         | 320 +++++++++++++++---
 .../src/main/resources/application.conf       |   7 -
 data-products/src/main/resources/log4j2.xml   |  13 +
 .../main/resources/masterdata-indexer.conf    |  26 ++
 .../MasterDataProcessorIndexer.scala          | 196 ++++++-----
 .../helper/BaseMetricHelper.scala             |  41 +++
 .../helper/KafkaMessageProducer.scala         |  34 ++
 .../obsrv/dataproducts/model/JobMetric.scala  |  28 ++
 .../obsrv/dataproducts/util/CommonUtil.scala  |  45 +++
 .../obsrv/dataproducts/util/HttpUtil.scala    |  17 +
 .../obsrv/dataproducts/util/StorageUtil.scala |  51 +++
 .../src/test/resources/application.conf       |   7 -
 .../resources/masterdata-indexer-test.conf    |  35 ++
 .../org/sunbird/fixture/EventFixture.scala    |  11 +
 .../sunbird/spec/MasterDataIndexerSpec.scala  | 242 +++++++++++++
 .../service/DatasetRegistryService.scala      |  18 +-
 .../obsrv/core/model/ErrorConstants.scala     |  12 +-
 .../obsrv/core/model/SystemConfig.scala       |  27 +-
 .../obsrv/core/streaming/BaseJobConfig.scala  |   5 +-
 pipeline/kafka-connector/pom.xml              | 263 --------------
 .../src/main/resources/kafka-connector.conf   |  16 -
 .../connector/task/KafkaConnectorConfig.scala |  25 --
 .../task/KafkaConnectorStreamTask.scala       |  71 ----
 .../src/test/resources/test.conf              |  14 -
 .../KafkaConnectorStreamTestSpec.scala        | 126 -------
 pipeline/pom.xml                              |   1 -
 29 files changed, 1093 insertions(+), 781 deletions(-)
 create mode 100644 .github/workflows/pull_request.yaml
 create mode 100644 .github/workflows/upload_artifact.yaml
 delete mode 100644 data-products/src/main/resources/application.conf
 create mode 100644 data-products/src/main/resources/log4j2.xml
 create mode 100644 data-products/src/main/resources/masterdata-indexer.conf
 create mode 100644 data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/BaseMetricHelper.scala
 create mode 100644 data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/KafkaMessageProducer.scala
 create mode 100644 data-products/src/main/scala/org/sunbird/obsrv/dataproducts/model/JobMetric.scala
 create mode 100644 data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/CommonUtil.scala
 create mode 100644 data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/HttpUtil.scala
 create mode 100644 data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/StorageUtil.scala
 delete mode 100644 data-products/src/test/resources/application.conf
 create mode 100644 data-products/src/test/resources/masterdata-indexer-test.conf
 create mode 100644 data-products/src/test/scala/org/sunbird/fixture/EventFixture.scala
 create mode 100644 data-products/src/test/scala/org/sunbird/spec/MasterDataIndexerSpec.scala
 delete mode 100644 pipeline/kafka-connector/pom.xml
 delete mode 100644 pipeline/kafka-connector/src/main/resources/kafka-connector.conf
 delete mode 100644 pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorConfig.scala
 delete mode 100644 pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorStreamTask.scala
 delete mode 100644 pipeline/kafka-connector/src/test/resources/test.conf
 delete mode 100644 pipeline/kafka-connector/src/test/scala/org/sunbird/obsrv/connector/KafkaConnectorStreamTestSpec.scala

diff --git
a/.github/workflows/build_and_deploy.yaml b/.github/workflows/build_and_deploy.yaml index 48c610c9..35ba8cf8 100644 --- a/.github/workflows/build_and_deploy.yaml +++ b/.github/workflows/build_and_deploy.yaml @@ -1,104 +1,67 @@ -name: Obsrv Core service build and deploy workflow +name: Build and Deploy run-name: Workflow run for ${{ github.ref }} on: push: tags: - - '*' + - '*' + workflow_dispatch: + inputs: + aws-deploy: + type: boolean + required: true + default: false jobs: - check-tag: - runs-on: ubuntu-latest - outputs: - ALLOWED_TAG: ${{ steps.tag-checker.outputs.TRIGGER_ALLOWED }} - steps: - - name: Check if tag is one in list of current releases - id: tag-checker - run: | - (echo -n TRIGGER_ALLOWED= && echo 'print("${{ github.ref_name }}".split("_")[0] - not in ${{ vars.CURRENT_RELEASE }})' | python3) >> "$GITHUB_OUTPUT" - - docker-build: - needs: check-tag - if: needs.check-tag.outputs.ALLOWED_TAG == 'True' + build-image: runs-on: ubuntu-latest strategy: matrix: - include: - - image: "extractor" - target: "extractor-image" - - image: "preprocessor" - target: "preprocessor-image" - - image: "denormalizer" - target: "denormalizer-image" - - image: "transformer" - target: "transformer-image" - - image: "druid-router" - target: "router-image" - - image: "merged-pipeline" - target: "merged-image" - - image: "master-data-processor" - target: "master-data-processor-image" - - image: "kafka-connector" - target: "kafka-connector-image" - - + include: + - image: "extractor" + target: "extractor-image" + - image: "preprocessor" + target: "preprocessor-image" + - image: "denormalizer" + target: "denormalizer-image" + - image: "transformer" + target: "transformer-image" + - image: "druid-router" + target: "router-image" + - image: "merged-pipeline" + target: "merged-image" + - image: "master-data-processor" + target: "master-data-processor-image" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: fetch-depth: 0 - - name: Maven Build - run: | - mvn clean install -DskipTests - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: Login to docker hub - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: Build merged-pipeline image and push - uses: docker/build-push-action@v4 - with: - platforms: linux/amd64 - target: merged-image - push: true - tags: ${{ secrets.DOCKERHUB_USERNAME }}/merged-pipeline:${{ github.ref_name }} - - - name: Build merged-pipeline image and push - uses: docker/build-push-action@v4 - with: - platforms: linux/amd64 - target: master-data-processor-image - push: true - tags: ${{ secrets.DOCKERHUB_USERNAME }}/master-data-processor:${{ github.ref_name }} - - - name: Build merged-pipeline image and push - uses: docker/build-push-action@v4 - with: - platforms: linux/amd64 - target: kafka-connector-image - push: true - tags: ${{ secrets.DOCKERHUB_USERNAME }}/kafka-connector:${{ github.ref_name }} - - name: Build ${{matrix.image}} image and push - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: platforms: linux/amd64 target: ${{matrix.target}} push: true tags: ${{ secrets.DOCKERHUB_USERNAME }}/${{matrix.image}}:${{ github.ref_name }} + aws-deploy: - needs: [check-tag, docker-build] - if: needs.check-tag.outputs.ALLOWED_TAG == 'True' + needs: build-image + if: github.event.inputs.aws-deploy == 'True' runs-on: ubuntu-latest environment: aws-dev steps: - name: Checkout - 
uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Setup Terragrunt uses: autero1/action-terragrunt@v1.1.0 with: @@ -107,12 +70,12 @@ jobs: run: terragrunt --version - name: Clone the terraform deployment repo - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: ${{ vars.DEPLOY_REPO }} path: deploy ref: ${{ vars.DEPLOY_REPO_REF }} - + - name: Fetch and update kubeconfig file env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} @@ -136,12 +99,12 @@ jobs: -var flink_image_tag=${{ github.ref_name }} azure-deploy: - needs: [check-tag, docker-build] - if: needs.check-tag.outputs.ALLOWED_TAG == 'True' && vars.CLOUD_PROVIDER == 'azure' + needs: build-image + if: vars.CLOUD_PROVIDER == 'azure' runs-on: ubuntu-latest steps: - name: Clone the terraform deployment repo - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: ${{ vars.DEPLOY_REPO }} path: deploy @@ -158,4 +121,4 @@ jobs: terragrunt init terragrunt apply -auto-approve -replace=module.flink.helm_release.flink \ -var flink_container_registry=${{ secrets.DOCKERHUB_USERNAME }} \ - -var flink_image_tag=${{ github.ref_name }} + -var flink_image_tag=${{ github.ref_name }} \ No newline at end of file diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml new file mode 100644 index 00000000..44c79317 --- /dev/null +++ b/.github/workflows/pull_request.yaml @@ -0,0 +1,25 @@ +name: Pull Request +run-name: Workflow run for pull request - ${{ github.event.pull_request.title }} +on: + pull_request: + types: + - opened + - synchronize + +jobs: + test-cases: + if: github.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '11' + + - name: Checkout code + uses: actions/checkout@v4 + + - name: Run test cases + run: | + mvn clean install \ No newline at end of file diff --git a/.github/workflows/upload_artifact.yaml b/.github/workflows/upload_artifact.yaml new file mode 100644 index 00000000..38cb7ec8 --- /dev/null +++ b/.github/workflows/upload_artifact.yaml @@ -0,0 +1,85 @@ +name: Upload Artifacts +run-name: Workflow run for ${{ github.ref }} +on: + push: + tags: + - '*' + +jobs: + artifacts-upload-core: + runs-on: ubuntu-latest + steps: + - name: Get Tag Name + id: get-tag + run: echo "tag_name=${GITHUB_REF#refs/tags/}" >>$GITHUB_OUTPUT + + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup JAVA + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'temurin' + cache: 'maven' + + - name: Build Framework Artifacts + run: | + cd ./framework + mvn clean install -DskipTests + - name: Upload Framework Artifacts + uses: actions/upload-artifact@v4 + with: + name: framework-${{ steps.get-tag.outputs.tag_name }}.jar + path: ./framework/target/framework-1.0.0.jar + if-no-files-found: error + + - name: Build Dataset Registry Artifacts + run: | + cd ./dataset-registry + mvn clean install -DskipTests + - name: Upload Dataset Registry Artifacts + uses: actions/upload-artifact@v4 + with: + name: dataset-registry-${{ steps.get-tag.outputs.tag_name }}.jar + path: ./dataset-registry/target/dataset-registry-1.0.0.jar + if-no-files-found: error + + artifacts-upload-pipeline: + needs: artifacts-upload-core + runs-on: ubuntu-latest + strategy: + matrix: + include: + - image: "extractor" + - image: "preprocessor" + - image: "denormalizer" + - image: "transformer" + - image: "druid-router" + - image: "pipeline-merged" + - image: 
"master-data-processor" + steps: + - name: Get Tag Name + id: get-tag + run: echo "tag_name=${GITHUB_REF#refs/tags/}" >>$GITHUB_OUTPUT + + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup JAVA + uses: actions/setup-java@v4 + with: + java-version: '11' + distribution: 'temurin' + cache: 'maven' + + - name: Build Data Pipeline Artifacts + run: | + cd ./pipeline + mvn clean install -DskipTests + - name: Upload Data Pipeline Artifacts + uses: actions/upload-artifact@v4 + with: + name: ${{matrix.image}}-${{ steps.get-tag.outputs.tag_name }}.jar + path: ./pipeline/${{matrix.image}}/target/${{matrix.image}}-1.0.0.jar + if-no-files-found: error \ No newline at end of file diff --git a/data-products/pom.xml b/data-products/pom.xml index 51090a71..c3b0045d 100644 --- a/data-products/pom.xml +++ b/data-products/pom.xml @@ -12,7 +12,9 @@ 3.1.0 2.12.11 2.12 - 1.1.1 + 1.4.0 + 11 + 2.14.1 @@ -30,6 +32,10 @@ org.apache.xbean xbean-asm6-shaded + + org.apache.zookeeper + zookeeper + @@ -58,16 +64,31 @@ commons-text 1.6 - com.fasterxml.jackson.core jackson-annotations - 2.10.0 + 2.15.2 com.fasterxml.jackson.core jackson-core - 2.10.0 + 2.15.2 + + + com.fasterxml.jackson.module + jackson-module-scala_${scala.maj.version} + 2.15.2 + + + com.fasterxml.jackson.core + jackson-databind + + + + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 org.sunbird.obsrv @@ -83,6 +104,32 @@ org.sunbird.obsrv framework 1.0.0 + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.module + jackson-module-scala_${scala.maj.version} + + + com.fasterxml.jackson.core + jackson-databind + + + io.github.embeddedkafka + embedded-kafka_${scala.maj.version} + + + org.apache.flink + flink-runtime + + org.scalatest @@ -111,29 +158,6 @@ - - org.sunbird - cloud-store-sdk_${scala.maj.version} - 1.4.6 - - - com.microsoft.azure - azure-storage - - - com.fasterxml.jackson.core - jackson-core - - - org.apache.httpcomponents - httpclient - - - com.google.guava - guava - - - com.microsoft.azure azure-storage @@ -155,6 +179,10 @@ org.apache.avro avro + + org.apache.zookeeper + zookeeper + @@ -163,43 +191,198 @@ 2.7.3 provided + + software.amazon.awssdk + s3 + 2.17.0 + + + software.amazon.awssdk + auth + 2.17.0 + + + software.amazon.awssdk + sts + 2.17.0 + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + org.mockito + mockito-scala-scalatest_${scala.maj.version} + 1.17.29 + test + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + it.ozimov + embedded-redis + 0.7.1 + test + + + org.sunbird.obsrv + framework + 1.0.0 + test-jar + test + + + org.sunbird.obsrv + dataset-registry + 1.0.0 + test-jar + test + + + io.zonky.test + embedded-postgres + 2.0.3 + test + + + junit + junit + 4.12 + test + + + com.konghq + unirest-mocks + 3.14.1 + test + + + io.github.embeddedkafka + embedded-kafka_2.12 + 2.8.1 + test + + + org.apache.zookeeper + zookeeper + + + + + org.apache.zookeeper + zookeeper + 3.5.9 + + + com.squareup.okhttp3 + mockwebserver + 4.4.0 + test + + + com.google.code.gson + gson + 2.8.9 + - - - etl-jobs-1.0 src/main/scala src/test/scala - - net.alchim31.maven - scala-maven-plugin - 3.2.2 + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + + 
+ + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + package - compile - testCompile + shade - - -dependencyfile - ${project.build.directory}/.scala_dependencies - -nobootcp - + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.sunbird.obsrv.dataproducts.job.MasterDataProcessorIndexer + + + META-INF/services/org.jclouds.apis.ApiMetadata + + + META-INF/services/org.jclouds.providers.ProviderMetadata + + + + + + maven-surefire-plugin + 2.20 + + true + + + org.scalatest scalatest-maven-plugin - 2.0.0 + 1.0 + + ${project.build.directory}/surefire-reports + . + data-products-testsuite.txt + test - test test @@ -208,19 +391,53 @@ - maven-assembly-plugin - 2.3 + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + test-jar + + + + + + + org.scoverage + scoverage-maven-plugin + ${scoverage.plugin.version} - - src/main/assembly/src.xml - + ${scala.version} + true + true + + + + + net.alchim31.maven + scala-maven-plugin + 4.4.0 + + ${java.target.runtime} + ${java.target.runtime} + ${scala.version} + false - make-assembly - package + scala-compile-first + process-resources - single + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile @@ -238,5 +455,4 @@ - diff --git a/data-products/src/main/resources/application.conf b/data-products/src/main/resources/application.conf deleted file mode 100644 index 6b0a1c94..00000000 --- a/data-products/src/main/resources/application.conf +++ /dev/null @@ -1,7 +0,0 @@ -# do not delete this file -redis.host="localhost" -redis.port="6379" -cloudStorage.container="obsrv-data" -cloudStorage.provider="aws" -druid.indexer.url=http://localhost:8888/druid/indexer/v1/task -druid.datasource.delete.url=http://localhost:8888/druid/coordinator/v1/datasources/ \ No newline at end of file diff --git a/data-products/src/main/resources/log4j2.xml b/data-products/src/main/resources/log4j2.xml new file mode 100644 index 00000000..a395a5a9 --- /dev/null +++ b/data-products/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/data-products/src/main/resources/masterdata-indexer.conf b/data-products/src/main/resources/masterdata-indexer.conf new file mode 100644 index 00000000..5e2ee921 --- /dev/null +++ b/data-products/src/main/resources/masterdata-indexer.conf @@ -0,0 +1,26 @@ +# do not delete this file +env=local + +redis.host="localhost" +redis.port="6379" +redis.scan.count=1000 +redis.max.pipeline.size=1000 +cloud.storage.container="://"container_name"/" +cloud.storage.provider="" +cloud.storage.accountName="obsrv" # Is required when azure is provider. 
Will only be used when azure is the provider +druid.indexer.url="http://localhost:8888/druid/indexer/v1/task" +druid.datasource.delete.url="http://localhost:8888/druid/coordinator/v1/datasources/" + +metrics { + topicName = ""${env}".spark.stats" +} + +kafka { + bootstrap.servers = "localhost:9092" +} + +#inputSourceSpec +source.spec="{\"spec\":{\"ioConfig\":{\"type\":\"index_parallel\",\"inputSource\":{\"type\":\"local\",\"baseDir\":\"FILE_PATH\",\"filter\":\"**json.gz\"}}}}" + +#deltaIngestionSpec +delta.ingestion.spec= "{\"type\":\"index_parallel\",\"spec\":{\"dataSchema\":{\"dataSource\":\"DATASOURCE_REF\"},\"ioConfig\":{\"type\":\"index_parallel\"},\"tuningConfig\":{\"type\":\"index_parallel\",\"maxRowsInMemory\":500000,\"forceExtendableShardSpecs\":false,\"logParseExceptions\":true}}}" \ No newline at end of file diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala index e1ecfdec..2c41181c 100644 --- a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/MasterDataProcessorIndexer.scala @@ -2,129 +2,127 @@ package org.sunbird.obsrv.dataproducts import com.redislabs.provider.redis._ import com.typesafe.config.{Config, ConfigFactory} -import kong.unirest.Unirest -import org.apache.spark.{SparkConf, SparkContext} -import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} +import org.apache.logging.log4j.{LogManager, Logger} +import org.apache.spark.sql.SparkSession import org.joda.time.{DateTime, DateTimeZone} import org.json4s.native.JsonMethods._ -import org.sunbird.cloud.storage.factory.{StorageConfig, StorageServiceFactory} -import org.sunbird.obsrv.core.util.JSONUtil +import org.sunbird.obsrv.core.exception.ObsrvException +import org.sunbird.obsrv.core.model.ErrorConstants +import org.sunbird.obsrv.dataproducts.helper.BaseMetricHelper +import org.sunbird.obsrv.dataproducts.model.{Edata, MetricLabel} +import org.sunbird.obsrv.dataproducts.util.{CommonUtil, HttpUtil, StorageUtil} import org.sunbird.obsrv.model.DatasetModels.{DataSource, Dataset} +import org.sunbird.obsrv.model.DatasetStatus import org.sunbird.obsrv.registry.DatasetRegistry -import scala.collection.mutable - object MasterDataProcessorIndexer { - - private val config: Config = ConfigFactory.load("application.conf").withFallback(ConfigFactory.systemEnvironment()) - private val dayPeriodFormat: DateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC() - - private case class Paths(datasourceRef: String, objectKey: String, outputFilePath: String, timestamp: Long) - - def main(args: Array[String]): Unit = { - - val datasets = DatasetRegistry.getAllDatasets("master-dataset") - val indexedDatasets = datasets.filter(dataset => dataset.datasetConfig.indexData.nonEmpty && dataset.datasetConfig.indexData.get) - indexedDatasets.foreach(dataset => { - indexDataset(dataset) - }) - } - - private def indexDataset(dataset: Dataset): Unit = { - val datasources = DatasetRegistry.getDatasources(dataset.id) - if(datasources.isEmpty || datasources.get.size > 1) { - return - } - val datasource = datasources.get.head - val paths = getPaths(datasource) - createDataFile(dataset, paths.timestamp, paths.outputFilePath, paths.objectKey) - val ingestionSpec = updateIngestionSpec(datasource, paths.datasourceRef, paths.objectKey) - submitIngestionTask(ingestionSpec) - 
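// Illustrative sketch (added for clarity; not part of this patch). updateIngestionSpec() composes the
// datasource ingestion spec with the delta and input-source specs using the json4s merge operation,
// which merges JSON objects recursively and lets right-hand fields win on conflicting keys. The object
// name and the spec values below are made up; only the merge behaviour is the point.
object IngestionSpecMergeExample {
  import org.json4s.native.JsonMethods._

  def main(args: Array[String]): Unit = {
    val ingestionSpec = parse("""{"spec":{"dataSchema":{"dataSource":"md1.1_DAY"},"ioConfig":{"type":"index_parallel"}}}""")
    val deltaSpec     = parse("""{"spec":{"dataSchema":{"dataSource":"md1.1_DAY-20240319"}}}""")
    // dataSource is overridden by the delta spec while ioConfig is carried over unchanged
    println(compact(render(ingestionSpec merge deltaSpec)))
    // => roughly: {"spec":{"dataSchema":{"dataSource":"md1.1_DAY-20240319"},"ioConfig":{"type":"index_parallel"}}}
  }
}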
updateDataSourceRef(datasource, paths.datasourceRef) - if(!datasource.datasource.equals(datasource.datasourceRef)) { - deleteDataSource(datasource.datasourceRef) + private final val logger: Logger = LogManager.getLogger(MasterDataProcessorIndexer.getClass) + + @throws[ObsrvException] + def processDataset(config: Config, dataset: Dataset, spark: SparkSession): Map[String, Long] = { + val result = CommonUtil.time { + val datasource = fetchDatasource(dataset) + val paths = StorageUtil.getPaths(datasource, config) + val eventsCount: Long = createDataFile(dataset, paths.outputFilePath, spark, config) + val ingestionSpec: String = updateIngestionSpec(datasource, paths.datasourceRef, paths.ingestionPath, config) + if (eventsCount > 0L) { + submitIngestionTask(dataset.id, ingestionSpec, config) + } + DatasetRegistry.updateDatasourceRef(datasource, paths.datasourceRef) + if (!datasource.datasourceRef.equals(paths.datasourceRef)) { + deleteDataSource(dataset.id, datasource.datasourceRef, config) + } + Map("success_dataset_count" -> 1, "total_dataset_count" -> 1, "total_events_processed" -> eventsCount) } + val metricMap = result._2 ++ Map("total_time_taken" -> result._1) + metricMap.asInstanceOf[Map[String, Long]] } - private def getPaths(datasource: DataSource): Paths = { - - val dt = new DateTime(DateTimeZone.UTC).withTimeAtStartOfDay() - val timestamp = dt.getMillis - val date = dayPeriodFormat.print(dt) - val objectKey = "masterdata-indexer/" + datasource.datasetId + "/" + date + ".json" - val datasourceRef = datasource.datasource + '-' + date - val outputFilePath = "masterdata-indexer/" + datasource.datasetId + "/" + date - Paths(datasourceRef, objectKey, outputFilePath, timestamp) - } - private def updateIngestionSpec(datasource: DataSource, datasourceRef: String, objectKey: String): String = { - - val deltaIngestionSpec = s"""{"type":"index_parallel","spec":{"dataSchema":{"dataSource":"$datasourceRef"},"ioConfig":{"type":"index_parallel"},"tuningConfig":{"type":"index_parallel","targetPartitionSize":5000000,"maxRowsInMemory":25000,"forceExtendableShardSpecs":false,"logParseExceptions":true}}}""" - val provider = getProvider() - val container = config.getString("cloudStorage.container") - val inputSourceSpec = s"""{"spec":{"ioConfig":{"inputSource":{"type":"$provider","objectGlob":"**.json","objects":[{"bucket":"$container","path":"$objectKey"}]}}}}""" - + // This method is used to update the ingestion spec based on datasource and storage path + private def updateIngestionSpec(datasource: DataSource, datasourceRef: String, filePath: String, config: Config): String = { + val deltaIngestionSpec: String = config.getString("delta.ingestion.spec").replace("DATASOURCE_REF", datasourceRef) + val inputSourceSpec: String = StorageUtil.getInputSourceSpec(filePath, config) val deltaJson = parse(deltaIngestionSpec) val inputSourceJson = parse(inputSourceSpec) val ingestionSpec = parse(datasource.ingestionSpec) - val modIngestionSpec = ingestionSpec merge deltaJson merge inputSourceJson compact(render(modIngestionSpec)) } - @throws[Exception] - private def getProvider(): String = { - config.getString("cloudStorage.provider") match { - case "aws" => "s3" - case "azure" => "azure" - case "gcloud" => "google" - case "cephs3" => "s3" // TODO: Have to check Druid compatibility - case "oci" => "s3" // TODO: Have to check Druid compatibility - case _ => throw new Exception("Unsupported provider") - } + // This method is used to submit the ingestion task to Druid for indexing data + def 
submitIngestionTask(datasetId: String, ingestionSpec: String, config: Config): Unit = { + logger.debug(s"submitIngestionTask() | datasetId=$datasetId") + val response = HttpUtil.post(config.getString("druid.indexer.url"), ingestionSpec) + response.ifFailure(throw new ObsrvException(ErrorConstants.ERR_SUBMIT_INGESTION_FAILED)) } - private def submitIngestionTask(ingestionSpec: String) = { - // TODO: Handle success and failure responses properly - val response = Unirest.post(config.getString("druid.indexer.url")) - .header("Content-Type", "application/json") - .body(ingestionSpec).asJson() - response.ifFailure(_ => throw new Exception("Exception while submitting ingestion task")) + // This method is used for deleting a datasource from druid + private def deleteDataSource(datasetID: String, datasourceRef: String, config: Config): Unit = { + logger.debug(s"deleteDataSource() | datasetId=$datasetID") + val response = HttpUtil.delete(config.getString("druid.datasource.delete.url") + datasourceRef) + response.ifFailure(throw new ObsrvException(ErrorConstants.ERR_DELETE_DATASOURCE_FAILED)) } - private def updateDataSourceRef(datasource: DataSource, datasourceRef: String): Unit = { - DatasetRegistry.updateDatasourceRef(datasource, datasourceRef) + // This method will fetch the data from redis based on dataset config + // then write the data as a compressed JSON to the respective cloud provider + private def createDataFile(dataset: Dataset, outputFilePath: String, spark: SparkSession, config: Config): Long = { + logger.info(s"createDataFile() | START | dataset=${dataset.id} ") + import spark.implicits._ + val readWriteConf = ReadWriteConfig(scanCount = config.getInt("redis.scan.count"), maxPipelineSize = config.getInt("redis.max.pipeline.size")) + val redisConfig = new RedisConfig(initialHost = RedisEndpoint(host = dataset.datasetConfig.redisDBHost.get, port = dataset.datasetConfig.redisDBPort.get, dbNum = dataset.datasetConfig.redisDB.get)) + val ts: Long = new DateTime(DateTimeZone.UTC).withTimeAtStartOfDay().getMillis + val rdd = spark.sparkContext.fromRedisKV("*")(redisConfig = redisConfig, readWriteConfig = readWriteConf).map( + f => CommonUtil.processEvent(f._2, ts) + ) + val noOfRecords: Long = rdd.count() + if (noOfRecords > 0) { + rdd.toDF().write.mode("overwrite").option("compression", "gzip").json(outputFilePath) + } + logger.info(s"createDataFile() | END | dataset=${dataset.id} | noOfRecords=$noOfRecords") + noOfRecords } - private def deleteDataSource(datasourceRef: String): Unit = { - // TODO: Handle success and failure responses properly - val response = Unirest.delete(config.getString("druid.datasource.delete.url") + datasourceRef) - .header("Content-Type", "application/json") - .asJson() - response.ifFailure(_ => throw new Exception("Exception while deleting datasource" + datasourceRef)) + private def getDatasets(): List[Dataset] = { + val datasets: List[Dataset] = DatasetRegistry.getAllDatasets("master-dataset") + datasets.filter(dataset => { + dataset.datasetConfig.indexData.nonEmpty && dataset.datasetConfig.indexData.get && dataset.status == DatasetStatus.Live + }) } - private def createDataFile(dataset: Dataset, timestamp: Long, outputFilePath: String, objectKey: String): String = { - - val conf = new SparkConf() - .setAppName("MasterDataProcessorIndexer") - .setMaster("local[4]") - .set("spark.redis.host", dataset.datasetConfig.redisDBHost.get) - .set("spark.redis.port", String.valueOf(dataset.datasetConfig.redisDBHost.get)) - .set("spark.redis.db", 
String.valueOf(dataset.datasetConfig.redisDB.get)) - - val sc = new SparkContext(conf) - - val readWriteConf = ReadWriteConfig(scanCount = 1000, maxPipelineSize = 1000) - sc.fromRedisKV("*")(readWriteConfig = readWriteConf) - .map(f => JSONUtil.deserialize[mutable.Map[String, AnyRef]](f._2)) - .map(f => f.put("syncts", timestamp.asInstanceOf[AnyRef])) - .map(f => JSONUtil.serialize(f)) - .coalesce(1) - .saveAsTextFile(outputFilePath) - sc.stop() + def fetchDatasource(dataset: Dataset): DataSource = { + val datasources: List[DataSource] = DatasetRegistry.getDatasources(dataset.id).get + if (datasources.isEmpty) { + throw new ObsrvException(ErrorConstants.ERR_DATASOURCE_NOT_FOUND) + } + datasources.head + } - val storageService = StorageServiceFactory.getStorageService(StorageConfig(config.getString("cloudStorage.provider"), config.getString("cloudStorage.accountName"), config.getString("cloudStorage.accountKey"))) - storageService.upload(config.getString("cloudStorage.container"), outputFilePath + "/part-00000", objectKey, isDirectory = Option(false)) + // This method will fetch the dataset from database and processes the dataset + // then generates required metrics + def processDatasets(config: Config, spark: SparkSession): Unit = { + val datasets: List[Dataset] = getDatasets() + val metricHelper = new BaseMetricHelper(config) + datasets.foreach(dataset => { + logger.info(s"processDataset() | START | datasetId=${dataset.id}") + val metricData = try { + val metrics = processDataset(config, dataset, spark) + logger.info(s"processDataset() | SUCCESS | datasetId=${dataset.id} | Metrics=$metrics") + Edata(metric = metrics, labels = List(MetricLabel("job", "MasterDataIndexer"), MetricLabel("datasetId", dataset.id), MetricLabel("cloud", s"${config.getString("cloud.storage.provider")}"))) + } catch { + case ex: ObsrvException => + logger.error(s"processDataset() | FAILED | datasetId=${dataset.id} | Error=${ex.error}", ex) + Edata(metric = Map(metricHelper.getMetricName("failure_dataset_count") -> 1, "total_dataset_count" -> 1), labels = List(MetricLabel("job", "MasterDataIndexer"), MetricLabel("datasetId", dataset.id), MetricLabel("cloud", s"${config.getString("cloud.storage.provider")}")), err = ex.error.errorCode, errMsg = ex.error.errorMsg) + } + metricHelper.generate(datasetId = dataset.id, edata = metricData) + }) } -} + // $COVERAGE-OFF$ + def main(args: Array[String]): Unit = { + val config = ConfigFactory.load("masterdata-indexer.conf").withFallback(ConfigFactory.systemEnvironment()) + val spark = CommonUtil.getSparkSession("MasterDataIndexer", config) + processDatasets(config, spark) + spark.stop() + } + // $COVERAGE-ON$ +} \ No newline at end of file diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/BaseMetricHelper.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/BaseMetricHelper.scala new file mode 100644 index 00000000..5c09f6e1 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/BaseMetricHelper.scala @@ -0,0 +1,41 @@ +package org.sunbird.obsrv.dataproducts.helper + +import com.typesafe.config.Config +import org.sunbird.obsrv.core.util.JSONUtil +import org.sunbird.obsrv.dataproducts.model._ + +class BaseMetricHelper(config: Config) { + + val metrics: Map[String, String] = Map( + "total_dataset_count" -> "total_dataset_count", + "success_dataset_count" -> "success_dataset_count", + "failure_dataset_count" -> "failure_dataset_count", + "total_events_processed" -> "total_events_processed", + 
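// Note (added for clarity; not part of this patch): generate() below wraps these metric names in a
// JobMetric envelope and sync() publishes it through KafkaMessageProducer to the topic configured as
// metrics.topicName (e.g. "local.spark.stats" when env=local). The values here are illustrative; a
// serialized event looks roughly like:
// {"eid":"METRIC","ets":1710835200000,"mid":"<uuid>",
//  "actor":{"id":"MasterDataProcessorIndexerJob","type":"SYSTEM"},
//  "context":{"env":"local","pdata":{"id":"DataProducts","pid":"MasterDataProcessorIndexerJob","ver":"1.0.0"}},
//  "object":{"id":"<datasetId>","type":"Dataset","ver":"1.0.0"},
//  "edata":{"metric":{"success_dataset_count":1,"total_dataset_count":1,"total_events_processed":5,"total_time_taken":1200},
//           "labels":[{"key":"job","value":"MasterDataIndexer"},{"key":"datasetId","value":"<datasetId>"},{"key":"cloud","value":"local"}]}}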
"total_time_taken" -> "total_time_taken" + ) + + private val metricsProducer = new KafkaMessageProducer(config) + + private def sync(metric: IJobMetric): Unit = { + val metricStr = JSONUtil.serialize(metric) + metricsProducer.sendMessage(message = metricStr) + } + + def getMetricName(name: String): String = { + metrics.getOrElse(name, "") + } + + private def getObject(datasetId: String) = { + MetricObject(id = datasetId, `type` = "Dataset", ver = "1.0.0") + } + + def generate(datasetId: String, edata: Edata): Unit = { + val `object` = getObject(datasetId) + val actor = Actor(id = "MasterDataProcessorIndexerJob", `type` = "SYSTEM") + val pdata = Pdata(id = "DataProducts", pid = "MasterDataProcessorIndexerJob", ver = "1.0.0") + val context = Context(env = config.getString("env"), pdata = pdata) + val metric = JobMetric(ets = System.currentTimeMillis(), actor = actor, context = context, `object` = `object`, edata = edata) + this.sync(metric) + } +} + diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/KafkaMessageProducer.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/KafkaMessageProducer.scala new file mode 100644 index 00000000..4fe9ab97 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/helper/KafkaMessageProducer.scala @@ -0,0 +1,34 @@ +package org.sunbird.obsrv.dataproducts.helper + +import com.typesafe.config.Config +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.slf4j.LoggerFactory + +import java.util.Properties + +class KafkaMessageProducer(config: Config) { + + private[this] val logger = LoggerFactory.getLogger(classOf[KafkaMessageProducer]) + private val kafkaProperties = new Properties(); + private val defaultTopicName = config.getString("metrics.topicName") + private val defaultKey = null + + kafkaProperties.put("bootstrap.servers", config.getString("kafka.bootstrap.servers")) + kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") + kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") + val producer = new KafkaProducer[String, String](kafkaProperties) + + def sendMessage(topic: String = defaultTopicName, key: String = defaultKey, message: String): Unit = { + try { + val record = new ProducerRecord[String, String](topic, key, message) + producer.send(record) + } + // $COVERAGE-OFF$ + catch { + case e: Exception => + logger.error("Exception occured while sending message to kafka", e.getMessage) + e.printStackTrace() + } + // $COVERAGE-ON$ + } +} diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/model/JobMetric.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/model/JobMetric.scala new file mode 100644 index 00000000..0e8ce6a1 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/model/JobMetric.scala @@ -0,0 +1,28 @@ +package org.sunbird.obsrv.dataproducts.model + +import java.util.UUID + +case class Actor(id: String, `type`: String) + +case class Context(env: String, pdata: Pdata) + +case class Edata(metric: Map[String, Any], labels: Seq[MetricLabel], err: String = null, errMsg: String = null) + +case class MetricLabel(key: String, value: String) + +case class MetricObject(id: String, `type`: String, ver: String) + +case class Pdata(id: String, pid: String, ver: String) + +trait IJobMetric { + val eid: String + val ets: Long + val mid: String + val actor: Actor + val context: Context + val `object`: 
MetricObject + val edata: Edata +} + +case class JobMetric(eid: String = "METRIC", ets: Long, mid: String = UUID.randomUUID().toString, actor: Actor, context: Context, `object`: MetricObject, edata: Edata) extends IJobMetric + diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/CommonUtil.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/CommonUtil.scala new file mode 100644 index 00000000..4deb017b --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/CommonUtil.scala @@ -0,0 +1,45 @@ +package org.sunbird.obsrv.dataproducts.util + +import com.typesafe.config.Config +import org.apache.logging.log4j.{LogManager, Logger} +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.sunbird.obsrv.core.util.JSONUtil +import org.sunbird.obsrv.model.DatasetModels.{DataSource, Dataset} + +import scala.collection.mutable + +object CommonUtil { + + private final val logger: Logger = LogManager.getLogger(CommonUtil.getClass) + + def time[R](block: => R): (Long, R) = { + val t0 = System.currentTimeMillis() + val result = block // call-by-name + val t1 = System.currentTimeMillis() + ((t1 - t0), result) + } + + def processEvent(value: String, ts: Long) = { + val json = JSONUtil.deserialize[mutable.Map[String, AnyRef]](value) + json("obsrv_meta") = mutable.Map[String, AnyRef]("syncts" -> ts.asInstanceOf[AnyRef]).asInstanceOf[AnyRef] + JSONUtil.serialize(json) + } + + private def getSafeConfigString(config: Config, key: String): String = { + if (config.hasPath(key)) config.getString(key) else "" + } + + def getSparkSession(appName: String, config: Config): SparkSession = { + + val conf = new SparkConf().setAppName(appName) + val master = getSafeConfigString(config, "spark.master") + + if (master.isEmpty) { + logger.info("Master not found. 
Setting it to local[*]") + conf.setMaster("local[*]") + } + SparkSession.builder().appName(appName).config(conf).getOrCreate() + } + +} \ No newline at end of file diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/HttpUtil.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/HttpUtil.scala new file mode 100644 index 00000000..90a33055 --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/HttpUtil.scala @@ -0,0 +1,17 @@ +package org.sunbird.obsrv.dataproducts.util + +import kong.unirest.{HttpResponse, JsonNode, Unirest} + +import scala.collection.JavaConverters._ +import scala.language.postfixOps + +object HttpUtil extends Serializable { + + def post(url: String, requestBody: String, headers: Map[String, String] = Map[String, String]("Content-Type" -> "application/json")): HttpResponse[JsonNode] = { + Unirest.post(url).headers(headers.asJava).body(requestBody).asJson() + } + + def delete(url: String): HttpResponse[JsonNode] = { + Unirest.delete(url).header("Content-Type", "application/json").asJson() + } +} diff --git a/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/StorageUtil.scala b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/StorageUtil.scala new file mode 100644 index 00000000..f98a047c --- /dev/null +++ b/data-products/src/main/scala/org/sunbird/obsrv/dataproducts/util/StorageUtil.scala @@ -0,0 +1,51 @@ +package org.sunbird.obsrv.dataproducts.util + +import com.typesafe.config.Config +import org.apache.logging.log4j.{LogManager, Logger} +import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} +import org.joda.time.{DateTime, DateTimeZone} +import org.sunbird.obsrv.core.exception.ObsrvException +import org.sunbird.obsrv.core.model.ErrorConstants +import org.sunbird.obsrv.dataproducts.MasterDataProcessorIndexer +import org.sunbird.obsrv.model.DatasetModels.DataSource + +object StorageUtil { + val logger: Logger = LogManager.getLogger(MasterDataProcessorIndexer.getClass) + val dayPeriodFormat: DateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC() + + case class Paths(datasourceRef: String, ingestionPath: String, outputFilePath: String, timestamp: Long) + + case class BlobProvider(sparkURIFormat: String, ingestionSourceType: String, druidURIFormat: String) + + // This method returns a BlobProvider object based on cloud storage provider + def providerFormat(cloudProvider: String): BlobProvider = { + cloudProvider match { + case "local" => BlobProvider("file", "local", "file") + case "aws" => BlobProvider("s3a", "s3", "s3") + case "azure" => BlobProvider("wasbs", "azure", "azure") + case "gcloud" => BlobProvider("gs", "google", "gs") + case "cephs3" => BlobProvider("s3a", "s3", "s3") // TODO: Have to check Druid compatibility + case "oci" => BlobProvider("s3a", "s3", "s3") // TODO: Have to check Druid compatibility + case _ => throw new ObsrvException(ErrorConstants.UNSUPPORTED_PROVIDER) + } + } + + def getPaths(datasource: DataSource, config: Config): Paths = { + val dt = new DateTime(DateTimeZone.UTC).withTimeAtStartOfDay() + val timestamp = dt.getMillis + val date = dayPeriodFormat.print(dt) + val provider = providerFormat(config.getString("cloud.storage.provider")) + val cloudPrefix = provider.sparkURIFormat + config.getString("cloud.storage.container") + val pathSuffix = s"""masterdata-indexer/${datasource.datasetId}/$date/""" + val ingestionPath = cloudPrefix.replace(provider.sparkURIFormat, provider.druidURIFormat) + pathSuffix + val 
datasourceRef = datasource.datasource + '-' + date + val outputFilePath = cloudPrefix + pathSuffix + Paths(datasourceRef, ingestionPath, outputFilePath, timestamp) + } + + // This method provides appropriate input source spec depending on the cloud storage provider + def getInputSourceSpec(filePath: String, config: Config): String = { + config.getString("source.spec").replace("FILE_PATH", filePath) + } + +} \ No newline at end of file diff --git a/data-products/src/test/resources/application.conf b/data-products/src/test/resources/application.conf deleted file mode 100644 index 6b0a1c94..00000000 --- a/data-products/src/test/resources/application.conf +++ /dev/null @@ -1,7 +0,0 @@ -# do not delete this file -redis.host="localhost" -redis.port="6379" -cloudStorage.container="obsrv-data" -cloudStorage.provider="aws" -druid.indexer.url=http://localhost:8888/druid/indexer/v1/task -druid.datasource.delete.url=http://localhost:8888/druid/coordinator/v1/datasources/ \ No newline at end of file diff --git a/data-products/src/test/resources/masterdata-indexer-test.conf b/data-products/src/test/resources/masterdata-indexer-test.conf new file mode 100644 index 00000000..ecf5f976 --- /dev/null +++ b/data-products/src/test/resources/masterdata-indexer-test.conf @@ -0,0 +1,35 @@ +# do not delete this file +env=local + +redis.host="localhost" +redis.port="6379" +redis.scan.count=1000 +redis.max.pipeline.size=1000 +cloud.storage.container=":///"containerName"/" +cloud.storage.provider="local" +cloud.storage.accountName="obsrv" # Is required when azure is provider. Will only be used when azure is the provider +druid.indexer.url="http://localhost:8888/druid/indexer/v1/task" +druid.datasource.delete.url="http://localhost:8888/druid/coordinator/v1/datasources/" + +metrics { + topicName = ""${env}".spark.stats" +} + +kafka { + bootstrap.servers = "localhost:9092" +} + +#inputSourceSpec +source.spec="{\"spec\":{\"ioConfig\":{\"type\":\"index_parallel\",\"inputSource\":{\"type\":\"local\",\"baseDir\":\"FILE_PATH\",\"filter\":\"**json.gz\"}}}}" + +#deltaIngestionSpec +delta.ingestion.spec= "{\"type\":\"index_parallel\",\"spec\":{\"dataSchema\":{\"dataSource\":\"DATASOURCE_REF\"},\"ioConfig\":{\"type\":\"index_parallel\"},\"tuningConfig\":{\"type\":\"index_parallel\",\"maxRowsInMemory\":500000,\"forceExtendableShardSpecs\":false,\"logParseExceptions\":true}}}" + +postgres { + host = localhost + port = 5432 + maxConnections = 2 + user = "postgres" + password = "postgres" + database="postgres" +} \ No newline at end of file diff --git a/data-products/src/test/scala/org/sunbird/fixture/EventFixture.scala b/data-products/src/test/scala/org/sunbird/fixture/EventFixture.scala new file mode 100644 index 00000000..e40425a9 --- /dev/null +++ b/data-products/src/test/scala/org/sunbird/fixture/EventFixture.scala @@ -0,0 +1,11 @@ +package org.sunbird.fixture + +object EventFixture { + + val d1 = """{"fcm_token":"","city":"Sohna","device_id":"device-00","device_spec":"\"{'os':'Windows OS','cpu':'Apple M1','make':'Motorola XT1706'}\"","state":"Karnataka","uaspec":{"agent":"Safari","ver":"76.0.3809.132","system":"iOS 10","raw":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 
Safari/537.36"},"country":"India","country_code":"IN","producer_id":"dev.obsrv.portal","state_code_custom":29,"state_code":"KA","state_custom":"Karnataka","district_custom":"Karnataka,s","first_access":1568379184000,"api_last_updated_on":1568377184000,"user_declared_district":"Bedfordshire","user_declared_state":"Karnataka"}""" + val d2 = """{"fcm_token":"","city":"Nawapur","device_id":"device-01","device_spec":"\"{'os':'CentOS','cpu':'Samsung Exynos','make':'Samsung S23'}\"","state":"Sikkim","uaspec":{"agent":"Safari","ver":"76.0.3809.132","system":"Windows OS","raw":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"},"country":"India","country_code":"IN","producer_id":"dev.obsrv.portal","state_code_custom":29,"state_code":"SI","state_custom":"Karnataka","district_custom":"Karnataka,s","first_access":1568379184000,"api_last_updated_on":1568377184000,"user_declared_district":"Berkshire","user_declared_state": "Sikkim"}""" + val d3 = """{"fcm_token":"","city":"Sumerpur","device_id":"device-02","device_spec":"\"{'os':'iOS 15','cpu':'abi: armeabi-v7a ARMv7 Processor rev 4 (v7l)','make':'iPhone'}\"","state":"Chhattisgarh","uaspec":{"agent":"Chrome","ver":"76.0.3809.132","system":"Android 6.0","raw":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"},"country":"India","country_code":"IN","producer_id":"dev.obsrv.portal","state_code_custom":29,"state_code":"CH","state_custom":"Karnataka","district_custom":"Karnataka,s","first_access":1568379184000,"api_last_updated_on":1568377184000,"user_declared_district":"Buckinghamshire","user_declared_state":"Chhattisgarh"}""" + val d4 = """{"fcm_token":"","city":"Kailasahar","device_id":"device-03","device_spec":"\"{'os':'Android 6.0','cpu':'AMD Ryzen 4900X','make':'Motorola XT1706'}\"","state":"Kerala","uaspec":{"agent":"Chrome","ver":"76.0.3809.132","system":"Android 12.0","raw":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"},"country":"India","country_code":"IN","producer_id":"dev.obsrv.portal","state_code_custom":29,"state_code":"KE","state_custom":"Karnataka","district_custom":"Karnataka,s","first_access":1568379184000,"api_last_updated_on":1568377184000,"user_declared_district":"Bedfordshire","user_declared_state":"Kerala"}""" + val d5 = """{"fcm_token":"","city":"Shahade","device_id":"device-04","device_spec":"\"{'os':'Ubuntu','cpu':'Qualcomm Snapdragon','make':'Blackberry'}\"","state":"Meghalaya","uaspec":{"agent":"Opera","ver":"76.0.3809.132","system":"Blackberry","raw":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36"},"country":"India","country_code":"IN","producer_id":"dev.obsrv.portal","state_code_custom":29,"state_code":"ME","state_custom":"Karnataka","district_custom":"Karnataka,s","first_access":1568379184000,"api_last_updated_on":1568377184000,"user_declared_district":"Berkshire","user_declared_state":"Meghalaya"}""" + +} diff --git a/data-products/src/test/scala/org/sunbird/spec/MasterDataIndexerSpec.scala b/data-products/src/test/scala/org/sunbird/spec/MasterDataIndexerSpec.scala new file mode 100644 index 00000000..0d54050e --- /dev/null +++ b/data-products/src/test/scala/org/sunbird/spec/MasterDataIndexerSpec.scala @@ -0,0 +1,242 @@ +package org.sunbird.obsrv.spec + +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import 
io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, duration2JavaDuration} +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres +import kong.unirest.{HttpResponse, JsonNode} +import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} +import org.joda.time.{DateTime, DateTimeZone} +import org.mockito.MockitoSugar.{mock, when} +import org.sunbird.obsrv.registry.DatasetRegistry +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import org.sunbird.fixture.EventFixture +import org.sunbird.obsrv.core.cache.RedisConnect +import org.sunbird.obsrv.core.util.{PostgresConnect, PostgresConnectionConfig} +import org.sunbird.obsrv.dataproducts.helper.BaseMetricHelper +import org.sunbird.obsrv.dataproducts.model.{Edata, MetricLabel} +import redis.embedded.RedisServer + +import scala.collection.JavaConverters._ +import scala.concurrent.duration.FiniteDuration +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.mockito.Mockito +import org.sunbird.obsrv.core.exception.ObsrvException +import org.sunbird.obsrv.core.model +import org.sunbird.obsrv.core.model.ErrorConstants +import org.sunbird.obsrv.dataproducts +import org.sunbird.obsrv.dataproducts.MasterDataProcessorIndexer +import org.sunbird.obsrv.dataproducts.util.StorageUtil.BlobProvider +import org.sunbird.obsrv.dataproducts.util.{CommonUtil, HttpUtil, StorageUtil} + +import scala.collection.mutable.ListBuffer + +class MasterDataIndexerSpec extends FlatSpec with BeforeAndAfterAll with Matchers { + + private val jobConfig: Config = ConfigFactory.load("masterdata-indexer-test.conf").withFallback(ConfigFactory.systemEnvironment()) + val mockMetrics = mock[BaseMetricHelper] + val pwd = System.getProperty("user.dir") + + val postgresConfig = PostgresConnectionConfig( + user = jobConfig.getString("postgres.user"), + password = jobConfig.getString("postgres.password"), + database = "postgres", + host = jobConfig.getString("postgres.host"), + port = jobConfig.getInt("postgres.port"), + maxConnections = jobConfig.getInt("postgres.maxConnections") + ) + + var embeddedPostgres: EmbeddedPostgres = _ + var redisServer: RedisServer = _ + var redisConnection: RedisConnect = _ + private val dayPeriodFormat: DateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC() + val dt = new DateTime(DateTimeZone.UTC).withTimeAtStartOfDay() + val date = dayPeriodFormat.print(dt) + + val customKafkaConsumerProperties: Map[String, String] = Map[String, String]("auto.offset.reset" -> "earliest", "group.id" -> "test-event-schema-group") + implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = + EmbeddedKafkaConfig( + kafkaPort = 9092, + zooKeeperPort = 2183, + customConsumerProperties = customKafkaConsumerProperties + ) + implicit val deserializer: StringDeserializer = new StringDeserializer() + var spark: SparkSession = _ + + + override def beforeAll(): Unit = { + super.beforeAll() + val conf = new SparkConf() + .setAppName("MasterDataProcessorIndexer") + .setMaster("local[*]") + spark = SparkSession.builder().config(conf).getOrCreate() + redisServer = new RedisServer(6340) + redisServer.start() + embeddedPostgres = EmbeddedPostgres.builder.setPort(5432).start() + val postgresConnect = new PostgresConnect(postgresConfig) + createSchema(postgresConnect) + insertTestData(postgresConnect) + redisConnection = new RedisConnect("localhost", 6340, 30000) + val jedis = redisConnection.getConnection(3) + jedis.set("device-00", EventFixture.d1) + 
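// Note (added for clarity; not part of this patch): the device-00..device-04 keys seed Redis DB 3 on
// port 6340, the database referenced by dataset md1's dataset_config, so that the indexer's
// fromRedisKV("*") scan in createDataFile() picks up all five fixture records and, with
// cloud.storage.provider=local in masterdata-indexer-test.conf, writes them out as gzipped JSON.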
jedis.set("device-01", EventFixture.d2) + jedis.set("device-02", EventFixture.d3) + jedis.set("device-03", EventFixture.d4) + jedis.set("device-04", EventFixture.d5) + EmbeddedKafka.start()(embeddedKafkaConfig) + createTestTopics() + } + + override def afterAll(): Unit = { + super.afterAll() + redisServer.stop() + embeddedPostgres.close() + EmbeddedKafka.stop() + spark.stop() + } + + def createTestTopics(): Unit = { + EmbeddedKafka.createCustomTopic("spark.stats") + } + + private def createSchema(postgresConnect: PostgresConnect) { + postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasets ( id text PRIMARY KEY, type text NOT NULL, validation_config json, extraction_config json, dedup_config json, data_schema json, denorm_config json, router_config json NOT NULL, dataset_config json NOT NULL, status text NOT NULL, tags text[], data_version INT, created_by text NOT NULL, updated_by text NOT NULL, created_date timestamp NOT NULL, updated_date timestamp NOT NULL );") + postgresConnect.execute("CREATE TABLE IF NOT EXISTS datasources ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), ingestion_spec json NOT NULL, datasource text NOT NULL, datasource_ref text NOT NULL);") + postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_transformations ( id text PRIMARY KEY, dataset_id text REFERENCES datasets (id), field_key text NOT NULL, transformation_function json NOT NULL, status text NOT NULL, mode text, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(field_key, dataset_id) );") + postgresConnect.execute("CREATE TABLE IF NOT EXISTS dataset_source_config ( id text PRIMARY KEY, dataset_id text NOT NULL REFERENCES datasets (id), connector_type text NOT NULL, connector_config json NOT NULL, status text NOT NULL, connector_stats json, created_by text NOT NULL, updated_by text NOT NULL, created_date Date NOT NULL, updated_date Date NOT NULL, UNIQUE(connector_type, dataset_id) );") + } + + private def insertTestData(postgresConnect: PostgresConnect) = { + postgresConnect.execute("insert into datasets(id, type, validation_config, extraction_config, dedup_config, data_schema, denorm_config, router_config, dataset_config, tags, data_version, status, created_by, updated_by, created_date, updated_date) VALUES('md1','master-dataset', '{\"validate\": true, \"mode\": \"Strict\", \"validation_mode\": {}}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 1036800}, \"batch_id\": \"id\"}', '{\"drop_duplicates\": true, \"dedup_key\": \"device_id\", \"dedup_period\": 1036800}', '{\"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"type\": \"object\", \"properties\": {\"fcm_token\": {\"type\": \"string\"}, \"city\": {\"type\": \"string\"}, \"device_id\": {\"type\": \"string\"}, \"device_spec\": {\"type\": \"string\"}, \"state\": {\"type\": \"string\"}, \"uaspec\": {\"type\": \"object\", \"properties\": {\"agent\": {\"type\": \"string\"}, \"ver\": {\"type\": \"string\"}, \"system\": {\"type\": \"string\"}, \"raw\": {\"type\": \"string\"}}}, \"country\": {\"type\": \"string\"}, \"country_code\": {\"type\": \"string\"}, \"producer_id\": {\"type\": \"string\"}, \"state_code_custom\": {\"type\": \"integer\"}, \"state_code\": {\"type\": \"string\"}, \"state_custom\": {\"type\": \"string\"}, \"district_custom\": {\"type\": \"string\"}, \"first_access\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": 
[{\"message\": \"The Property ''first_access'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"api_last_updated_on\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''api_last_updated_on'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"user_declared_district\": {\"type\": \"string\"}, \"user_declared_state\": {\"type\": \"string\"}}, \"required\": [\"first_access\", \"api_last_updated_on\", \"device_id\"]}', '{\"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"denorm_fields\": []}', '{\"topic\": \",d1\"}', '{\"data_key\": \"device_id\", \"timestamp_key\": \"\", \"exclude_fields\": [], \"entry_topic\": \"local.masterdata.ingest\", \"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"index_data\": true, \"redis_db\": 3}', NULL, NULL, 'Live', 'SYSTEM', 'SYSTEM', '2023-10-04 06:44:11.600', '2023-10-04 06:44:11.600');") + postgresConnect.execute("insert into datasources(id, dataset_id, ingestion_spec, datasource, datasource_ref) VALUES('md1_md1.1_DAY', 'md1', '{\"type\": \"kafka\",\"spec\": {\"dataSchema\": {\"dataSource\": \"telemetry-device-data.1_DAY\",\"dimensionsSpec\": {\"dimensions\": [{\"type\": \"string\",\"name\": \"fcm_token\"},{\"type\": \"string\",\"name\": \"city\"},{\"type\": \"string\",\"name\": \"device_id\"},{\"type\": \"string\",\"name\": \"device_spec\"},{\"type\": \"string\",\"name\": \"state\"},{\"type\": \"string\",\"name\": \"uaspec_agent\"}]},\"timestampSpec\": {\"column\": \"syncts\",\"format\": \"auto\"},\"metricsSpec\": [{\"type\": \"doubleSum\",\"name\": \"state_code_custom\",\"fieldName\": \"state_code_custom\"}],\"granularitySpec\": {\"type\": \"uniform\",\"segmentGranularity\": \"DAY\",\"rollup\": false}},\"tuningConfig\": {\"type\": \"kafka\",\"maxBytesInMemory\": 134217728,\"maxRowsPerSegment\": 500000,\"logParseExceptions\": true},\"ioConfig\": {\"type\": \"kafka\",\"topic\": \"telemetry-device-data\",\"consumerProperties\": {\"bootstrap.servers\": \"localhost:9092\"},\"taskCount\": 1,\"replicas\": 1,\"taskDuration\": \"PT1H\",\"useEarliestOffset\": true,\"completionTimeout\": \"PT1H\",\"inputFormat\": {\"type\": \"json\",\"flattenSpec\": {\"useFieldDiscovery\": true,\"fields\": [{ \"type\": \"path\",\"expr\": \"$.fcm_token\",\"name\": \"fcm_token\"},{\"type\": \"path\",\"expr\": \"$.city\",\"name\": \"city\"},{\"type\": \"path\",\"expr\": \"$.device_id\",\"name\": \"device_id\"},{\"type\": \"path\",\"expr\": \"$.device_spec\",\"name\": \"device_spec\"},{\"type\": \"path\",\"expr\": \"$.state\",\"name\": \"state\"},{\"type\": \"path\",\"expr\": \"$.uaspec.agent\",\"name\": \"uaspec_agent\"}]}},\"appendToExisting\": false}}}', 'md1.1_DAY', 'md1.1_DAY');") + postgresConnect.execute("insert into dataset_transformations values('tf1', 'md1', 'dealer.email', '{\"type\":\"mask\",\"expr\":\"dealer.email\"}', 'Live', 'Strict', 'System', 'System', now(), now());") + postgresConnect.execute("insert into datasets(id, type, validation_config, extraction_config, dedup_config, denorm_config, router_config, dataset_config, tags, data_version, status, created_by, updated_by, created_date, updated_date) VALUES('md2','master-dataset', '{\"validate\": true, \"mode\": \"Strict\", \"validation_mode\": {}}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": 
{\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 1036800}, \"batch_id\": \"id\"}', '{\"drop_duplicates\": true, \"dedup_key\": \"device_id\", \"dedup_period\": 1036800}', '{\"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"denorm_fields\": []}', '{\"topic\": \",d1\"}', '{\"data_key\": \"device_id\", \"timestamp_key\": \"\", \"exclude_fields\": [], \"entry_topic\": \"local.masterdata.ingest\", \"redis_db_host\": \"localhost\", \"redis_db_port\":6340, \"index_data\": true, \"redis_db\": 5}', NULL, NULL, 'Live', 'SYSTEM', 'SYSTEM', 'now()', 'now()');") + postgresConnect.execute("insert into datasources(id, dataset_id, ingestion_spec, datasource, datasource_ref) VALUES('md2_md1.1_DAY', 'md2', '{\"type\": \"kafka\",\"spec\": {\"dataSchema\": {\"dataSource\": \"telemetry-device-data.1_DAY\",\"dimensionsSpec\": {\"dimensions\": [{\"type\": \"string\",\"name\": \"fcm_token\"},{\"type\": \"string\",\"name\": \"city\"},{\"type\": \"string\",\"name\": \"device_id\"},{\"type\": \"string\",\"name\": \"device_spec\"},{\"type\": \"string\",\"name\": \"state\"},{\"type\": \"string\",\"name\": \"uaspec_agent\"}]},\"timestampSpec\": {\"column\": \"syncts\",\"format\": \"auto\"},\"metricsSpec\": [{\"type\": \"doubleSum\",\"name\": \"state_code_custom\",\"fieldName\": \"state_code_custom\"}],\"granularitySpec\": {\"type\": \"uniform\",\"segmentGranularity\": \"DAY\",\"rollup\": false}},\"tuningConfig\": {\"type\": \"kafka\",\"maxBytesInMemory\": 134217728,\"maxRowsPerSegment\": 500000,\"logParseExceptions\": true},\"ioConfig\": {\"type\": \"kafka\",\"topic\": \"telemetry-device-data\",\"consumerProperties\": {\"bootstrap.servers\": \"localhost:9092\"},\"taskCount\": 1,\"replicas\": 1,\"taskDuration\": \"PT1H\",\"useEarliestOffset\": true,\"completionTimeout\": \"PT1H\",\"inputFormat\": {\"type\": \"json\",\"flattenSpec\": {\"useFieldDiscovery\": true,\"fields\": [{ \"type\": \"path\",\"expr\": \"$.fcm_token\",\"name\": \"fcm_token\"},{\"type\": \"path\",\"expr\": \"$.city\",\"name\": \"city\"},{\"type\": \"path\",\"expr\": \"$.device_id\",\"name\": \"device_id\"},{\"type\": \"path\",\"expr\": \"$.device_spec\",\"name\": \"device_spec\"},{\"type\": \"path\",\"expr\": \"$.state\",\"name\": \"state\"},{\"type\": \"path\",\"expr\": \"$.uaspec.agent\",\"name\": \"uaspec_agent\"}]}},\"appendToExisting\": false}}}', 'md2.1_DAY', 'md2.1_DAY');") + postgresConnect.execute("insert into datasets(id, type, validation_config, extraction_config, dedup_config, data_schema, denorm_config, router_config, dataset_config, tags, data_version, status, created_by, updated_by, created_date, updated_date) VALUES('md3','master-dataset', '{\"validate\": true, \"mode\": \"Strict\", \"validation_mode\": {}}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 1036800}, \"batch_id\": \"id\"}', '{\"drop_duplicates\": true, \"dedup_key\": \"device_id\", \"dedup_period\": 1036800}', '{\"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"type\": \"object\", \"properties\": {\"fcm_token\": {\"type\": \"string\"}, \"city\": {\"type\": \"string\"}, \"device_id\": {\"type\": \"string\"}, \"device_spec\": {\"type\": \"string\"}, \"state\": {\"type\": \"string\"}, \"uaspec\": {\"type\": \"object\", \"properties\": {\"agent\": {\"type\": \"string\"}, \"ver\": {\"type\": \"string\"}, \"system\": {\"type\": \"string\"}, \"raw\": {\"type\": \"string\"}}}, \"country\": {\"type\": \"string\"}, \"country_code\": 
{\"type\": \"string\"}, \"producer_id\": {\"type\": \"string\"}, \"state_code_custom\": {\"type\": \"integer\"}, \"state_code\": {\"type\": \"string\"}, \"state_custom\": {\"type\": \"string\"}, \"district_custom\": {\"type\": \"string\"}, \"first_access\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''first_access'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"api_last_updated_on\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''api_last_updated_on'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"user_declared_district\": {\"type\": \"string\"}, \"user_declared_state\": {\"type\": \"string\"}}, \"required\": [\"first_access\", \"api_last_updated_on\", \"device_id\"]}', '{\"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"denorm_fields\": []}', '{\"topic\": \",d1\"}', '{\"data_key\": \"device_id\", \"timestamp_key\": \"\", \"exclude_fields\": [], \"entry_topic\": \"local.masterdata.ingest\", \"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"index_data\": true, \"redis_db\": 6}', NULL, NULL, 'Live', 'SYSTEM', 'SYSTEM', '2023-10-04 06:44:11.600', '2023-10-04 06:44:11.600');") + postgresConnect.execute("insert into datasources(id, dataset_id, ingestion_spec, datasource, datasource_ref) VALUES('md3_md3.1_DAY', 'md3', '{\"type\": \"kafka\",\"spec\": {\"dataSchema\": {\"dataSource\": \"telemetry-device-data.1_DAY\",\"dimensionsSpec\": {\"dimensions\": [{\"type\": \"string\",\"name\": \"fcm_token\"},{\"type\": \"string\",\"name\": \"city\"},{\"type\": \"string\",\"name\": \"device_id\"},{\"type\": \"string\",\"name\": \"device_spec\"},{\"type\": \"string\",\"name\": \"state\"},{\"type\": \"string\",\"name\": \"uaspec_agent\"}]},\"timestampSpec\": {\"column\": \"syncts\",\"format\": \"auto\"},\"metricsSpec\": [{\"type\": \"doubleSum\",\"name\": \"state_code_custom\",\"fieldName\": \"state_code_custom\"}],\"granularitySpec\": {\"type\": \"uniform\",\"segmentGranularity\": \"DAY\",\"rollup\": false}},\"tuningConfig\": {\"type\": \"kafka\",\"maxBytesInMemory\": 134217728,\"maxRowsPerSegment\": 500000,\"logParseExceptions\": true},\"ioConfig\": {\"type\": \"kafka\",\"topic\": \"telemetry-device-data\",\"consumerProperties\": {\"bootstrap.servers\": \"localhost:9092\"},\"taskCount\": 1,\"replicas\": 1,\"taskDuration\": \"PT1H\",\"useEarliestOffset\": true,\"completionTimeout\": \"PT1H\",\"inputFormat\": {\"type\": \"json\",\"flattenSpec\": {\"useFieldDiscovery\": true,\"fields\": [{ \"type\": \"path\",\"expr\": \"$.fcm_token\",\"name\": \"fcm_token\"},{\"type\": \"path\",\"expr\": \"$.city\",\"name\": \"city\"},{\"type\": \"path\",\"expr\": \"$.device_id\",\"name\": \"device_id\"},{\"type\": \"path\",\"expr\": \"$.device_spec\",\"name\": \"device_spec\"},{\"type\": \"path\",\"expr\": \"$.state\",\"name\": \"state\"},{\"type\": \"path\",\"expr\": \"$.uaspec.agent\",\"name\": \"uaspec_agent\"}]}},\"appendToExisting\": false}}}', 'md3.1_DAY', 'md3.1_DAY');") + postgresConnect.execute("insert into datasources(id, dataset_id, ingestion_spec, datasource, datasource_ref) VALUES('md3_md3.2_DAY', 'md3', '{\"type\": \"kafka\",\"spec\": {\"dataSchema\": {\"dataSource\": \"telemetry-device-data.1_DAY\",\"dimensionsSpec\": {\"dimensions\": [{\"type\": 
\"string\",\"name\": \"fcm_token\"},{\"type\": \"string\",\"name\": \"city\"},{\"type\": \"string\",\"name\": \"device_id\"},{\"type\": \"string\",\"name\": \"device_spec\"},{\"type\": \"string\",\"name\": \"state\"},{\"type\": \"string\",\"name\": \"uaspec_agent\"}]},\"timestampSpec\": {\"column\": \"syncts\",\"format\": \"auto\"},\"metricsSpec\": [{\"type\": \"doubleSum\",\"name\": \"state_code_custom\",\"fieldName\": \"state_code_custom\"}],\"granularitySpec\": {\"type\": \"uniform\",\"segmentGranularity\": \"DAY\",\"rollup\": false}},\"tuningConfig\": {\"type\": \"kafka\",\"maxBytesInMemory\": 134217728,\"maxRowsPerSegment\": 500000,\"logParseExceptions\": true},\"ioConfig\": {\"type\": \"kafka\",\"topic\": \"telemetry-device-data\",\"consumerProperties\": {\"bootstrap.servers\": \"localhost:9092\"},\"taskCount\": 1,\"replicas\": 1,\"taskDuration\": \"PT1H\",\"useEarliestOffset\": true,\"completionTimeout\": \"PT1H\",\"inputFormat\": {\"type\": \"json\",\"flattenSpec\": {\"useFieldDiscovery\": true,\"fields\": [{ \"type\": \"path\",\"expr\": \"$.fcm_token\",\"name\": \"fcm_token\"},{\"type\": \"path\",\"expr\": \"$.city\",\"name\": \"city\"},{\"type\": \"path\",\"expr\": \"$.device_id\",\"name\": \"device_id\"},{\"type\": \"path\",\"expr\": \"$.device_spec\",\"name\": \"device_spec\"},{\"type\": \"path\",\"expr\": \"$.state\",\"name\": \"state\"},{\"type\": \"path\",\"expr\": \"$.uaspec.agent\",\"name\": \"uaspec_agent\"}]}},\"appendToExisting\": false}}}', 'md3.2_DAY', 'md3.2_DAY');") + postgresConnect.execute("insert into datasets(id, type, validation_config, extraction_config, dedup_config, data_schema, denorm_config, router_config, dataset_config, tags, data_version, status, created_by, updated_by, created_date, updated_date) VALUES('md5','master-dataset', '{\"validate\": true, \"mode\": \"Strict\", \"validation_mode\": {}}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 1036800}, \"batch_id\": \"id\"}', '{\"drop_duplicates\": true, \"dedup_key\": \"device_id\", \"dedup_period\": 1036800}', '{\"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"type\": \"object\", \"properties\": {\"fcm_token\": {\"type\": \"string\"}, \"city\": {\"type\": \"string\"}, \"device_id\": {\"type\": \"string\"}, \"device_spec\": {\"type\": \"string\"}, \"state\": {\"type\": \"string\"}, \"uaspec\": {\"type\": \"object\", \"properties\": {\"agent\": {\"type\": \"string\"}, \"ver\": {\"type\": \"string\"}, \"system\": {\"type\": \"string\"}, \"raw\": {\"type\": \"string\"}}}, \"country\": {\"type\": \"string\"}, \"country_code\": {\"type\": \"string\"}, \"producer_id\": {\"type\": \"string\"}, \"state_code_custom\": {\"type\": \"integer\"}, \"state_code\": {\"type\": \"string\"}, \"state_custom\": {\"type\": \"string\"}, \"district_custom\": {\"type\": \"string\"}, \"first_access\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''first_access'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"api_last_updated_on\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''api_last_updated_on'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"user_declared_district\": 
{\"type\": \"string\"}, \"user_declared_state\": {\"type\": \"string\"}}, \"required\": [\"first_access\", \"api_last_updated_on\", \"device_id\"]}', '{\"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"denorm_fields\": []}', '{\"topic\": \",d1\"}', '{\"data_key\": \"device_id\", \"timestamp_key\": \"\", \"exclude_fields\": [], \"entry_topic\": \"local.masterdata.ingest\", \"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"index_data\": true, \"redis_db\": 9}', NULL, NULL, 'Live', 'SYSTEM', 'SYSTEM', '2023-10-04 06:44:11.600', '2023-10-04 06:44:11.600');") + postgresConnect.execute("insert into datasets(id, type, validation_config, extraction_config, dedup_config, data_schema, denorm_config, router_config, dataset_config, tags, data_version, status, created_by, updated_by, created_date, updated_date) VALUES('md4','master-dataset', '{\"validate\": true, \"mode\": \"Strict\", \"validation_mode\": {}}', '{\"is_batch_event\": true, \"extraction_key\": \"events\", \"dedup_config\": {\"drop_duplicates\": true, \"dedup_key\": \"id\", \"dedup_period\": 1036800}, \"batch_id\": \"id\"}', '{\"drop_duplicates\": true, \"dedup_key\": \"device_id\", \"dedup_period\": 1036800}', '{\"$schema\": \"https://json-schema.org/draft/2020-12/schema\", \"type\": \"object\", \"properties\": {\"fcm_token\": {\"type\": \"string\"}, \"city\": {\"type\": \"string\"}, \"device_id\": {\"type\": \"string\"}, \"device_spec\": {\"type\": \"string\"}, \"state\": {\"type\": \"string\"}, \"uaspec\": {\"type\": \"object\", \"properties\": {\"agent\": {\"type\": \"string\"}, \"ver\": {\"type\": \"string\"}, \"system\": {\"type\": \"string\"}, \"raw\": {\"type\": \"string\"}}}, \"country\": {\"type\": \"string\"}, \"country_code\": {\"type\": \"string\"}, \"producer_id\": {\"type\": \"string\"}, \"state_code_custom\": {\"type\": \"integer\"}, \"state_code\": {\"type\": \"string\"}, \"state_custom\": {\"type\": \"string\"}, \"district_custom\": {\"type\": \"string\"}, \"first_access\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''first_access'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"api_last_updated_on\": {\"type\": \"integer\", \"format\": \"date-time\", \"suggestions\": [{\"message\": \"The Property ''api_last_updated_on'' appears to be ''date-time'' format type.\", \"advice\": \"The System can index all data on this column\", \"resolutionType\": \"INDEX\", \"severity\": \"LOW\"}]}, \"user_declared_district\": {\"type\": \"string\"}, \"user_declared_state\": {\"type\": \"string\"}}, \"required\": [\"first_access\", \"api_last_updated_on\", \"device_id\"]}', '{\"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"denorm_fields\": []}', '{\"topic\": \",d1\"}', '{\"data_key\": \"device_id\", \"timestamp_key\": \"\", \"exclude_fields\": [], \"entry_topic\": \"local.masterdata.ingest\", \"redis_db_host\": \"localhost\", \"redis_db_port\": 6340, \"index_data\": true, \"redis_db\": 3}', NULL, NULL, 'Live', 'SYSTEM', 'SYSTEM', '2023-10-04 06:44:11.600', '2023-10-04 06:44:11.600');") + postgresConnect.execute("insert into datasources(id, dataset_id, ingestion_spec, datasource, datasource_ref) VALUES('md4_md4.1_DAY', 'md4', '{\"type\": \"kafka\",\"spec\": {\"dataSchema\": {\"dataSource\": \"telemetry-device-data.1_DAY\",\"dimensionsSpec\": {\"dimensions\": [{\"type\": \"string\",\"name\": \"fcm_token\"},{\"type\": \"string\",\"name\": 
\"city\"},{\"type\": \"string\",\"name\": \"device_id\"},{\"type\": \"string\",\"name\": \"device_spec\"},{\"type\": \"string\",\"name\": \"state\"},{\"type\": \"string\",\"name\": \"uaspec_agent\"}]},\"timestampSpec\": {\"column\": \"syncts\",\"format\": \"auto\"},\"metricsSpec\": [{\"type\": \"doubleSum\",\"name\": \"state_code_custom\",\"fieldName\": \"state_code_custom\"}],\"granularitySpec\": {\"type\": \"uniform\",\"segmentGranularity\": \"DAY\",\"rollup\": false}},\"tuningConfig\": {\"type\": \"kafka\",\"maxBytesInMemory\": 134217728,\"maxRowsPerSegment\": 500000,\"logParseExceptions\": true},\"ioConfig\": {\"type\": \"kafka\",\"topic\": \"telemetry-device-data\",\"consumerProperties\": {\"bootstrap.servers\": \"localhost:9092\"},\"taskCount\": 1,\"replicas\": 1,\"taskDuration\": \"PT1H\",\"useEarliestOffset\": true,\"completionTimeout\": \"PT1H\",\"inputFormat\": {\"type\": \"json\",\"flattenSpec\": {\"useFieldDiscovery\": true,\"fields\": [{ \"type\": \"path\",\"expr\": \"$.fcm_token\",\"name\": \"fcm_token\"},{\"type\": \"path\",\"expr\": \"$.city\",\"name\": \"city\"},{\"type\": \"path\",\"expr\": \"$.device_id\",\"name\": \"device_id\"},{\"type\": \"path\",\"expr\": \"$.device_spec\",\"name\": \"device_spec\"},{\"type\": \"path\",\"expr\": \"$.state\",\"name\": \"state\"},{\"type\": \"path\",\"expr\": \"$.uaspec.agent\",\"name\": \"uaspec_agent\"}]}},\"appendToExisting\": false}}}', 'md4.1_DAY', 'md4.1_DAY');") + + } + + def checkTestTopicsOffset(): Unit = { + val topics: java.util.Collection[String] = new java.util.ArrayList[String]() + topics.add("spark.stats") + val consumerPollingTimeout: FiniteDuration = FiniteDuration(1, "minute") + EmbeddedKafka.withConsumer[String, String, Unit] { + val messagesBuffers = topics.asScala.map(_ -> ListBuffer.empty[(String, String)]).toMap + consumer => + consumer.subscribe(topics) + val recordIterator = consumer.poll(duration2JavaDuration(consumerPollingTimeout)).iterator() + while (recordIterator.hasNext) { + val record = recordIterator.next + messagesBuffers(record.topic) += (record.key() -> record.value()) + consumer.commitSync() + } + consumer.close() + val messages = messagesBuffers.mapValues(_.toList) + messages("spark.stats").length shouldBe 4 + + } + } + + it should "index datasets for single datasource and generate metrics for local storage" in { + val config = jobConfig.withValue("cloud.storage.container", ConfigValueFactory.fromAnyRef(s"${pwd}/obsrv-data")) + println("Path -> " + config.getString("cloud.storage.container")) + assertThrows[Exception]( + MasterDataProcessorIndexer.processDatasets(config, spark) + ) + } + + it should "index datasets for aws" in { + val config = jobConfig.withValue("cloud.storage.provider", ConfigValueFactory.fromAnyRef("aws")) + assertThrows[Exception]( + MasterDataProcessorIndexer.processDatasets(config, spark) + ) + } + + it should "index datasets for azure" in { + val config = jobConfig.withValue("cloud.storage.provider", ConfigValueFactory.fromAnyRef("azure")) + assertThrows[Exception]( + MasterDataProcessorIndexer.processDatasets(config, spark) + ) + } + + it should "index datasets for gcloud" in { + val config = jobConfig.withValue("cloud.storage.provider", ConfigValueFactory.fromAnyRef("gcloud")) + assertThrows[Exception]( + MasterDataProcessorIndexer.processDatasets(config, spark) + ) + } + + it should "index datasets for cephs3" in { + val config = jobConfig.withValue("cloud.storage.provider", ConfigValueFactory.fromAnyRef("cephs3")) + assertThrows[Exception]( + 
MasterDataProcessorIndexer.processDatasets(config, spark) + ) + } + + it should "index datasets for oci" in { + val config = jobConfig.withValue("cloud.storage.provider", ConfigValueFactory.fromAnyRef("oci")) + assertThrows[Exception]( + MasterDataProcessorIndexer.processDatasets(config, spark) + ) + } + + it should "not index datasets for unknown provider" in { + val provider = jobConfig.withValue("cloud.storage.provider", ConfigValueFactory.fromAnyRef("ibm")) + val dataset = DatasetRegistry.getDataset("md1") + MasterDataProcessorIndexer.processDatasets(provider, spark) + val edata = Edata(metric = Map(mockMetrics.getMetricName("failure_dataset_count") -> 1, mockMetrics.getMetricName("total_dataset_count") -> 1), labels = List(MetricLabel("job", "MasterDataIndexer"), MetricLabel("datasetId", dataset.get.id), MetricLabel("cloud", s"${provider.getString("cloud.storage.provider")}")), err = "FAILED", errMsg = "Unsupported provider") + } + + it should "throw exception when datasource is null" in { + val dataset = DatasetRegistry.getDataset("md5") + the[ObsrvException] thrownBy { + MasterDataProcessorIndexer.fetchDatasource(dataset.get) + } should have message ErrorConstants.ERR_DATASOURCE_NOT_FOUND.errorMsg + } + + it should "create a SparkSession with default master (local[*]) if not provided in the config" in { + val appName = "MasterDataIndexer" + val configString = "" // No spark.master in the config + val config = ConfigFactory.parseString(configString) + val sparkSession = CommonUtil.getSparkSession(appName, config) + sparkSession should not be null + sparkSession.conf.get("spark.master") shouldEqual "local[*]" + sparkSession.conf.get("spark.app.name") shouldEqual appName + } + + it should "create a SparkSession with provided app name and master" in { + val appName = "TestApp" + val master = "local[2]" + val configString = + s""" + |spark.master = "$master" + """.stripMargin + val config = ConfigFactory.parseString(configString) + val sparkSession = CommonUtil.getSparkSession(appName, config) + sparkSession should not be null + sparkSession.conf.get("spark.app.name") shouldEqual appName + } + + it should "throw exception while submitting ingestion" in { + val config = jobConfig.withValue("cloud.storage.container", ConfigValueFactory.fromAnyRef(s"${pwd}/obsrv-data")) + val dataset = DatasetRegistry.getDataset("md1") + assertThrows[Exception]( + MasterDataProcessorIndexer.processDataset(config, dataset.get, spark) + ) + } + + it should "return proper provider format for each cloud provider" in { + StorageUtil.providerFormat("aws") shouldEqual BlobProvider("s3a", "s3", "s3") + StorageUtil.providerFormat("azure") shouldEqual BlobProvider("wasbs", "azure", "azure") + StorageUtil.providerFormat("gcloud") shouldEqual BlobProvider("gs", "google", "gs") + StorageUtil.providerFormat("cephs3") shouldEqual BlobProvider("s3a", "s3", "s3") + StorageUtil.providerFormat("oci") shouldEqual BlobProvider("s3a", "s3", "s3") + } + +} diff --git a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala index 89efec4c..88efb7a6 100644 --- a/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala +++ b/dataset-registry/src/main/scala/org/sunbird/obsrv/service/DatasetRegistryService.scala @@ -9,7 +9,6 @@ import java.io.File import java.sql.{ResultSet, Timestamp} object DatasetRegistryService { - private val configFile = new 
File("/data/flink/conf/baseconfig.conf") // $COVERAGE-OFF$ This code only executes within a flink cluster val config: Config = if (configFile.exists()) { @@ -37,7 +36,7 @@ object DatasetRegistryService { val dataset = parseDataset(result) (dataset.id, dataset) }).toMap - } finally { + } finally { postgresConnect.closeConnection() } } @@ -47,7 +46,7 @@ object DatasetRegistryService { val postgresConnect = new PostgresConnect(postgresConfig) try { val rs = postgresConnect.executeQuery(s"SELECT * FROM datasets where id='$id'") - if(rs.next()) { + if (rs.next()) { Some(parseDataset(rs)) } else { None @@ -118,9 +117,10 @@ object DatasetRegistryService { } def updateConnectorStats(id: String, lastFetchTimestamp: Timestamp, records: Long): Int = { - val query = s"UPDATE dataset_source_config SET connector_stats = jsonb_set(jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{records}'," + - s" ((COALESCE(connector_stats->>'records', '0')::int + $records)::text)::jsonb, true), '{last_fetch_timestamp}', " + - s"to_jsonb('$lastFetchTimestamp'::timestamp), true) WHERE id = '$id'" + val query = s"UPDATE dataset_source_config SET connector_stats = coalesce(connector_stats, '{}')::jsonb || " + + s"jsonb_build_object('records', COALESCE(connector_stats->>'records', '0')::int + '$records'::int) || " + + s"jsonb_build_object('last_fetch_timestamp', '${lastFetchTimestamp}'::timestamp) || " + + s"jsonb_build_object('last_run_timestamp', '${new Timestamp(System.currentTimeMillis())}'::timestamp) WHERE id = '$id';" updateRegistry(query) } @@ -155,7 +155,7 @@ object DatasetRegistryService { val datasetConfig = rs.getString("dataset_config") val status = rs.getString("status") val tagArray = rs.getArray("tags") - val tags = if(tagArray != null) tagArray.getArray.asInstanceOf[Array[String]] else null + val tags = if (tagArray != null) tagArray.getArray.asInstanceOf[Array[String]] else null val dataVersion = rs.getInt("data_version") Dataset(datasetId, datasetType, @@ -182,7 +182,7 @@ object DatasetRegistryService { DatasetSourceConfig(id = id, datasetId = datasetId, connectorType = connectorType, JSONUtil.deserialize[ConnectorConfig](connectorConfig), status, - if(connectorStats != null) Some(JSONUtil.deserialize[ConnectorStats](connectorStats)) else None + if (connectorStats != null) Some(JSONUtil.deserialize[ConnectorStats](connectorStats)) else None ) } @@ -204,7 +204,7 @@ object DatasetRegistryService { val status = rs.getString("status") val mode = rs.getString("mode") - DatasetTransformation(id, datasetId, fieldKey, JSONUtil.deserialize[TransformationFunction](transformationFunction), status, Some(if(mode != null) TransformMode.withName(mode) else TransformMode.Strict)) + DatasetTransformation(id, datasetId, fieldKey, JSONUtil.deserialize[TransformationFunction](transformationFunction), status, Some(if (mode != null) TransformMode.withName(mode) else TransformMode.Strict)) } } \ No newline at end of file diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala b/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala index efac0fbe..9c69aff2 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/model/ErrorConstants.scala @@ -3,7 +3,9 @@ package org.sunbird.obsrv.core.model object ErrorConstants extends Enumeration { type Error = ErrorValue + case class ErrorValue(errorCode: String, errorMsg: String) + protected final def ErrorInternalValue(errorCode: String, errorMsg: 
String): ErrorValue = { ErrorValue(errorCode, errorMsg) } @@ -26,7 +28,7 @@ object ErrorConstants extends Enumeration { val DENORM_KEY_NOT_A_STRING_OR_NUMBER = ErrorInternalValue("ERR_DENORM_1015", "Denorm key value is not a String or Number") val DENORM_DATA_NOT_FOUND = ErrorInternalValue("ERR_DENORM_1016", "Denorm data not found for the given key") val MISSING_DATASET_CONFIG_KEY = ErrorInternalValue("ERR_MASTER_DATA_1017", "Master dataset configuration key is missing") - val ERR_INVALID_EVENT = ErrorInternalValue("ERR_EXT_1018", "Invalid JSON event, error while deserializing the event") + val ERR_INVALID_EVENT = ErrorInternalValue("ERR_EXT_1018", "Invalid JSON event, error while deserializing the event") val INDEX_KEY_MISSING_OR_BLANK = ErrorInternalValue("ERR_ROUTER_1019", "Unable to index data as the timestamp key is missing or blank or not a datetime value") val INVALID_EXPR_FUNCTION = ErrorInternalValue("ERR_TRANSFORM_1020", "Transformation expression function is not valid") val ERR_EVAL_EXPR_FUNCTION = ErrorInternalValue("ERR_TRANSFORM_1021", "Unable to evaluate the transformation expression function") @@ -36,5 +38,9 @@ object ErrorConstants extends Enumeration { val SYSTEM_SETTING_INVALID_TYPE = ErrorInternalValue("ERR_SYSTEM_SETTING_1025", "Invalid value type for system setting") val SYSTEM_SETTING_NOT_FOUND = ErrorInternalValue("ERR_SYSTEM_SETTING_1026", "System setting not found for requested key") val SYSTEM_SETTING_DEFAULT_VALUE_NOT_FOUND = ErrorInternalValue("ERR_SYSTEM_SETTING_1027", "Default value not found for requested key") - -} + val HTTP_SERVER_ERR = ErrorInternalValue("ERR_SERVER_CONNECTION_1028", "Connection refused.") + val ERR_DATASOURCE_NOT_FOUND = ErrorInternalValue("ERR_MDP_1029", "Datasource not found.") + val UNSUPPORTED_PROVIDER = ErrorInternalValue("ERR_UNSUPPORTED_PROVIDER_1030", "Unsupported provider.") + val ERR_SUBMIT_INGESTION_FAILED = ErrorInternalValue("ERR_MDP_1031", "Unable to submit ingestion task to druid.") + val ERR_DELETE_DATASOURCE_FAILED = ErrorInternalValue("ERR_MDP_1032", "Failed to delete datasource.") +} \ No newline at end of file diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala b/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala index 118e8c53..decb916f 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/model/SystemConfig.scala @@ -88,21 +88,26 @@ object SystemConfigService { @throws[Exception] def getAllSystemSettings: List[SystemSetting] = { val postgresConnect = new PostgresConnect(postgresConfig) - val rs = postgresConnect.executeQuery("SELECT * FROM system_settings") - val result = Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => { - parseSystemSetting(result) - }).toList - postgresConnect.closeConnection() - result + try { + val rs = postgresConnect.executeQuery("SELECT * FROM system_settings") + val result = Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => { + parseSystemSetting(result) + }).toList + result + } finally { + postgresConnect.closeConnection() + } } @throws[Exception] def getSystemSetting(key: String): Option[SystemSetting] = { val postgresConnect = new PostgresConnect(postgresConfig) - val rs = postgresConnect.executeQuery(s"SELECT * FROM system_settings WHERE key = '$key'") - if (rs.next) { - Option(parseSystemSetting(rs)) - } else None + try { + val rs = postgresConnect.executeQuery(s"SELECT * 
FROM system_settings WHERE key = '$key'") + if (rs.next) Option(parseSystemSetting(rs)) else None + } finally { + postgresConnect.closeConnection() + } } private def parseSystemSetting(rs: ResultSet): SystemSetting = { @@ -114,4 +119,4 @@ object SystemConfigService { SystemSetting(key, value, category, valueType, Option(label)) } -} +} \ No newline at end of file diff --git a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala index dc6eaa66..cb4657c3 100644 --- a/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala +++ b/framework/src/main/scala/org/sunbird/obsrv/core/streaming/BaseJobConfig.scala @@ -17,7 +17,8 @@ abstract class BaseJobConfig[T](val config: Config, val jobName: String) extends implicit val metricTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - def defaultDatasetID: String = SystemConfig.getString("defaultDatasetId", "ALL") + lazy val defaultDatasetID: String = SystemConfig.getString("defaultDatasetId", "ALL") + private val kafkaProducerBrokerServers: String = config.getString("kafka.producer.broker-servers") private val kafkaConsumerBrokerServers: String = config.getString("kafka.consumer.broker-servers") // Producer Properties @@ -91,4 +92,4 @@ abstract class BaseJobConfig[T](val config: Config, val jobName: String) extends val CONST_OBSRV_META = "obsrv_meta" val CONST_DATASET = "dataset" val CONST_EVENT = "event" -} +} \ No newline at end of file diff --git a/pipeline/kafka-connector/pom.xml b/pipeline/kafka-connector/pom.xml deleted file mode 100644 index 65aa4d68..00000000 --- a/pipeline/kafka-connector/pom.xml +++ /dev/null @@ -1,263 +0,0 @@ - - - - 4.0.0 - - org.sunbird.obsrv - pipeline - 1.0 - - - org.sunbird.obsrv.pipeline - kafka-connector - 1.0.0 - jar - Kafka Connector - - Reads data from source kafka topic(s) and writes them to a configurable topic - - - - UTF-8 - 1.4.0 - - - - - org.apache.flink - flink-streaming-scala_${scala.maj.version} - ${flink.version} - provided - - - org.sunbird.obsrv - dataset-registry - 1.0.0 - - - org.apache.kafka - kafka-clients - - - - - joda-time - joda-time - 2.12.5 - - - com.fasterxml.jackson.datatype - jackson-datatype-joda - 2.15.2 - - - org.sunbird.obsrv - framework - 1.0.0 - - - org.sunbird.obsrv - framework - 1.0.0 - test-jar - test - - - org.sunbird.obsrv - dataset-registry - 1.0.0 - test-jar - test - - - org.apache.flink - flink-test-utils - ${flink.version} - test - - - org.apache.flink - flink-runtime - ${flink.version} - test - tests - - - org.apache.kafka - kafka-clients - ${kafka.version} - test - - - org.apache.kafka - kafka_${scala.maj.version} - ${kafka.version} - test - - - com.github.codemonstur - embedded-redis - 1.0.0 - test - - - io.github.embeddedkafka - embedded-kafka_2.12 - 3.4.0 - test - - - io.zonky.test - embedded-postgres - 2.0.3 - test - - - org.apache.flink - flink-streaming-java - ${flink.version} - test - tests - - - org.scalatest - scalatest_2.12 - 3.0.6 - test - - - org.mockito - mockito-core - 3.3.3 - test - - - - - src/main/scala - src/test/scala - - - org.apache.maven.plugins - maven-compiler-plugin - 3.8.1 - - 11 - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - - - package - - shade - - - - - com.google.code.findbugs:jsr305 - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - org.sunbird.obsrv.connector.task.KafkaConnectorStreamTask - - - - reference.conf - - - - - - - - - 
net.alchim31.maven - scala-maven-plugin - 4.4.0 - - ${java.target.runtime} - ${java.target.runtime} - ${scala.version} - false - - - - scala-compile-first - process-resources - - add-source - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - - maven-surefire-plugin - 2.22.2 - - true - - - - - org.scalatest - scalatest-maven-plugin - 1.0 - - ${project.build.directory}/surefire-reports - . - dp-duplication-testsuite.txt - - - - test - - test - - - - - - org.scoverage - scoverage-maven-plugin - ${scoverage.plugin.version} - - ${scala.version} - true - true - - - - - - diff --git a/pipeline/kafka-connector/src/main/resources/kafka-connector.conf b/pipeline/kafka-connector/src/main/resources/kafka-connector.conf deleted file mode 100644 index 9b5c575b..00000000 --- a/pipeline/kafka-connector/src/main/resources/kafka-connector.conf +++ /dev/null @@ -1,16 +0,0 @@ -include "baseconfig.conf" - -kafka { - input.topic = ${job.env}".test" - // output.topic = ${job.env}".ingest" - event.max.size = "1048576" # Max is only 1MB - groupId = ${job.env}"-kafkaconnector-group" - producer { - max-request-size = 5242880 - } -} - -task { - consumer.parallelism = 1 - downstream.operators.parallelism = 1 -} \ No newline at end of file diff --git a/pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorConfig.scala b/pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorConfig.scala deleted file mode 100644 index 05ccaa8e..00000000 --- a/pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorConfig.scala +++ /dev/null @@ -1,25 +0,0 @@ -package org.sunbird.obsrv.connector.task - -import com.typesafe.config.Config -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.streaming.api.scala.OutputTag -import org.sunbird.obsrv.core.streaming.BaseJobConfig - -import scala.collection.mutable - -class KafkaConnectorConfig(override val config: Config) extends BaseJobConfig[String](config, "KafkaConnectorJob") { - - private val serialVersionUID = 2905979435603791379L - - implicit val mapTypeInfo: TypeInformation[mutable.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[mutable.Map[String, AnyRef]]) - implicit val stringTypeInfo: TypeInformation[String] = TypeExtractor.getForClass(classOf[String]) - - override def inputTopic(): String = "" - - override def inputConsumer(): String = "" - - override def successTag(): OutputTag[String] = OutputTag[String]("dummy-events") - - override def failedEventsOutputTag(): OutputTag[String] = OutputTag[String]("failed-events") -} diff --git a/pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorStreamTask.scala b/pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorStreamTask.scala deleted file mode 100644 index 175f64fa..00000000 --- a/pipeline/kafka-connector/src/main/scala/org/sunbird/obsrv/connector/task/KafkaConnectorStreamTask.scala +++ /dev/null @@ -1,71 +0,0 @@ -package org.sunbird.obsrv.connector.task - -import com.typesafe.config.ConfigFactory -import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.joda.time.{DateTime, DateTimeZone} -import org.sunbird.obsrv.core.streaming.{BaseStreamTask, FlinkKafkaConnector} -import 
org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil} -import org.sunbird.obsrv.registry.DatasetRegistry - -import java.io.File - -class KafkaConnectorStreamTask(config: KafkaConnectorConfig, kafkaConnector: FlinkKafkaConnector) extends BaseStreamTask[String] { - - private val serialVersionUID = -7729362727131516112L - - // $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster - def process(): Unit = { - implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) - env.execute(config.jobName) - } - - override def processStream(dataStream: DataStream[String]): DataStream[String] = { - null - } - // $COVERAGE-ON$ - - def process(env: StreamExecutionEnvironment): Unit = { - val datasetSourceConfig = DatasetRegistry.getAllDatasetSourceConfig() - datasetSourceConfig.map { configList => - configList.filter(_.connectorType.equalsIgnoreCase("kafka")).map { - dataSourceConfig => - val dataStream: DataStream[String] = getStringDataStream(env, config, List(dataSourceConfig.connectorConfig.topic), - config.kafkaConsumerProperties(kafkaBrokerServers = Some(dataSourceConfig.connectorConfig.kafkaBrokers), - kafkaConsumerGroup = Some(s"kafka-${dataSourceConfig.connectorConfig.topic}-consumer")), - consumerSourceName = s"kafka-${dataSourceConfig.connectorConfig.topic}", kafkaConnector) - val datasetId = dataSourceConfig.datasetId - val kafkaOutputTopic = DatasetRegistry.getDataset(datasetId).get.datasetConfig.entryTopic - val resultStream: DataStream[String] = dataStream.map { streamData: String => { - val syncts = java.lang.Long.valueOf(new DateTime(DateTimeZone.UTC).getMillis) - JSONUtil.getJsonType(streamData) match { - case "ARRAY" => s"""{"dataset":"$datasetId","syncts":$syncts,"events":$streamData}""" - case _ => s"""{"dataset":"$datasetId","syncts":$syncts,"event":$streamData}""" - } - } - }.returns(classOf[String]) - resultStream.sinkTo(kafkaConnector.kafkaSink[String](kafkaOutputTopic)) - .name(s"$datasetId-kafka-connector-sink").uid(s"$datasetId-kafka-connector-sink") - .setParallelism(config.downstreamOperatorsParallelism) - } - } - } - -} - -// $COVERAGE-OFF$ Disabling scoverage as the below code can only be invoked within flink cluster -object KafkaConnectorStreamTask { - - def main(args: Array[String]): Unit = { - val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path")) - val config = configFilePath.map { - path => ConfigFactory.parseFile(new File(path)).resolve() - }.getOrElse(ConfigFactory.load("kafka-connector.conf").withFallback(ConfigFactory.systemEnvironment())) - val kafkaConnectorConfig = new KafkaConnectorConfig(config) - val kafkaUtil = new FlinkKafkaConnector(kafkaConnectorConfig) - val task = new KafkaConnectorStreamTask(kafkaConnectorConfig, kafkaUtil) - task.process() - } -} -// $COVERAGE-ON$ \ No newline at end of file diff --git a/pipeline/kafka-connector/src/test/resources/test.conf b/pipeline/kafka-connector/src/test/resources/test.conf deleted file mode 100644 index 87306136..00000000 --- a/pipeline/kafka-connector/src/test/resources/test.conf +++ /dev/null @@ -1,14 +0,0 @@ -include "base-test.conf" - -kafka { - input.topic = "flink.test" - groupId = "flink-kafkaconnector-group" - producer { - max-request-size = 5242880 - } -} - -task { - consumer.parallelism = 1 - downstream.operators.parallelism = 1 -} \ No newline at end of file diff --git a/pipeline/kafka-connector/src/test/scala/org/sunbird/obsrv/connector/KafkaConnectorStreamTestSpec.scala 
b/pipeline/kafka-connector/src/test/scala/org/sunbird/obsrv/connector/KafkaConnectorStreamTestSpec.scala deleted file mode 100644 index bf86eafa..00000000 --- a/pipeline/kafka-connector/src/test/scala/org/sunbird/obsrv/connector/KafkaConnectorStreamTestSpec.scala +++ /dev/null @@ -1,126 +0,0 @@ -package org.sunbird.obsrv.connector - -import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.test.util.MiniClusterWithClientResource -import org.apache.kafka.common.serialization.StringDeserializer -import org.scalatest.Matchers._ -import org.sunbird.obsrv.BaseMetricsReporter -import org.sunbird.obsrv.connector.task.{KafkaConnectorConfig, KafkaConnectorStreamTask} -import org.sunbird.obsrv.core.streaming.FlinkKafkaConnector -import org.sunbird.obsrv.core.util.{FlinkUtil, JSONUtil, PostgresConnect} -import org.sunbird.obsrv.spec.BaseSpecWithDatasetRegistry - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future -import scala.concurrent.duration._ - -class KafkaConnectorStreamTestSpec extends BaseSpecWithDatasetRegistry { - - val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() - .setConfiguration(testConfiguration()) - .setNumberSlotsPerTaskManager(1) - .setNumberTaskManagers(1) - .build) - - val pConfig = new KafkaConnectorConfig(config) - val kafkaConnector = new FlinkKafkaConnector(pConfig) - val customKafkaConsumerProperties: Map[String, String] = Map[String, String]("auto.offset.reset" -> "earliest", "group.id" -> "test-event-schema-group") - implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = - EmbeddedKafkaConfig( - kafkaPort = 9093, - zooKeeperPort = 2183, - customConsumerProperties = customKafkaConsumerProperties - ) - implicit val deserializer: StringDeserializer = new StringDeserializer() - private val VALID_JSON_EVENT = """{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}""" - private val VALID_JSON_EVENT_ARRAY = """[{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}}]""" - private val INVALID_JSON_EVENT = """{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}""" - - def testConfiguration(): Configuration = { - val config = new Configuration() - config.setString("metrics.reporter", "job_metrics_reporter") - config.setString("metrics.reporter.job_metrics_reporter.class", classOf[BaseMetricsReporter].getName) - config - } - - override def beforeAll(): Unit = { - super.beforeAll() - BaseMetricsReporter.gaugeMetrics.clear() - EmbeddedKafka.start()(embeddedKafkaConfig) - prepareTestData() - createTestTopics() - EmbeddedKafka.publishStringMessageToKafka("d1-topic", VALID_JSON_EVENT) - EmbeddedKafka.publishStringMessageToKafka("d2-topic", VALID_JSON_EVENT_ARRAY) - 
EmbeddedKafka.publishStringMessageToKafka("d3-topic", INVALID_JSON_EVENT) - - flinkCluster.before() - } - - private def prepareTestData(): Unit = { - val postgresConnect = new PostgresConnect(postgresConfig) - postgresConnect.execute("insert into datasets(id, type, data_schema, router_config, dataset_config, status, created_by, updated_by, created_date, updated_date, tags) values ('d3', 'dataset', '{\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"id\":\"https://sunbird.obsrv.com/test.json\",\"title\":\"Test Schema\",\"description\":\"Test Schema\",\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"vehicleCode\":{\"type\":\"string\"},\"date\":{\"type\":\"string\"},\"dealer\":{\"type\":\"object\",\"properties\":{\"dealerCode\":{\"type\":\"string\"},\"locationId\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"},\"phone\":{\"type\":\"string\"}},\"required\":[\"dealerCode\",\"locationId\"]},\"metrics\":{\"type\":\"object\",\"properties\":{\"bookingsTaken\":{\"type\":\"number\"},\"deliveriesPromised\":{\"type\":\"number\"},\"deliveriesDone\":{\"type\":\"number\"}}}},\"required\":[\"id\",\"vehicleCode\",\"date\",\"dealer\",\"metrics\"]}', '{\"topic\":\"d2-events\"}', '{\"data_key\":\"id\",\"timestamp_key\":\"date\",\"entry_topic\":\"ingest\"}', 'Live', 'System', 'System', now(), now(), ARRAY['Tag1','Tag2']);") - postgresConnect.execute("insert into dataset_source_config values('sc1', 'd1', 'kafka', '{\"kafkaBrokers\":\"localhost:9093\",\"topic\":\"d1-topic\"}', 'Live', null, 'System', 'System', now(), now());") - postgresConnect.execute("insert into dataset_source_config values('sc2', 'd1', 'rdbms', '{\"type\":\"postgres\",\"tableName\":\"test-table\"}', 'Live', null, 'System', 'System', now(), now());") - postgresConnect.execute("insert into dataset_source_config values('sc3', 'd2', 'kafka', '{\"kafkaBrokers\":\"localhost:9093\",\"topic\":\"d2-topic\"}', 'Live', null, 'System', 'System', now(), now());") - postgresConnect.execute("insert into dataset_source_config values('sc4', 'd3', 'kafka', '{\"kafkaBrokers\":\"localhost:9093\",\"topic\":\"d3-topic\"}', 'Live', null, 'System', 'System', now(), now());") - postgresConnect.closeConnection() - } - - override def afterAll(): Unit = { - super.afterAll() - flinkCluster.after() - EmbeddedKafka.stop() - } - - def createTestTopics(): Unit = { - List( - "d1-topic", "d2-topic", "d3-topic", pConfig.kafkaSystemTopic, "ingest" - ).foreach(EmbeddedKafka.createCustomTopic(_)) - } - - "KafkaConnectorStreamTestSpec" should "validate the kafka connector job" in { - - implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(pConfig) - val task = new KafkaConnectorStreamTask(pConfig, kafkaConnector) - task.process(env) - Future { - env.execute(pConfig.jobName) - } - - val ingestEvents = EmbeddedKafka.consumeNumberMessagesFrom[String]("ingest", 3, timeout = 30.seconds) - validateIngestEvents(ingestEvents) - - pConfig.inputTopic() should be ("") - pConfig.inputConsumer() should be ("") - pConfig.successTag().getId should be ("dummy-events") - pConfig.failedEventsOutputTag().getId should be ("failed-events") - } - - private def validateIngestEvents(ingestEvents: List[String]): Unit = { - ingestEvents.size should be(3) - ingestEvents.foreach{event: String => { - if(event.contains(""""dataset":"d1"""")) { - JSONUtil.getJsonType(event) should be ("OBJECT") - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](event) - eventMap.get("dataset").get.asInstanceOf[String] should be ("d1") - 
eventMap.get("syncts").isDefined should be (true) - eventMap.contains("event") should be (true) - } else if(event.contains(""""dataset":"d2"""")) { - JSONUtil.getJsonType(event) should be("OBJECT") - val eventMap = JSONUtil.deserialize[Map[String, AnyRef]](event) - eventMap.get("dataset").get.asInstanceOf[String] should be("d2") - eventMap.get("syncts").isDefined should be(true) - eventMap.contains("events") should be(true) - JSONUtil.getJsonType(JSONUtil.serialize(eventMap.get("events"))) should be("ARRAY") - } else { - JSONUtil.getJsonType(event) should be ("NOT_A_JSON") - event.contains(""""event":{"id":"1234","vehicleCode":"HYUN-CRE-D6","date":"2023-03-01","dealer":{"dealerCode":"KUNUnited","locationId":"KUN1","email":"dealer1@gmail.com","phone":"9849012345"},"metrics":{"bookingsTaken":50,"deliveriesPromised":20,"deliveriesDone":19}""") should be(true) - } - }} - - } - -} \ No newline at end of file diff --git a/pipeline/pom.xml b/pipeline/pom.xml index 25d19b66..9c37e956 100644 --- a/pipeline/pom.xml +++ b/pipeline/pom.xml @@ -22,7 +22,6 @@ transformer druid-router pipeline-merged - kafka-connector master-data-processor