Skip to content

Commit

Permalink
Merge pull request #40 from metabolicdata/hotfix/keep_foreach_append
Browse files (browse the repository at this point in the history)
Keep foreachBatch for append and overwrite
  • Loading branch information
margon8 authored May 17, 2024
2 parents e7d08f0 + f953918 commit 02179be
Showing 1 changed file with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
.option("txnVersion", batchId).option("txnAppId", output_identifier)
.delta(output_identifier)

- if (batchId % optimizeEvery == 0) {
+ if (optimize && batchId % optimizeEvery == 0) {
compactAndVacuum
}

Expand All @@ -60,7 +60,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
.option("txnVersion", batchId).option("txnAppId", output_identifier)
.delta(output_identifier)

- if (batchId % optimizeEvery == 0) {
+ if (optimize && batchId % optimizeEvery == 0) {
compactAndVacuum
}

Expand All @@ -84,7 +84,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
.whenNotMatched().insertAll()
.execute()

- if (batchId % optimizeEvery == 0 && optimize) {
+ if (optimize && batchId % optimizeEvery == 0){
compactAndVacuum
}
}
Expand All @@ -104,7 +104,7 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,
.whenMatched().delete()
.execute()

- if (batchId % optimizeEvery == 0) {
+ if (optimize && batchId % optimizeEvery == 0) {
compactAndVacuum
}
}
Expand Down Expand Up @@ -143,34 +143,24 @@ class DeltaWriter(val outputPath: String, val writeMode: WriteMode,

val data_query = writeMode match {
case WriteMode.Append => df
-
- val baseQuery = df
- .writeStream
- .outputMode("append")
- .option("mergeSchema", "true")
- .option("checkpointLocation", checkpointLocation)
-
- if (optimize) {
- baseQuery.foreachBatch(appendToDelta _).start
- } else {
- baseQuery.start(outputPath)
- }
+ .writeStream
+ .outputMode("append")
+ .option("mergeSchema", "true")
+ .option("checkpointLocation", checkpointLocation)
+ .foreachBatch(appendToDelta _)
+ .start

case WriteMode.Overwrite =>

DeltaTable.forPath(outputPath).delete()

- val baseQuery = df
+ df
.writeStream
.outputMode("append")
.option("overwriteSchema", "true")
.option("checkpointLocation", checkpointLocation)
-
- if (optimize) {
- baseQuery.foreachBatch(appendToDelta _).start
- } else {
- baseQuery.start(outputPath)
- }
+ .foreachBatch(appendToDelta _)
+ .start

case WriteMode.Upsert => df
.writeStream
Expand Down

0 comments on commit 02179be

Please sign in to comment.