diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala index b44eef1..8a0cb6b 100644 --- a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala +++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala @@ -47,7 +47,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, .option("txnVersion", batchId).option("txnAppId", output_identifier) .delta(output_identifier) - if (batchId % optimizeEvery == 0) { + if (optimize && batchId % optimizeEvery == 0) { compactAndVacuum } @@ -60,7 +60,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, .option("txnVersion", batchId).option("txnAppId", output_identifier) .delta(output_identifier) - if (batchId % optimizeEvery == 0) { + if (optimize && batchId % optimizeEvery == 0) { compactAndVacuum } @@ -84,7 +84,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, .whenNotMatched().insertAll() .execute() - if (batchId % optimizeEvery == 0 && optimize) { + if (optimize && batchId % optimizeEvery == 0){ compactAndVacuum } } @@ -104,7 +104,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, .whenMatched().delete() .execute() - if (batchId % optimizeEvery == 0) { + if (optimize && batchId % optimizeEvery == 0) { compactAndVacuum } } @@ -143,34 +143,24 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode, val data_query = writeMode match { case WriteMode.Append => df - - val baseQuery = df - .writeStream - .outputMode("append") - .option("mergeSchema", "true") - .option("checkpointLocation", checkpointLocation) - - if (optimize) { - baseQuery.foreachBatch(appendToDelta _).start - } else { - baseQuery.start(outputPath) - } + .writeStream + .outputMode("append") + .option("mergeSchema", "true") + .option("checkpointLocation", checkpointLocation) + .foreachBatch(appendToDelta _) + .start case WriteMode.Overwrite => DeltaTable.forPath(outputPath).delete() - val baseQuery = df + df .writeStream .outputMode("append") .option("overwriteSchema", "true") .option("checkpointLocation", checkpointLocation) - - if (optimize) { - baseQuery.foreachBatch(appendToDelta _).start - } else { - baseQuery.start(outputPath) - } + .foreachBatch(appendToDelta _) + .start case WriteMode.Upsert => df .writeStream