Insta integration #70

Closed · wants to merge 18 commits
Changes from all 18 commits:
0980175
Add in generateFirst flag to false in insta-integration
pflooky Jul 9, 2024
a13cd45
Use gradle cache in github action
pflooky Jul 9, 2024
daa7ddd
Add in missing account_status from balances test table
pflooky Jul 9, 2024
169831d
Use debug log level for insta-integration
pflooky Jul 9, 2024
a35b2b8
Try set transactions account_number to be unique
pflooky Jul 9, 2024
ce4e7fc
Take out isUnique from account.balances test, check if random amount …
pflooky Jul 9, 2024
42ed765
Fix if statement for checking random amount of records
pflooky Jul 9, 2024
a54c144
Remove unique from account_number in account.transactions table def
pflooky Jul 9, 2024
791fbdb
Add in APPLICATION_CONFIG_PATH to env for insta-integration
pflooky Jul 9, 2024
c26be6a
Update application.conf files
pflooky Jul 9, 2024
6ea13d1
Update data source name to postgres in main application.conf
pflooky Jul 9, 2024
d1f1cdb
Try default back to local resource path for html resources
pflooky Jul 9, 2024
c6fb039
Try default back to local resource path for html resources
pflooky Jul 9, 2024
786648a
Add in data source name and options to error validation details, writ…
pflooky Jul 9, 2024
19371d4
Take out clean from gradle command in github action
pflooky Jul 9, 2024
6fa3f0c
Use account_number instead of account_id for insta-integration valida…
pflooky Jul 9, 2024
db02a4d
Fix insta-integration github action, don't try to generate more recor…
pflooky Jul 9, 2024
e7ca434
Merge branch 'insta-integration' of github.com:data-catering/data-cat…
pflooky Jul 9, 2024
6 changes: 4 additions & 2 deletions .github/workflows/check.yml
@@ -10,8 +10,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run build
run: "./gradlew clean :app:shadowJar"
- name: Gradle build with cache
uses: burrunan/gradle-cache-action@v1
with:
arguments: ":app:shadowJar"
- name: Run integration tests
id: tests
uses: data-catering/insta-integration@v1
152 changes: 90 additions & 62 deletions app/src/main/resources/application.conf
@@ -1,97 +1,144 @@
flags {
enableGeneratePlanAndTasks = false
enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
enableCount = true
enableCount = ${?ENABLE_COUNT}
enableGenerateData = true
enableGenerateData = ${?ENABLE_GENERATE_DATA}
enableGeneratePlanAndTasks = false
enableGeneratePlanAndTasks = ${?ENABLE_GENERATE_PLAN_AND_TASKS}
enableRecordTracking = false
enableRecordTracking = ${?ENABLE_RECORD_TRACKING}
enableDeleteGeneratedRecords = false
enableDeleteGeneratedRecords = ${?ENABLE_DELETE_GENERATED_RECORDS}
enableFailOnError = true
enableFailOnError = ${?ENABLE_FAIL_ON_ERROR}
enableUniqueCheck = true
enableUniqueCheck = ${?ENABLE_UNIQUE_CHECK}
enableSinkMetadata = true
enableSinkMetadata = ${?ENABLED_SINK_METADATA}
enableSinkMetadata = ${?ENABLE_SINK_METADATA}
enableSaveReports = true
enableSaveReports = ${?ENABLED_SAVE_REPORTS}
enableSaveReports = ${?ENABLE_SAVE_REPORTS}
enableValidation = false
enableValidation = ${?ENABLED_VALIDATION}
enableValidation = ${?ENABLE_VALIDATION}
enableGenerateValidations = false
enableGenerateValidations = ${?ENABLE_GENERATE_VALIDATIONS}
enableAlerts = false
enableAlerts = ${?ENABLE_ALERTS}
}

folders {
generatedPlanAndTaskFolderPath = "/tmp"
generatedPlanAndTaskFolderPath = ${?GENERATED_PLAN_AND_TASK_FOLDER_PATH}
planFilePath = "/plan/customer-create-plan.yaml"
planFilePath = "app/src/test/resources/sample/plan/customer-create-plan.yaml"
planFilePath = ${?PLAN_FILE_PATH}
taskFolderPath = "/task"
taskFolderPath = "app/src/test/resources/sample/task"
taskFolderPath = ${?TASK_FOLDER_PATH}
recordTrackingFolderPath = "/tmp/data/generated/recordTracking"
recordTrackingFolderPath = ${?RECORD_TRACKING_FOLDER_PATH}
recordTrackingForValidationFolderPath = "/tmp/data/validation/recordTracking"
recordTrackingForValidationFolderPath = ${?RECORD_TRACKING_VALIDATION_FOLDER_PATH}
generatedReportsFolderPath = "app/src/test/resources/sample/html"
generatedReportsFolderPath = ${?GENERATED_REPORTS_FOLDER_PATH}
validationFolderPath = "app/src/test/resources/sample/validation"
validationFolderPath = ${?VALIDATION_FOLDER_PATH}
}

metadata {
numRecordsFromDataSource = 10000
numRecordsFromDataSource = ${?METADATA_NUM_RECORDS_FROM_DATA_SOURCE}
numRecordsFromDataSource = ${?NUM_RECORDS_FROM_DATA_SOURCE}
numRecordsForAnalysis = 10000
numRecordsForAnalysis = ${?METADATA_NUM_RECORDS_FOR_ANALYSIS}
numRecordsForAnalysis = ${?NUM_RECORDS_FOR_ANALYSIS}
oneOfDistinctCountVsCountThreshold = 0.1
oneOfDistinctCountVsCountThreshold = ${?METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
oneOfDistinctCountVsCountThreshold = ${?ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD}
oneOfMinCount = 1000
oneOfMinCount = ${?ONE_OF_MIN_COUNT}
numGeneratedSamples = 10
numGeneratedSamples = ${?NUM_GENERATED_SAMPLES}
}

generation {
numRecordsPerBatch = 100000
numRecordsPerBatch = ${?GENERATION_NUM_RECORDS_PER_BATCH}
numRecordsPerBatch = 1000000
numRecordsPerBatch = ${?NUM_RECORDS_PER_BATCH}
}

validation {}
alert {}
validation {
numSampleErrorRecords = 5
numSampleErrorRecords = ${?NUM_SAMPLE_ERROR_RECORDS}
enableDeleteRecordTrackingFiles = true
enableDeleteRecordTrackingFiles = ${?ENABLE_DELETE_RECORD_TRACKING_FILES}
}

alert {
triggerOn = "all"
triggerOn = ${?ALERT_TRIGGER_ON}
slackAlertConfig {
token = ""
token = ${?ALERT_SLACK_TOKEN}
channels = []
channels = ${?ALERT_SLACK_CHANNELS}
}
}

runtime {
master = "local[*]"
master = ${?DATA_CATERER_MASTER}
config {
"spark.sql.cbo.enabled" = "true"
"spark.sql.adaptive.enabled" = "true"
"spark.sql.cbo.planStats.enabled" = "true"
"spark.sql.legacy.allowUntypedScalaUDF" = "true"
"spark.sql.statistics.histogram.enabled" = "true"
"spark.sql.shuffle.partitions" = "10"
"spark.sql.catalog.postgres" = ""
"spark.sql.catalog.cassandra" = "com.datastax.spark.connector.datasource.CassandraCatalog"
"spark.hadoop.fs.s3a.directory.marker.retention" = "keep"
"spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled" = "true"
#"spark.hadoop.fs.defaultFS" = "s3a://my-bucket"
"spark.sql.cbo.enabled": "true",
"spark.sql.adaptive.enabled": "true",
"spark.sql.cbo.planStats.enabled": "true",
"spark.sql.legacy.allowUntypedScalaUDF": "true",
"spark.sql.legacy.allowParameterlessCount": "true",
"spark.sql.statistics.histogram.enabled": "true",
"spark.sql.shuffle.partitions": "10",
"spark.sql.catalog.postgres": "",
"spark.sql.catalog.cassandra": "com.datastax.spark.connector.datasource.CassandraCatalog",
"spark.sql.catalog.iceberg": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.iceberg.type": "hadoop",
"spark.hadoop.fs.s3a.directory.marker.retention": "keep",
"spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled": "true",
"spark.hadoop.fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem",
"spark.hadoop.fs.file.impl": "com.globalmentor.apache.hadoop.fs.BareLocalFileSystem",
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
}
}

json {
json {
}
}

csv {
csv {
}
}

delta {
delta {
}
}

iceberg {
iceberg {
}
}

orc {
orc {
}
}

parquet {
parquet {
}
}

# connection type
jdbc {
# connection name
postgres {
# connection details
url = "jdbc:postgresql://localhost:5432/customer"
url = ${?POSTGRES_URL}
user = "postgres"
user = ${?POSTGRES_USERNAME}
user = ${?POSTGRES_USER}
password = "postgres"
password = ${?POSTGRES_PASSWORD}
driver = "org.postgresql.Driver"
}
postgresDvd {
url = "jdbc:postgresql://localhost:5432/dvdrental"
url = ${?POSTGRES_URL}
user = "postgres"
user = ${?POSTGRES_USERNAME}
password = "postgres"
password = ${?POSTGRES_PASSWORD}
driver = "org.postgresql.Driver"
stringtype = "unspecified"
}
mysql {
url = "jdbc:mysql://localhost:3306/customer"
url = ${?MYSQL_URL}
@@ -103,6 +150,7 @@ jdbc {
}
}


org.apache.spark.sql.cassandra {
cassandra {
spark.cassandra.connection.host = "localhost"
@@ -140,30 +188,10 @@ jms {

kafka {
kafka {
kafka.bootstrap.servers = "localhost:9092"
kafka.bootstrap.servers = "localhost:29092"
kafka.bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
}
}

parquet {
parquet {
path = "app/src/test/resources/sample"
path = ${?PARQUET_PATH}
}
}

json {
json {
path = "app/src/test/resources/sample"
path = ${?JSON_PATH}
}
}

csv {
csv {
path = "app/src/test/resources/sample"
path = ${?CSV_PATH}
}
}

datastax-java-driver.advanced.metadata.schema.refreshed-keyspaces = [ "/.*/" ]
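
The config above leans heavily on HOCON's optional-substitution syntax: a key is first given a literal default and then immediately re-assigned from ${?SOME_ENV_VAR}, which only overrides the default when that environment variable is set. A minimal sketch of how this resolves with Typesafe Config (the object, key, and variable names below are illustrative, not tied to the project's actual config loader):

import com.typesafe.config.ConfigFactory

object ConfigOverrideSketch extends App {
  // Same pattern as application.conf: the literal default is used unless the
  // optional substitution ${?ENABLE_GENERATE_DATA} can be resolved, in which
  // case the later assignment wins.
  val hocon =
    """
      |flags {
      |  enableGenerateData = true
      |  enableGenerateData = ${?ENABLE_GENERATE_DATA}
      |}
    """.stripMargin

  // resolve() substitutes ${?...} references; with default options an
  // unresolved optional substitution falls back to the system environment,
  // and if the variable is absent the earlier value is kept.
  val config = ConfigFactory.parseString(hocon).resolve()
  println(config.getBoolean("flags.enableGenerateData"))
}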
@@ -68,14 +68,19 @@ class BatchDataProcessor(connectionConfigsByName: Map[String, Map[String, String
s"new-num-records=${additionalDf.count()}, actual-num-records=$dfRecordCount")
}

while (targetNumRecords != dfRecordCount && retries < maxRetries) {
retries += 1
generateAdditionalRecords()
}
if (targetNumRecords != dfRecordCount && retries == maxRetries) {
LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
s"Can be due to limited number of potential unique records, " +
s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
// if a random amount of records is requested, don't try to regenerate more records
if (s.count.generator.isEmpty && s.count.perColumn.forall(_.generator.isEmpty)) {
while (targetNumRecords != dfRecordCount && retries < maxRetries) {
retries += 1
generateAdditionalRecords()
}
if (targetNumRecords != dfRecordCount && retries == maxRetries) {
LOGGER.warn("Unable to reach expected number of records due to reaching max retries. " +
s"Can be due to limited number of potential unique records, " +
s"target-num-records=$targetNumRecords, actual-num-records=${dfRecordCount}")
}
} else {
LOGGER.debug("Random amount of records generated, not attempting to generate more records")
}

trackRecordsPerStep = trackRecordsPerStep ++ Map(recordStepName -> stepRecords.copy(currentNumRecords = dfRecordCount))
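
The guard added above only loops on generateAdditionalRecords() when the step's count is deterministic, i.e. neither the step count nor any per-column count has a random generator attached. A rough, self-contained illustration of that retry-until-target pattern (all names here are hypothetical stand-ins, not the real members of BatchDataProcessor):

import scala.util.Random

object RetryUntilTargetSketch extends App {
  val targetNumRecords = 10L
  val maxRetries = 3

  var recordCount = 0L
  var retries = 0

  // Pretend each attempt yields a variable number of records, e.g. because
  // uniqueness constraints drop duplicates after generation.
  def generateAdditionalRecords(): Unit = recordCount += Random.nextInt(5).toLong

  // false when the step asks for a random amount of records
  val countIsDeterministic = true

  if (countIsDeterministic) {
    while (recordCount != targetNumRecords && retries < maxRetries) {
      retries += 1
      generateAdditionalRecords()
    }
    if (recordCount != targetNumRecords && retries == maxRetries) {
      println(s"Unable to reach target after $maxRetries retries: " +
        s"target-num-records=$targetNumRecords, actual-num-records=$recordCount")
    }
  } else {
    println("Random amount of records requested, not attempting to generate more")
  }
}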
@@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_GENERATED_REPORTS_FOLDER_PATH, SPECIFIC_DATA_SOURCE_OPTIONS}
import io.github.datacatering.datacaterer.api.model.{DataCatererConfiguration, Field, Plan, Step, Task}
import io.github.datacatering.datacaterer.core.listener.SparkRecordListener
import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_VALIDATIONS_HTML}
import io.github.datacatering.datacaterer.core.model.Constants.{REPORT_DATA_CATERING_SVG, REPORT_DATA_SOURCES_HTML, REPORT_FIELDS_HTML, REPORT_HOME_HTML, REPORT_MAIN_CSS, REPORT_RESULT_JSON, REPORT_TASK_HTML, REPORT_VALIDATIONS_HTML}
import io.github.datacatering.datacaterer.core.model.{DataSourceResult, DataSourceResultSummary, StepResultSummary, TaskResultSummary, ValidationConfigResult}
import io.github.datacatering.datacaterer.core.plan.PostPlanProcessor
import io.github.datacatering.datacaterer.core.util.FileUtil.writeStringToFile
@@ -50,41 +50,19 @@ class DataGenerationResultWriter(val dataCatererConfiguration: DataCatererConfig
try {
fileWriter(REPORT_HOME_HTML, htmlWriter.index(plan, stepSummary, taskSummary, dataSourceSummary,
validationResults, dataCatererConfiguration.flagsConfig, sparkRecordListener))
fileWriter("tasks.html", htmlWriter.taskDetails(taskSummary))
fileWriter(REPORT_TASK_HTML, htmlWriter.taskDetails(taskSummary))
fileWriter(REPORT_FIELDS_HTML, htmlWriter.stepDetails(stepSummary))
fileWriter(REPORT_DATA_SOURCES_HTML, htmlWriter.dataSourceDetails(stepSummary.flatMap(_.dataSourceResults)))
fileWriter(REPORT_VALIDATIONS_HTML, htmlWriter.validations(validationResults, validationConfig))
writeStringToFile(fileSystem, s"$reportFolder/results.json", resultsAsJson(generationResult, validationResults))

copyHtmlResources(fileSystem, reportFolder)
writeStringToFile(fileSystem, s"$reportFolder/$REPORT_RESULT_JSON", resultsAsJson(generationResult, validationResults))
writeStringToFile(fileSystem, s"$reportFolder/$REPORT_DATA_CATERING_SVG", htmlWriter.dataCateringSvg)
writeStringToFile(fileSystem, s"$reportFolder/$REPORT_MAIN_CSS", htmlWriter.mainCss)
} catch {
case ex: Exception =>
LOGGER.error("Failed to write data generation summary to HTML files", ex)
}
}

private def copyHtmlResources(fileSystem: FileSystem, folder: String): Unit = {
val resources = List("main.css", "data_catering_transparent.svg")
if (!foldersConfig.generatedReportsFolderPath.equalsIgnoreCase(DEFAULT_GENERATED_REPORTS_FOLDER_PATH)) {
resources.foreach(resource => {
val defaultResourcePath = new Path(s"file:///$DEFAULT_GENERATED_REPORTS_FOLDER_PATH/$resource")
val tryLocalUri = Try(new Path(getClass.getResource(s"/report/$resource").toURI))
val resourcePath = tryLocalUri match {
case Failure(_) =>
defaultResourcePath
case Success(value) =>
Try(value.getName) match {
case Failure(_) => defaultResourcePath
case Success(name) =>
if (name.startsWith("jar:")) defaultResourcePath else value
}
}
val destination = s"file:///$folder/$resource"
fileSystem.copyFromLocalFile(resourcePath, new Path(destination))
})
}
}

private def writeToFile(fileSystem: FileSystem, folderPath: String)(fileName: String, content: Node): Unit = {
writeStringToFile(fileSystem, s"$folderPath/$fileName", content.toString())
}
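
The writer now emits the report's static assets directly from in-memory content instead of locating them on the classpath and copying files, which avoids the jar-resource edge cases the removed copyHtmlResources method had to work around. A rough sketch of that write-directly idea using the plain Hadoop FileSystem API (the folder and asset contents below are placeholders, not the project's real constants or htmlWriter output):

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object WriteReportAssetsSketch extends App {
  // Placeholder report folder and asset contents; the real writer takes these
  // from the configured generatedReportsFolderPath and from htmlWriter.mainCss /
  // htmlWriter.dataCateringSvg.
  val reportFolder = "/tmp/data-caterer-report"
  val assets = Map(
    "main.css" -> "body { font-family: sans-serif; }",
    "data_catering_transparent.svg" -> "<svg xmlns=\"http://www.w3.org/2000/svg\"></svg>"
  )

  val fileSystem = FileSystem.get(new Configuration())
  assets.foreach { case (name, content) =>
    // Create (overwrite if present) and write the string content directly,
    // with no classpath or jar resource resolution involved.
    val out = fileSystem.create(new Path(s"$reportFolder/$name"), true)
    try out.write(content.getBytes(StandardCharsets.UTF_8))
    finally out.close()
  }
}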