diff --git a/.gitignore b/.gitignore index 69ea798..20f71bd 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ #Input of tests src/test/resources/prod/* src/test/scala/EndToEndTest.scala +src/test/models/* #Output of tests src/test/tmp/* diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala index 8a0cb6b..b5d86f9 100644 --- a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala +++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala @@ -47,7 +47,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, .option("txnVersion", batchId).option("txnAppId", output_identifier) .delta(output_identifier) - if (optimize && batchId % optimizeEvery == 0) { + if (batchId % optimizeEvery == 0) { compactAndVacuum } @@ -142,25 +142,41 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, override def writeStream(df: DataFrame): StreamingQuery = { val data_query = writeMode match { - case WriteMode.Append => df - .writeStream - .outputMode("append") - .option("mergeSchema", "true") - .option("checkpointLocation", checkpointLocation) - .foreachBatch(appendToDelta _) - .start + case WriteMode.Append => + + val sw = df + .writeStream + .outputMode("append") + .option("mergeSchema", "true") + .option("checkpointLocation", checkpointLocation) + + if (optimize) { + sw + .foreachBatch(appendToDelta _) + .start + } else { + sw + .delta(output_identifier) + } case WriteMode.Overwrite => DeltaTable.forPath(outputPath).delete() - df + val sw = df .writeStream .outputMode("append") .option("overwriteSchema", "true") .option("checkpointLocation", checkpointLocation) - .foreachBatch(appendToDelta _) - .start + + if (optimize) { + sw + .foreachBatch(appendToDelta _) + .start + } else { + sw + .delta(output_identifier) + } case WriteMode.Upsert => df .writeStream