Skip to content

Commit

Permalink
Add upsert mode to Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
braislchao committed Nov 7, 2024
1 parent f4789dd commit 9ed76ab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@ class IcebergWriter(
}

case WriteMode.Upsert =>
  // Upsert strategy: try to CREATE the table on the first write; if the table
  // already exists, Spark raises AnalysisException and we fall back to
  // replacing the partitions present in `df`.
  // NOTE(review): `overwritePartitions()` is partition-level replacement, not a
  // row-level MERGE — rows in untouched partitions survive, but rows sharing a
  // partition with `df` are replaced wholesale. Confirm this is the intended
  // upsert semantics (a true row upsert would need `MERGE INTO` with key columns).
  try {
    df.writeTo(output_identifier).using("iceberg").create()
  } catch {
    case e: AnalysisException =>
      // Expected on every write after the first: table already exists.
      logger.warn(s"Create table failed: $e")
      df.writeTo(output_identifier).overwritePartitions()
  }


case WriteMode.Delete =>
// Delete mode is intentionally unimplemented: fail fast so callers notice the
// unsupported mode instead of silently writing nothing.
throw new NotImplementedError("Delete is not supported in Iceberg yet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,64 @@ class IcebergWriterTest extends AnyFunSuite
iceberg.write(inputDF, EngineMode.Batch)

val outputDf = spark.table(fqn)

val expectedDf = inputDF.union(inputDF)

assertDataFrameNoOrderEquals(expectedDf, outputDf)
cleanUpTestDir()
}

test("Iceberg batch overwrite wrong data") {
  cleanUpTestDir()
  val table = "letters_overwrite_bad"
  val fqn = s"$catalog.$database.$table"

  // Two frames with different content: the second Overwrite write must fully
  // replace the data produced by the first write.
  val inputDF = createExpectedDataFrame()
  val differentInputDF = createDifferentDataFrame()

  val wm = WriteMode.Overwrite
  val cpl = ""
  val iceberg = new IcebergWriter(fqn, wm, cpl)(spark)

  iceberg.write(inputDF, EngineMode.Batch)
  iceberg.write(differentInputDF, EngineMode.Batch)

  val outputDf = spark.table(fqn)

  // Only the second frame's rows should remain after the overwrite.
  assertDataFrameNoOrderEquals(differentInputDF, outputDf)
  cleanUpTestDir()
}

test("Iceberg batch upsert wrong data") {
  cleanUpTestDir()

  // Seed the table with one frame, then attempt an upsert with a frame of an
  // incompatible schema: Iceberg must reject the second write rather than
  // silently corrupting the table.
  val tableName = "letters_upsert_bad"
  val fullyQualifiedName = s"$catalog.$database.$tableName"
  val firstDF = createExpectedDataFrame()
  val incompatibleDF = createDifferentDataFrame()

  val writer = new IcebergWriter(fullyQualifiedName, WriteMode.Upsert, "")(spark)
  writer.write(firstDF, EngineMode.Batch)

  val thrown = intercept[Exception] {
    writer.write(incompatibleDF, EngineMode.Batch)
  }
  assert(thrown.getMessage.contains("Cannot write incompatible data to table"))

  cleanUpTestDir()
}

test("Iceberg batch upsert") {
cleanUpTestDir()
val table = "letters_upsert"
val fqn = s"$catalog.$database.$table"
val inputDF = createExpectedDataFrame()

val wm = WriteMode.Upsert
val cpl = ""
val iceberg = new IcebergWriter(fqn, wm, cpl)(spark)

iceberg.write(inputDF, EngineMode.Batch)
iceberg.write(inputDF, EngineMode.Batch)

val outputDf = spark.table(fqn)
Expand Down

0 comments on commit 9ed76ab

Please sign in to comment.