diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
index ed9880a0..f92c23fc 100644
--- a/.github/workflows/check.yml
+++ b/.github/workflows/check.yml
@@ -10,8 +10,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
-      - name: Run build
-        run: "./gradlew clean :app:shadowJar"
+      - name: Gradle build with cache
+        uses: burrunan/gradle-cache-action@v1
+        with:
+          arguments: ":app:shadowJar"
       - name: Run integration tests
         id: tests
         uses: data-catering/insta-integration@v1
diff --git a/app/src/main/resources/application.conf b/app/src/main/resources/application.conf
index 14898ef3..3966ef0d 100644
--- a/app/src/main/resources/application.conf
+++ b/app/src/main/resources/application.conf
@@ -1,97 +1,144 @@
 flags {
-  enableGeneratePlanAndTasks = false
-  enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
   enableCount = true
   enableCount = ${?ENABLE_COUNT}
   enableGenerateData = true
   enableGenerateData = ${?ENABLE_GENERATE_DATA}
+  enableGeneratePlanAndTasks = false
+  enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
   enableRecordTracking = false
   enableRecordTracking = ${?ENABLE_RECORD_TRACKING}
   enableDeleteGeneratedRecords = false
   enableDeleteGeneratedRecords = ${?ENABLE_DELETE_GENERATED_RECORDS}
   enableFailOnError = true
   enableFailOnError = ${?ENABLE_FAIL_ON_ERROR}
+  enableUniqueCheck = true
+  enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK}
   enableSinkMetadata = true
-  enableSinkMetadata = ${?ENABLED_SINK_METADATA}
+  enableSinkMetadata = ${?ENABLE_SINK_METADATA}
   enableSaveReports = true
-  enableSaveReports = ${?ENABLED_SAVE_REPORTS}
+  enableSaveReports = ${?ENABLE_SAVE_REPORTS}
   enableValidation = false
-  enableValidation = ${?ENABLED_VALIDATION}
+  enableValidation = ${?ENABLE_VALIDATION}
+  enableGenerateValidations = false
+  enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS}
+  enableAlerts = false
+  enableAlerts = ${?ENABLE_ALERTS}
 }

 folders {
   generatedPlanAndTaskFolderPath = "/tmp"
   generatedPlanAndTaskFolderPath = ${?GENERATED_PLAN_AND_TASK_FOLDER_PATH}
-  planFilePath = "/plan/customer-create-plan.yaml"
+  planFilePath = "app/src/test/resources/sample/plan/customer-create-plan.yaml"
   planFilePath = ${?PLAN_FILE_PATH}
-  taskFolderPath = "/task"
+  taskFolderPath = "app/src/test/resources/sample/task"
   taskFolderPath = ${?TASK_FOLDER_PATH}
   recordTrackingFolderPath = "/tmp/data/generated/recordTracking"
   recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH}
+  recordTrackingForValidationFolderPath = "/tmp/data/validation/recordTracking"
+  recordTrackingForValidationFolderPath = ${?RECORD_TRACKING_VALIDATION_FOLDER_PATH}
   generatedReportsFolderPath = "app/src/test/resources/sample/html"
   generatedReportsFolderPath = ${?GENERATED_REPORTS_FOLDER_PATH}
-  validationFolderPath = "app/src/test/resources/sample/validation"
-  validationFolderPath = ${?VALIDATION_FOLDER_PATH}
 }

 metadata {
   numRecordsFromDataSource = 10000
-  numRecordsFromDataSource = ${?METADATA_NUM_RECORDS_FROM_DATA_SOURCE}
+  numRecordsFromDataSource = ${?NUM_RECORDS_FROM_DATA_SOURCE}
   numRecordsForAnalysis = 10000
-  numRecordsForAnalysis = ${?METADATA_NUM_RECORDS_FOR_ANALYSIS}
+  numRecordsForAnalysis = ${?NUM_RECORDS_FOR_ANALYSIS}
   oneOfDistinctCountVsCountThreshold = 0.1
-  oneOfDistinctCountVsCountThreshold = ${?METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
+  oneOfDistinctCountVsCountThreshold = ${?ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
+  oneOfMinCount = 1000
+  oneOfMinCount = ${?ONE_OF_MIN_COUNT}
+  numGeneratedSamples = 10
+  numGeneratedSamples = ${?NUM_GENERATED_SAMPLES}
 }
 generation {
-  numRecordsPerBatch = 100000
-  numRecordsPerBatch = ${?GENERATION_NUM_RECORDS_PER_BATCH}
+  numRecordsPerBatch = 1000000
+  numRecordsPerBatch = ${?NUM_RECORDS_PER_BATCH}
 }

-validation {}
-alert {}
+validation {
+  numSampleErrorRecords = 5
+  numSampleErrorRecords = ${?NUM_SAMPLE_ERROR_RECORDS}
+  enableDeleteRecordTrackingFiles = true
+  enableDeleteRecordTrackingFiles = ${?ENABLE_DELETE_RECORD_TRACKING_FILES}
+}
+
+alert {
+  triggerOn = "all"
+  triggerOn = ${?ALERT_TRIGGER_ON}
+  slackAlertConfig {
+    token = ""
+    token = ${?ALERT_SLACK_TOKEN}
+    channels = []
+    channels = ${?ALERT_SLACK_CHANNELS}
+  }
+}

 runtime {
   master = "local[*]"
   master = ${?DATA_CATERER_MASTER}
   config {
-    "spark.sql.cbo.enabled" = "true"
-    "spark.sql.adaptive.enabled" = "true"
-    "spark.sql.cbo.planStats.enabled" = "true"
-    "spark.sql.legacy.allowUntypedScalaUDF" = "true"
-    "spark.sql.statistics.histogram.enabled" = "true"
-    "spark.sql.shuffle.partitions" = "10"
-    "spark.sql.catalog.postgres" = ""
-    "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog"
-    "spark.hadoop.fs.s3a.directory.marker.retention" = "keep"
-    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true"
-    #"spark.hadoop.fs.defaultFS" = "s3a://my-bucket"
+    "spark.sql.cbo.enabled": "true",
+    "spark.sql.adaptive.enabled": "true",
+    "spark.sql.cbo.planStats.enabled": "true",
+    "spark.sql.legacy.allowUntypedScalaUDF": "true",
+    "spark.sql.legacy.allowParameterlessCount": "true",
+    "spark.sql.statistics.histogram.enabled": "true",
+    "spark.sql.shuffle.partitions": "10",
+    "spark.sql.catalog.postgres": "",
+    "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
+    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
+    "spark.sql.catalog.iceberg.type": "hadoop",
+    "spark.hadoop.fs.s3a.directory.marker.retention": "keep",
+    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
+    "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
+    "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
+    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
+  }
+}
+
+json {
+  json {
+  }
+}
+
+csv {
+  csv {
+  }
+}
+
+delta {
+  delta {
+  }
+}
+
+iceberg {
+  iceberg {
+  }
+}
+
+orc {
+  orc {
+  }
+}
+
+parquet {
+  parquet {
   }
 }

-# connection type
 jdbc {
-# connection name
   postgres {
-# connection details
     url = "jdbc:postgresql://localhost:5432/customer"
     url = ${?POSTGRES_URL}
     user = "postgres"
-    user = ${?POSTGRES_USERNAME}
+    user = ${?POSTGRES_USER}
     password = "postgres"
     password = ${?POSTGRES_PASSWORD}
     driver = "org.postgresql.Driver"
   }
-  postgresDvd {
-    url = "jdbc:postgresql://localhost:5432/dvdrental"
-    url = ${?POSTGRES_URL}
-    user = "postgres"
-    user = ${?POSTGRES_USERNAME}
-    password = "postgres"
-    password = ${?POSTGRES_PASSWORD}
-    driver = "org.postgresql.Driver"
-    stringtype = "unspecified"
-  }
   mysql {
     url = "jdbc:mysql://localhost:3306/customer"
     url = ${?MYSQL_URL}
@@ -103,6 +150,7 @@ jdbc {
   }
 }

+
 org.apache.spark.sql.cassandra {
   cassandra {
     spark.cassandra.connection.host = "localhost"
@@ -140,30 +188,10 @@ jms {

 kafka {
   kafka {
-    kafka.bootstrap.servers = "localhost:9092"
+    kafka.bootstrap.servers = "localhost:29092"
     kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
   }
 }

-parquet {
-  parquet {
-    path = "app/src/test/resources/sample"
-    path = ${?PARQUET_PATH}
-  }
-}
-
-json {
-  json {
-    path = "app/src/test/resources/sample"
-    path = ${?JSON_PATH}
-  }
-}
-
-csv {
-  csv {
-    path = "app/src/test/resources/sample"
-    path = ${?CSV_PATH}
-  }
-}

 datastax-java-driver.advanced.metadata.schema.refreshed-keyspaces = [ "/.*/" ]
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
index f2f1e7c5..97a32ca7 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
@@ -68,14 +68,19 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
         s"new-num-records=${additionalDf.count()}, actual-num-records=$dfRecordCount")
     }

-    while (targetNumRecords != dfRecordCount && retries < maxRetries) {
-      retries += 1
-      generateAdditionalRecords()
-    }
-    if (targetNumRecords != dfRecordCount && retries == maxRetries) {
-      LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
-        s"Can be due to limited number of potential unique records, " +
-        s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
+    //if random amount of records, don't try to regenerate more records
+    if (s.count.generator.isEmpty && s.count.perColumn.forall(_.generator.isEmpty)) {
+      while (targetNumRecords != dfRecordCount && retries < maxRetries) {
+        retries += 1
+        generateAdditionalRecords()
+      }
+      if (targetNumRecords != dfRecordCount && retries == maxRetries) {
+        LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
+          s"Can be due to limited number of potential unique records, " +
+          s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
+      }
+    } else {
+      LOGGER.debug("Random amount of records generated, not attempting to generate more records")
     }

     trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount))
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
index 6dbd1515..94b1278d 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS}
 import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task}
 import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML}
+import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML}
 import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
 import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
 import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile
@@ -50,41 +50,19 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig
     try {
       fileWriter(REPORT_HOME_HTML, htmlWriter.index(plan, stepSummary, taskSummary, dataSourceSummary,
         validationResults, dataCatererConfiguration.flagsConfig, sparkRecordListener))
-      fileWriter("tasks.html", htmlWriter.taskDetails(taskSummary))
+      fileWriter(REPORT_TASK_HTML, htmlWriter.taskDetails(taskSummary))
       fileWriter(REPORT_FIELDS_HTML, htmlWriter.stepDetails(stepSummary))
       fileWriter(REPORT_DATA_SOURCES_HTML, htmlWriter.dataSourceDetails(stepSummary.flatMap(_.dataSourceResults)))
       fileWriter(REPORT_VALIDATIONS_HTML, htmlWriter.validations(validationResults, validationConfig))
-      writeStringToFile(fileSystem, s"$reportFolder/results.json", resultsAsJson(generationResult, validationResults))
-
-      copyHtmlResources(fileSystem, reportFolder)
+      writeStringToFile(fileSystem, s"$reportFolder/$REPORT_RESULT_JSON", resultsAsJson(generationResult, validationResults))
+      writeStringToFile(fileSystem, s"$reportFolder/$REPORT_DATA_CATERING_SVG", htmlWriter.dataCateringSvg)
+      writeStringToFile(fileSystem, s"$reportFolder/$REPORT_MAIN_CSS", htmlWriter.mainCss)
     } catch {
       case ex: Exception =>
         LOGGER.error("Failed to write data generation summary to HTML files", ex)
     }
   }

-  private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = {
-    val resources = List("main.css", "data_catering_transparent.svg")
-    if (!foldersConfig.generatedReportsFolderPath.equalsIgnoreCase(DEFAULT_GENERATED_REPORTS_FOLDER_PATH)) {
-      resources.foreach(resource => {
-        val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource")
-        val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI))
-        val resourcePath = tryLocalUri match {
-          case Failure(_) =>
-            defaultResourcePath
-          case Success(value) =>
-            Try(value.getName) match {
-              case Failure(_) => defaultResourcePath
-              case Success(name) =>
-                if (name.startsWith("jar:")) defaultResourcePath else value
-            }
-        }
-        val destination = s"file:///$folder/$resource"
-        fileSystem.copyFromLocalFile(resourcePath, new Path(destination))
-      })
-    }
-  }
-
   private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = {
     writeStringToFile(fileSystem, s"$folderPath/$fileName", content.toString())
   }
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
index fffb5758..023c9a15 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
@@ -739,6 +739,183 @@
   }

+  def mainCss: String = {
+    """.box-iframe {
+      |  float: left;
+      |  margin-right: 10px;
+      |}
+      |
+      |body {
+      |  margin: 0;
+      |}
+      |
+      |.top-banner {
+      |  height: fit-content;
+      |  background-color: #ff6e42;
+      |  padding: 0 .2rem;
+      |  display: flex;
+      |}
+      |
+      |.top-banner span {
+      |  color: #f2f2f2;
+      |  font-size: 17px;
+      |  padding: 5px 6px;
+      |  display: flex;
+      |  align-items: center;
+      |}
+      |
+      |.logo {
+      |  padding: 5px;
+      |  height: 45px;
+      |  width: auto;
+      |  display: flex;
+      |  align-items: center;
+      |  justify-content: center;
+      |}
+      |
+      |.logo:hover {
+      |  background-color: #ff9100;
+      |  color: black;
+      |}
+      |
+      |.top-banner img {
+      |  height: 35px;
+      |  width: auto;
+      |  display: flex;
+      |  justify-content: center;
+      |  vertical-align: middle;
+      |}
+      |
+      |.topnav {
+      |  overflow: hidden;
+      |  background-color: #ff6e42;
+      |}
+      |
+      |.topnav a {
+      |  float: left;
+      |  color: #f2f2f2;
+      |  text-align: center;
+      |  padding: 8px 10px;
+      |  text-decoration: none;
+      |  font-size: 17px;
+      |}
+      |
+      |.topnav a:hover {
+      |  background-color: #ff9100;
+      |  color: black;
+      |}
+      |
+      |.topnav a.active {
+      |  color: black;
+      |}
+      |
+      |table {
+      |  overflow: hidden;
+      |  transition: max-height 0.2s ease-out;
+      |}
+      |
+      |table.codegrid {
+      |  font-family: monospace;
+      |  font-size: 12px;
+      |  width: auto !important;
+      |}
+      |
+      |table.statementlist {
+      |  width: auto !important;
+      |  font-size: 13px;
+      |}
+      |
+      |table.codegrid td {
+      |  padding: 0 !important;
+      |  border: 0 !important
+      |}
+      |
+      |table td.linenumber {
+      |  width: 40px !important;
+      |}
+      |
+      |td {
+      |  white-space: normal
+      |}
+      |
+      |.table thead th {
+      |  position: sticky;
+      |  top: 0;
+      |  z-index: 1;
+      |}
+      |
+      |table, tr, td, th {
+      |  border-collapse: collapse;
+      |}
+      |
+      |.table-collapsible {
+      |  max-height: 0;
+      |  overflow: hidden;
+      |  transition: max-height 0.2s ease-out;
+      |}
+      |
+      |.collapsible {
+      |  background-color: lightgray;
+      |  color: black;
+      |  cursor: pointer;
+      |  width: 100%;
+      |  border: none;
+      |  text-align: left;
+      |  outline: none;
+      |}
+      |
+      |.collapsible:after {
+      |  content: "\02795"; /* Unicode character for "plus" sign (+) */
+      |  color: white;
+      |  float: right;
+      |}
+      |
+      |.active:after {
+      |  content: "\2796"; /* Unicode character for "minus" sign (-) */
+      |}
+      |
+      |.outer-container {
+      |  display: flex;
+      |  flex-direction: column;
+      |  height: 100vh;
+      |}
+      |
+      |.top-container {
+      |  height: 50%;
+      |  overflow: auto;
+      |  resize: vertical;
+      |}
+      |
+      |.bottom-container {
+      |  flex: 1;
+      |  min-height: 0;
+      |  height: 50%;
+      |  overflow: auto;
+      |  resize: vertical;
+      |}
+      |
+      |.slider {
+      |  text-align: center;
+      |  background-color: #dee2e6;
+      |  cursor: row-resize;
+      |  user-select: none;
+      |}
+      |
+      |.selected-row {
+      |  background-color: #ff6e42 !important;
+      |}
+      |
+      |.progress {
+      |  white-space: normal;
+      |  background-color: #d9534f;
+      |}
+      |
+      |.progress-bar {
+      |  color: black;
+      |}
+      |""".stripMargin
+  }
+
   def plugins: NodeBuffer = {
@@ -749,4 +926,6 @@
   }

+
+  def dataCateringSvg: String = ""
 }
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala
index a3ec3593..dbfdf6df 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala
@@ -54,9 +54,13 @@
   lazy val COUNT_NUM_RECORDS = "numRecords"

   //report
+  lazy val REPORT_DATA_CATERING_SVG = "data_catering_transparent.svg"
   lazy val REPORT_DATA_SOURCES_HTML = "data-sources.html"
   lazy val REPORT_FIELDS_HTML = "steps.html"
   lazy val REPORT_HOME_HTML = "index.html"
+  lazy val REPORT_MAIN_CSS = "main.css"
+  lazy val REPORT_RESULT_JSON = "results.json"
+  lazy val REPORT_TASK_HTML = "tasks.html"
   lazy val REPORT_VALIDATIONS_HTML = "validations.html"

   //connection group type
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
index 3b98c8b3..d12dc4e0 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
@@ -1,6 +1,7 @@
 package io.github.datacatering.datacaterer.core.model

 import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, Validation}
+import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions
 import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -29,15 +30,19 @@ case class ValidationConfigResult(
   }

   def jsonSummary(numErrorSamples: Int): Map[String, Any] = {
-    val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
+    val validationRes = dataSourceValidationResults.flatMap(dsv =>
+      dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
+    )
     if (validationRes.nonEmpty) {
-      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
-      val errorMap = validationRes.filter(!_.isSuccess).map(res => {
-        val validationDetails = res.validation.toOptions.map(v => (v.head, v.last)).toMap
+      val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
+      val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
+        val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
         Map(
+          "dataSourceName" -> res._1,
+          "options" -> cleanseOptions(res._2),
           "validation" -> validationDetails,
-          "numErrors" -> res.numErrors,
-          "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res)
+          "numErrors" -> res._3.numErrors,
+          "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res._3)
         )
       })
       val baseValidationMap = Map(
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala
index 32be44d9..35e22029 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala
@@ -100,8 +100,10 @@ class UniqueFieldsUtil(plan: Plan, executableTasks: List[(TaskSummary, Task)])(i
         val uniqueKeys = step.gatherUniqueFields
         val uniqueKeyUf = if (uniqueKeys.nonEmpty) uniqueKeys.map(u => UniqueFields(t._1.dataSourceName, step.name, List(u))) else List()
         val allKeys = primaryKeyUf ++ uniqueKeyUf
-        LOGGER.debug(s"Found unique fields that require unique values, " +
-          s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}")
+        if (allKeys.nonEmpty) {
+          LOGGER.debug(s"Found unique fields that require unique values, " +
+            s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}")
+        }
         allKeys
       })
     })
diff --git a/app/src/test/resources/application.conf b/app/src/test/resources/application.conf
index 2cc0ffbd..cc30ce98 100644
--- a/app/src/test/resources/application.conf
+++ b/app/src/test/resources/application.conf
@@ -32,10 +32,15 @@ runtime{
     "spark.sql.legacy.allowUntypedScalaUDF" = "true"
     "spark.sql.statistics.histogram.enabled" = "true"
     "spark.sql.shuffle.partitions" = "10"
-    "spark.sql.catalog.postgres" = ""
-    "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog"
-    "spark.hadoop.fs.s3a.directory.marker.retention" = "keep"
-    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true"
+    "spark.sql.catalog.postgres": "",
+    "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
+    "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
+    "spark.sql.catalog.iceberg.type": "hadoop",
+    "spark.hadoop.fs.s3a.directory.marker.retention": "keep",
+    "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
+    "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
+    "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
+    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
   }
 }

diff --git a/app/src/test/resources/sample/sql/postgres/customer.sql b/app/src/test/resources/sample/sql/postgres/customer.sql
index 28304210..d5bffb53 100644
--- a/app/src/test/resources/sample/sql/postgres/customer.sql
+++ b/app/src/test/resources/sample/sql/postgres/customer.sql
@@ -25,18 +25,18 @@ CREATE TABLE IF NOT EXISTS account.balances
 (
     account_number VARCHAR(20) UNIQUE NOT NULL,
     create_time    TIMESTAMP,
+    account_status VARCHAR(10),
    balance        DOUBLE PRECISION,
     PRIMARY KEY (account_number, create_time)
 );

 CREATE TABLE IF NOT EXISTS account.transactions
 (
-    account_number VARCHAR(20) UNIQUE NOT NULL,
+    account_number VARCHAR(20) NOT NULL REFERENCES account.balances (account_number),
     create_time    TIMESTAMP,
     transaction_id VARCHAR(20),
     amount         DOUBLE PRECISION,
-    PRIMARY KEY (account_number, create_time, transaction_id),
-    CONSTRAINT fk_txn_account_number FOREIGN KEY (account_number) REFERENCES account.balances (account_number)
+    PRIMARY KEY (account_number, create_time, transaction_id)
 );

 CREATE TABLE IF NOT EXISTS account.mapping
diff --git a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml
index c4cd1ed9..16973650 100644
--- a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml
+++ b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml
@@ -13,7 +13,6 @@ steps:
           type: "regex"
           options:
             regex: "ACC1[0-9]{5,10}"
-            isUnique: true
       - name: "create_time"
         type: "timestamp"
       - name: "account_status"
diff --git a/insta-integration.yaml b/insta-integration.yaml
index dbf6b126..84038e61 100644
--- a/insta-integration.yaml
+++ b/insta-integration.yaml
@@ -6,19 +6,21 @@ run:
     env:
       PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml
       TASK_FOLDER_PATH: app/src/test/resources/sample/task
+      APPLICATION_CONFIG_PATH: app/src/main/resources/application.conf
+    generateFirst: false
     test:
       validation:
         postgres:
           - options:
               dbtable: account.balances
             validations:
-              - expr: ISNOTNULL(account_id)
+              - expr: ISNOTNULL(account_number)
               - aggType: count
                 aggExpr: count == 1000
           - options:
               dbtable: account.transactions
             validations:
-              - expr: ISNOTNULL(account_id)
+              - expr: ISNOTNULL(account_number)
               - aggType: count
                 aggExpr: count == 5000
               - groupByCols: [account_number]