Skip to content

Commit

Permalink
Separate the append ops when its optimizing and when its not
Browse files Browse the repository at this point in the history
  • Loading branch information
margon8 committed May 22, 2024
1 parent c6638a0 commit 542c559
Showing 1 changed file with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
.option("txnVersion", batchId).option("txnAppId", output_identifier)
.delta(output_identifier)

if (optimize && batchId % optimizeEvery == 0) {
if (batchId % optimizeEvery == 0) {
compactAndVacuum
}

Expand Down Expand Up @@ -142,25 +142,41 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
override def writeStream(df: DataFrame): StreamingQuery = {

val data_query = writeMode match {
case WriteMode.Append => df
.writeStream
.outputMode("append")
.option("mergeSchema", "true")
.option("checkpointLocation", checkpointLocation)
.foreachBatch(appendToDelta _)
.start
case WriteMode.Append =>

val sw = df
.writeStream
.outputMode("append")
.option("mergeSchema", "true")
.option("checkpointLocation", checkpointLocation)

if (optimize) {
sw
.foreachBatch(appendToDelta _)
.start
} else {
sw
.delta(output_identifier)
}

case WriteMode.Overwrite =>

DeltaTable.forPath(outputPath).delete()

df
val sw = df
.writeStream
.outputMode("append")
.option("overwriteSchema", "true")
.option("checkpointLocation", checkpointLocation)
.foreachBatch(appendToDelta _)
.start

if (optimize) {
sw
.foreachBatch(appendToDelta _)
.start
} else {
sw
.delta(output_identifier)
}

case WriteMode.Upsert => df
.writeStream
Expand Down

0 comments on commit 542c559

Please sign in to comment.