From 9ed76abc3d7378e927513b6a7036acc52af41793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brais=20L=C3=B3pez=20Chao?= Date: Thu, 7 Nov 2024 10:39:13 +0100 Subject: [PATCH] Add upsert mode to Iceberg --- .../spark/writer/file/IcebergWriter.scala | 9 +++- .../spark/writer/IcebergWriterTest.scala | 48 +++++++++++++++++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/IcebergWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/IcebergWriter.scala index 82cee1a..d2652f1 100644 --- a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/IcebergWriter.scala +++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/IcebergWriter.scala @@ -39,7 +39,14 @@ class IcebergWriter( } case WriteMode.Upsert => - throw new NotImplementedError("Batch Upsert is not supported in Iceberg yet") + try { + df.writeTo(output_identifier).using("iceberg").create() + } catch { + case e: AnalysisException => + logger.warn("Create table failed: " + e) + df.writeTo(output_identifier).overwritePartitions() + } + case WriteMode.Delete => throw new NotImplementedError("Delete is not supported in Iceberg yet") diff --git a/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala b/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala index f713b06..609fad0 100644 --- a/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala +++ b/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala @@ -124,24 +124,64 @@ class IcebergWriterTest extends AnyFunSuite iceberg.write(inputDF, EngineMode.Batch) val outputDf = spark.table(fqn) - val expectedDf = inputDF.union(inputDF) assertDataFrameNoOrderEquals(expectedDf, outputDf) cleanUpTestDir() } - test("Iceberg batch overwrite") { + test("Iceberg batch overwrite wrong data") { cleanUpTestDir() - val table = 
"letters_overwrite" + val table = "letters_overwrite_bad" val fqn = s"$catalog.$database.$table" val inputDF = createExpectedDataFrame() - + val differentInputDF = createDifferentDataFrame() val wm = WriteMode.Overwrite val cpl = "" val iceberg = new IcebergWriter(fqn, wm, cpl)(spark) + iceberg.write(inputDF, EngineMode.Batch) + iceberg.write(differentInputDF, EngineMode.Batch) + + val outputDf = spark.table(fqn) + + assertDataFrameNoOrderEquals(differentInputDF, outputDf) + cleanUpTestDir() + } + + test("Iceberg batch upsert wrong data") { + cleanUpTestDir() + val table = "letters_upsert_bad" + val fqn = s"$catalog.$database.$table" + val inputDF = createExpectedDataFrame() + val differentInputDF = createDifferentDataFrame() + + val wm = WriteMode.Upsert + val cpl = "" + val iceberg = new IcebergWriter(fqn, wm, cpl)(spark) + + iceberg.write(inputDF, EngineMode.Batch) + + val exception = intercept[Exception] { + iceberg.write(differentInputDF, EngineMode.Batch) + } + + assert(exception.getMessage.contains("Cannot write incompatible data to table")) + cleanUpTestDir() + } + + test("Iceberg batch upsert") { + cleanUpTestDir() + val table = "letters_upsert" + val fqn = s"$catalog.$database.$table" + val inputDF = createExpectedDataFrame() + + val wm = WriteMode.Upsert + val cpl = "" + val iceberg = new IcebergWriter(fqn, wm, cpl)(spark) + + iceberg.write(inputDF, EngineMode.Batch) iceberg.write(inputDF, EngineMode.Batch) val outputDf = spark.table(fqn)