diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml
index ed9880a0..f92c23fc 100644
--- a/.github/workflows/check.yml
+++ b/.github/workflows/check.yml
@@ -10,8 +10,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- - name: Run build
- run: "./gradlew clean :app:shadowJar"
+ - name: Gradle build with cache
+ uses: burrunan/gradle-cache-action@v1
+ with:
+ arguments: ":app:shadowJar"
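+          # assumed: gradle-cache-action runs ./gradlew with the given arguments and caches Gradle dependencies between workflow runs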
- name: Run integration tests
id: tests
uses: data-catering/insta-integration@v1
diff --git a/app/src/main/resources/application.conf b/app/src/main/resources/application.conf
index 14898ef3..3966ef0d 100644
--- a/app/src/main/resources/application.conf
+++ b/app/src/main/resources/application.conf
@@ -1,97 +1,144 @@
flags {
- enableGeneratePlanAndTasks = false
- enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
enableCount = true
enableCount = ${?ENABLE_COUNT}
enableGenerateData = true
enableGenerateData = ${?ENABLE_GENERATE_DATA}
+ enableGeneratePlanAndTasks = false
+ enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
enableRecordTracking = false
enableRecordTracking = ${?ENABLE_RECORD_TRACKING}
enableDeleteGeneratedRecords = false
enableDeleteGeneratedRecords = ${?ENABLE_DELETE_GENERATED_RECORDS}
enableFailOnError = true
enableFailOnError = ${?ENABLE_FAIL_ON_ERROR}
+ enableUniqueCheck = true
+ enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK}
enableSinkMetadata = true
- enableSinkMetadata = ${?ENABLED_SINK_METADATA}
+ enableSinkMetadata = ${?ENABLE_SINK_METADATA}
enableSaveReports = true
- enableSaveReports = ${?ENABLED_SAVE_REPORTS}
+ enableSaveReports = ${?ENABLE_SAVE_REPORTS}
enableValidation = false
- enableValidation = ${?ENABLED_VALIDATION}
+ enableValidation = ${?ENABLE_VALIDATION}
+ enableGenerateValidations = false
+ enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS}
+ enableAlerts = false
+ enableAlerts = ${?ENABLE_ALERTS}
}
folders {
generatedPlanAndTaskFolderPath = "/tmp"
generatedPlanAndTaskFolderPath = ${?GENERATED_PLAN_AND_TASK_FOLDER_PATH}
- planFilePath = "/plan/customer-create-plan.yaml"
+ planFilePath = "app/src/test/resources/sample/plan/customer-create-plan.yaml"
planFilePath = ${?PLAN_FILE_PATH}
- taskFolderPath = "/task"
+ taskFolderPath = "app/src/test/resources/sample/task"
taskFolderPath = ${?TASK_FOLDER_PATH}
recordTrackingFolderPath = "/tmp/data/generated/recordTracking"
recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH}
+ recordTrackingForValidationFolderPath = "/tmp/data/validation/recordTracking"
+ recordTrackingForValidationFolderPath = ${?RECORD_TRACKING_VALIDATION_FOLDER_PATH}
generatedReportsFolderPath = "app/src/test/resources/sample/html"
generatedReportsFolderPath = ${?GENERATED_REPORTS_FOLDER_PATH}
- validationFolderPath = "app/src/test/resources/sample/validation"
- validationFolderPath = ${?VALIDATION_FOLDER_PATH}
}
metadata {
numRecordsFromDataSource = 10000
- numRecordsFromDataSource = ${?METADATA_NUM_RECORDS_FROM_DATA_SOURCE}
+ numRecordsFromDataSource = ${?NUM_RECORDS_FROM_DATA_SOURCE}
numRecordsForAnalysis = 10000
- numRecordsForAnalysis = ${?METADATA_NUM_RECORDS_FOR_ANALYSIS}
+ numRecordsForAnalysis = ${?NUM_RECORDS_FOR_ANALYSIS}
oneOfDistinctCountVsCountThreshold = 0.1
- oneOfDistinctCountVsCountThreshold = ${?METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
+ oneOfDistinctCountVsCountThreshold = ${?ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
+ oneOfMinCount = 1000
+ oneOfMinCount = ${?ONE_OF_MIN_COUNT}
+ numGeneratedSamples = 10
+ numGeneratedSamples = ${?NUM_GENERATED_SAMPLES}
}
generation {
- numRecordsPerBatch = 100000
- numRecordsPerBatch = ${?GENERATION_NUM_RECORDS_PER_BATCH}
+ numRecordsPerBatch = 1000000
+ numRecordsPerBatch = ${?NUM_RECORDS_PER_BATCH}
}
-validation {}
-alert {}
+validation {
+ numSampleErrorRecords = 5
+ numSampleErrorRecords = ${?NUM_SAMPLE_ERROR_RECORDS}
+ enableDeleteRecordTrackingFiles = true
+ enableDeleteRecordTrackingFiles = ${?ENABLE_DELETE_RECORD_TRACKING_FILES}
+}
+
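+# Alerting is off by default (enableAlerts = false above); the Slack token and channels are expected to come from environment variables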
+alert {
+ triggerOn = "all"
+ triggerOn = ${?ALERT_TRIGGER_ON}
+ slackAlertConfig {
+ token = ""
+ token = ${?ALERT_SLACK_TOKEN}
+ channels = []
+ channels = ${?ALERT_SLACK_CHANNELS}
+ }
+}
runtime {
master = "local[*]"
master = ${?DATA_CATERER_MASTER}
config {
- "spark.sql.cbo.enabled" = "true"
- "spark.sql.adaptive.enabled" = "true"
- "spark.sql.cbo.planStats.enabled" = "true"
- "spark.sql.legacy.allowUntypedScalaUDF" = "true"
- "spark.sql.statistics.histogram.enabled" = "true"
- "spark.sql.shuffle.partitions" = "10"
- "spark.sql.catalog.postgres" = ""
- "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog"
- "spark.hadoop.fs.s3a.directory.marker.retention" = "keep"
- "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true"
- #"spark.hadoop.fs.defaultFS" = "s3a://my-bucket"
+ "spark.sql.cbo.enabled": "true",
+ "spark.sql.adaptive.enabled": "true",
+ "spark.sql.cbo.planStats.enabled": "true",
+ "spark.sql.legacy.allowUntypedScalaUDF": "true",
+ "spark.sql.legacy.allowParameterlessCount": "true",
+ "spark.sql.statistics.histogram.enabled": "true",
+ "spark.sql.shuffle.partitions": "10",
+ "spark.sql.catalog.postgres": "",
+ "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
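+      # Iceberg catalog using the Hadoop catalog type; assumes the Iceberg Spark runtime is on the classpath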
+ "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.iceberg.type": "hadoop",
+ "spark.hadoop.fs.s3a.directory.marker.retention": "keep",
+ "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
+ "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
+ "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
+ "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
+ }
+}
+
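+# File-based connection types; options such as path are assumed to be supplied per step in the task definitions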
+json {
+ json {
+ }
+}
+
+csv {
+ csv {
+ }
+}
+
+delta {
+ delta {
+ }
+}
+
+iceberg {
+ iceberg {
+ }
+}
+
+orc {
+ orc {
+ }
+}
+
+parquet {
+ parquet {
}
}
-# connection type
jdbc {
-# connection name
postgres {
-# connection details
url = "jdbc:postgresql://localhost:5432/customer"
url = ${?POSTGRES_URL}
user = "postgres"
- user = ${?POSTGRES_USERNAME}
+ user = ${?POSTGRES_USER}
password = "postgres"
password = ${?POSTGRES_PASSWORD}
driver = "org.postgresql.Driver"
}
- postgresDvd {
- url = "jdbc:postgresql://localhost:5432/dvdrental"
- url = ${?POSTGRES_URL}
- user = "postgres"
- user = ${?POSTGRES_USERNAME}
- password = "postgres"
- password = ${?POSTGRES_PASSWORD}
- driver = "org.postgresql.Driver"
- stringtype = "unspecified"
- }
mysql {
url = "jdbc:mysql://localhost:3306/customer"
url = ${?MYSQL_URL}
@@ -103,6 +150,7 @@ jdbc {
}
}
+
org.apache.spark.sql.cassandra {
cassandra {
spark.cassandra.connection.host = "localhost"
@@ -140,30 +188,10 @@ jms {
kafka {
kafka {
- kafka.bootstrap.servers = "localhost:9092"
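+    # 29092 is assumed to be the host-exposed listener of the local Kafka container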
+ kafka.bootstrap.servers = "localhost:29092"
kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
}
}
-parquet {
- parquet {
- path = "app/src/test/resources/sample"
- path = ${?PARQUET_PATH}
- }
-}
-
-json {
- json {
- path = "app/src/test/resources/sample"
- path = ${?JSON_PATH}
- }
-}
-
-csv {
- csv {
- path = "app/src/test/resources/sample"
- path = ${?CSV_PATH}
- }
-}
datastax-java-driver.advanced.metadata.schema.refreshed-keyspaces = [ "/.*/" ]
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
index f2f1e7c5..97a32ca7 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/BatchDataProcessor.scala
@@ -68,14 +68,19 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
s"new-num-records=${additionalDf.count()}, actual-num-records=$dfRecordCount")
}
- while (targetNumRecords != dfRecordCount && retries < maxRetries) {
- retries += 1
- generateAdditionalRecords()
- }
- if (targetNumRecords != dfRecordCount && retries == maxRetries) {
- LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
- s"Can be due to limited number of potential unique records, " +
- s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
+      // if the record count is randomly generated, don't try to create additional records to hit an exact target
+ if (s.count.generator.isEmpty && s.count.perColumn.forall(_.generator.isEmpty)) {
+ while (targetNumRecords != dfRecordCount && retries < maxRetries) {
+ retries += 1
+ generateAdditionalRecords()
+ }
+ if (targetNumRecords != dfRecordCount && retries == maxRetries) {
+ LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
+ s"Can be due to limited number of potential unique records, " +
+ s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
+ }
+ } else {
+        LOGGER.debug("Record count is randomly generated, not attempting to generate additional records")
}
trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount))
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
index 6dbd1515..94b1278d 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/DataGenerationResultWriter.scala
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS}
import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
-import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML}
+import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML}
import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile
@@ -50,41 +50,19 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig
try {
fileWriter(REPORT_HOME_HTML, htmlWriter.index(plan, stepSummary, taskSummary, dataSourceSummary,
validationResults, dataCatererConfiguration.flagsConfig, sparkRecordListener))
- fileWriter("tasks.html", htmlWriter.taskDetails(taskSummary))
+ fileWriter(REPORT_TASK_HTML, htmlWriter.taskDetails(taskSummary))
fileWriter(REPORT_FIELDS_HTML, htmlWriter.stepDetails(stepSummary))
fileWriter(REPORT_DATA_SOURCES_HTML, htmlWriter.dataSourceDetails(stepSummary.flatMap(_.dataSourceResults)))
fileWriter(REPORT_VALIDATIONS_HTML, htmlWriter.validations(validationResults, validationConfig))
- writeStringToFile(fileSystem, s"$reportFolder/results.json", resultsAsJson(generationResult, validationResults))
-
- copyHtmlResources(fileSystem, reportFolder)
+ writeStringToFile(fileSystem, s"$reportFolder/$REPORT_RESULT_JSON", resultsAsJson(generationResult, validationResults))
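+      // write the report assets (logo SVG and CSS) directly instead of copying them from classpath resources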
+ writeStringToFile(fileSystem, s"$reportFolder/$REPORT_DATA_CATERING_SVG", htmlWriter.dataCateringSvg)
+ writeStringToFile(fileSystem, s"$reportFolder/$REPORT_MAIN_CSS", htmlWriter.mainCss)
} catch {
case ex: Exception =>
LOGGER.error("Failed to write data generation summary to HTML files", ex)
}
}
- private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = {
- val resources = List("main.css", "data_catering_transparent.svg")
- if (!foldersConfig.generatedReportsFolderPath.equalsIgnoreCase(DEFAULT_GENERATED_REPORTS_FOLDER_PATH)) {
- resources.foreach(resource => {
- val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource")
- val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI))
- val resourcePath = tryLocalUri match {
- case Failure(_) =>
- defaultResourcePath
- case Success(value) =>
- Try(value.getName) match {
- case Failure(_) => defaultResourcePath
- case Success(name) =>
- if (name.startsWith("jar:")) defaultResourcePath else value
- }
- }
- val destination = s"file:///$folder/$resource"
- fileSystem.copyFromLocalFile(resourcePath, new Path(destination))
- })
- }
- }
-
private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = {
writeStringToFile(fileSystem, s"$folderPath/$fileName", content.toString())
}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
index fffb5758..023c9a15 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/generator/result/ResultHtmlWriter.scala
@@ -739,6 +739,183 @@ class ResultHtmlWriter {
}
+ def mainCss: String = {
+ """.box-iframe {
+ | float: left;
+ | margin-right: 10px;
+ |}
+ |
+ |body {
+ | margin: 0;
+ |}
+ |
+ |.top-banner {
+ | height: fit-content;
+ | background-color: #ff6e42;
+ | padding: 0 .2rem;
+ | display: flex;
+ |}
+ |
+ |.top-banner span {
+ | color: #f2f2f2;
+ | font-size: 17px;
+ | padding: 5px 6px;
+ | display: flex;
+ | align-items: center;
+ |}
+ |
+ |.logo {
+ | padding: 5px;
+ | height: 45px;
+ | width: auto;
+ | display: flex;
+ | align-items: center;
+ | justify-content: center;
+ |}
+ |
+ |.logo:hover {
+ | background-color: #ff9100;
+ | color: black;
+ |}
+ |
+ |.top-banner img {
+ | height: 35px;
+ | width: auto;
+ | display: flex;
+ | justify-content: center;
+ | vertical-align: middle;
+ |}
+ |
+ |.topnav {
+ | overflow: hidden;
+ | background-color: #ff6e42;
+ |}
+ |
+ |.topnav a {
+ | float: left;
+ | color: #f2f2f2;
+ | text-align: center;
+ | padding: 8px 10px;
+ | text-decoration: none;
+ | font-size: 17px;
+ |}
+ |
+ |.topnav a:hover {
+ | background-color: #ff9100;
+ | color: black;
+ |}
+ |
+ |.topnav a.active {
+ | color: black;
+ |}
+ |
+ |table {
+ | overflow: hidden;
+ | transition: max-height 0.2s ease-out;
+ |}
+ |
+ |table.codegrid {
+ | font-family: monospace;
+ | font-size: 12px;
+ | width: auto !important;
+ |}
+ |
+ |table.statementlist {
+ | width: auto !important;
+ | font-size: 13px;
+ |}
+ |
+ |table.codegrid td {
+ | padding: 0 !important;
+ | border: 0 !important
+ |}
+ |
+ |table td.linenumber {
+ | width: 40px !important;
+ |}
+ |
+ |td {
+ | white-space: normal
+ |}
+ |
+ |.table thead th {
+ | position: sticky;
+ | top: 0;
+ | z-index: 1;
+ |}
+ |
+ |table, tr, td, th {
+ | border-collapse: collapse;
+ |}
+ |
+ |.table-collapsible {
+ | max-height: 0;
+ | overflow: hidden;
+ | transition: max-height 0.2s ease-out;
+ |}
+ |
+ |.collapsible {
+ | background-color: lightgray;
+ | color: black;
+ | cursor: pointer;
+ | width: 100%;
+ | border: none;
+ | text-align: left;
+ | outline: none;
+ |}
+ |
+ |.collapsible:after {
+ | content: "\02795"; /* Unicode character for "plus" sign (+) */
+ | color: white;
+ | float: right;
+ |}
+ |
+ |.active:after {
+ | content: "\2796"; /* Unicode character for "minus" sign (-) */
+ |}
+ |
+ |.outer-container {
+ | display: flex;
+ | flex-direction: column;
+ | height: 100vh;
+ |}
+ |
+ |.top-container {
+ | height: 50%;
+ | overflow: auto;
+ | resize: vertical;
+ |}
+ |
+ |.bottom-container {
+ | flex: 1;
+ | min-height: 0;
+ | height: 50%;
+ | overflow: auto;
+ | resize: vertical;
+ |}
+ |
+ |.slider {
+ | text-align: center;
+ | background-color: #dee2e6;
+ | cursor: row-resize;
+ | user-select: none;
+ |}
+ |
+ |.selected-row {
+ | background-color: #ff6e42 !important;
+ |}
+ |
+ |.progress {
+ | white-space: normal;
+ | background-color: #d9534f;
+ |}
+ |
+ |.progress-bar {
+ | color: black;
+ |}
+ |""".stripMargin
+ }
+
def plugins: NodeBuffer = {
@@ -749,4 +926,6 @@ class ResultHtmlWriter {
}
+
+ def dataCateringSvg: String = ""
}
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala
index a3ec3593..dbfdf6df 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/Constants.scala
@@ -54,9 +54,13 @@ object Constants {
lazy val COUNT_NUM_RECORDS = "numRecords"
//report
+ lazy val REPORT_DATA_CATERING_SVG = "data_catering_transparent.svg"
lazy val REPORT_DATA_SOURCES_HTML = "data-sources.html"
lazy val REPORT_FIELDS_HTML = "steps.html"
lazy val REPORT_HOME_HTML = "index.html"
+ lazy val REPORT_MAIN_CSS = "main.css"
+ lazy val REPORT_RESULT_JSON = "results.json"
+ lazy val REPORT_TASK_HTML = "tasks.html"
lazy val REPORT_VALIDATIONS_HTML = "validations.html"
//connection group type
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
index 3b98c8b3..d12dc4e0 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/model/ValidationModels.scala
@@ -1,6 +1,7 @@
package io.github.datacatering.datacaterer.core.model
import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, Validation}
+import io.github.datacatering.datacaterer.core.util.ConfigUtil.cleanseOptions
import io.github.datacatering.datacaterer.core.util.ResultWriterUtil.getSuccessSymbol
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -29,15 +30,19 @@ case class ValidationConfigResult(
}
def jsonSummary(numErrorSamples: Int): Map[String, Any] = {
- val validationRes = dataSourceValidationResults.flatMap(_.validationResults)
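+    // keep the data source name and options alongside each validation result so failures can be reported per data source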
+ val validationRes = dataSourceValidationResults.flatMap(dsv =>
+ dsv.validationResults.map(v => (dsv.dataSourceName, dsv.options, v))
+ )
if (validationRes.nonEmpty) {
- val (numSuccess, successRate, isSuccess) = baseSummary(validationRes)
- val errorMap = validationRes.filter(!_.isSuccess).map(res => {
- val validationDetails = res.validation.toOptions.map(v => (v.head, v.last)).toMap
+ val (numSuccess, successRate, isSuccess) = baseSummary(validationRes.map(_._3))
+ val errorMap = validationRes.filter(vr => !vr._3.isSuccess).map(res => {
+ val validationDetails = res._3.validation.toOptions.map(v => (v.head, v.last)).toMap
Map(
+ "dataSourceName" -> res._1,
+ "options" -> cleanseOptions(res._2),
"validation" -> validationDetails,
- "numErrors" -> res.numErrors,
- "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res)
+ "numErrors" -> res._3.numErrors,
+ "sampleErrorValues" -> getErrorSamplesAsMap(numErrorSamples, res._3)
)
})
val baseValidationMap = Map(
diff --git a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala
index 32be44d9..35e22029 100644
--- a/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala
+++ b/app/src/main/scala/io/github/datacatering/datacaterer/core/util/UniqueFieldsUtil.scala
@@ -100,8 +100,10 @@ class UniqueFieldsUtil(plan: Plan, executableTasks: List[(TaskSummary, Task)])(i
val uniqueKeys = step.gatherUniqueFields
val uniqueKeyUf = if (uniqueKeys.nonEmpty) uniqueKeys.map(u => UniqueFields(t._1.dataSourceName, step.name, List(u))) else List()
val allKeys = primaryKeyUf ++ uniqueKeyUf
- LOGGER.debug(s"Found unique fields that require unique values, " +
- s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}")
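+      // only log when the step actually defines primary or unique key fields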
+ if (allKeys.nonEmpty) {
+ LOGGER.debug(s"Found unique fields that require unique values, " +
+ s"data-source-name=${t._1.dataSourceName}, step-name=${step.name}, columns=${allKeys.map(_.columns).mkString(",")}")
+ }
allKeys
})
})
diff --git a/app/src/test/resources/application.conf b/app/src/test/resources/application.conf
index 2cc0ffbd..cc30ce98 100644
--- a/app/src/test/resources/application.conf
+++ b/app/src/test/resources/application.conf
@@ -32,10 +32,15 @@ runtime{
"spark.sql.legacy.allowUntypedScalaUDF" = "true"
"spark.sql.statistics.histogram.enabled" = "true"
"spark.sql.shuffle.partitions" = "10"
- "spark.sql.catalog.postgres" = ""
- "spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog"
- "spark.hadoop.fs.s3a.directory.marker.retention" = "keep"
- "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true"
+ "spark.sql.catalog.postgres": "",
+ "spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
+ "spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
+ "spark.sql.catalog.iceberg.type": "hadoop",
+ "spark.hadoop.fs.s3a.directory.marker.retention": "keep",
+ "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
+ "spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
+ "spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
+ "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
}
}
diff --git a/app/src/test/resources/sample/sql/postgres/customer.sql b/app/src/test/resources/sample/sql/postgres/customer.sql
index 28304210..d5bffb53 100644
--- a/app/src/test/resources/sample/sql/postgres/customer.sql
+++ b/app/src/test/resources/sample/sql/postgres/customer.sql
@@ -25,18 +25,18 @@ CREATE TABLE IF NOT EXISTS account.balances
(
account_number VARCHAR(20) UNIQUE NOT NULL,
create_time TIMESTAMP,
+ account_status VARCHAR(10),
balance DOUBLE PRECISION,
PRIMARY KEY (account_number, create_time)
);
CREATE TABLE IF NOT EXISTS account.transactions
(
- account_number VARCHAR(20) UNIQUE NOT NULL,
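+    -- account_number is no longer unique here, so an account can have many transactions; it now references account.balances directly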
+ account_number VARCHAR(20) NOT NULL REFERENCES account.balances (account_number),
create_time TIMESTAMP,
transaction_id VARCHAR(20),
amount DOUBLE PRECISION,
- PRIMARY KEY (account_number, create_time, transaction_id),
- CONSTRAINT fk_txn_account_number FOREIGN KEY (account_number) REFERENCES account.balances (account_number)
+ PRIMARY KEY (account_number, create_time, transaction_id)
);
CREATE TABLE IF NOT EXISTS account.mapping
diff --git a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml
index c4cd1ed9..16973650 100644
--- a/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml
+++ b/app/src/test/resources/sample/task/postgres/postgres-balance-transaction-task.yaml
@@ -13,7 +13,6 @@ steps:
type: "regex"
options:
regex: "ACC1[0-9]{5,10}"
- isUnique: true
- name: "create_time"
type: "timestamp"
- name: "account_status"
diff --git a/insta-integration.yaml b/insta-integration.yaml
index dbf6b126..84038e61 100644
--- a/insta-integration.yaml
+++ b/insta-integration.yaml
@@ -6,19 +6,21 @@ run:
env:
PLAN_FILE_PATH: app/src/test/resources/sample/plan/account-balance-transaction-plan.yaml
TASK_FOLDER_PATH: app/src/test/resources/sample/task
+ APPLICATION_CONFIG_PATH: app/src/main/resources/application.conf
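+  # assumed: skip insta-integration's own data generation since the run above already generates the data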
+ generateFirst: false
test:
validation:
postgres:
- options:
dbtable: account.balances
validations:
- - expr: ISNOTNULL(account_id)
+ - expr: ISNOTNULL(account_number)
- aggType: count
aggExpr: count == 1000
- options:
dbtable: account.transactions
validations:
- - expr: ISNOTNULL(account_id)
+ - expr: ISNOTNULL(account_number)
- aggType: count
aggExpr: count == 5000
- groupByCols: [account_number]