diff --git a/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala b/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala
index 3d9a2e2..ab825c8 100644
--- a/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala
+++ b/src/test/scala/com/metabolic/data/core/services/spark/writer/IcebergWriterTest.scala
@@ -38,17 +38,19 @@ class IcebergWriterTest extends AnyFunSuite
   )
 
   val testDir = "./src/test/tmp/iw_test/"
+  val catalog = "spark_catalog"
+  val database = "data_lake"
 
   override def conf: SparkConf = super.conf
-    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
     .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
-    .set("spark.sql.catalog.spark_catalog.type", "hive")
-    .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
-    .set("spark.sql.catalog.local.type", "hadoop")
-    .set("spark.sql.catalog.local.warehouse", s"$testDir")
-    .set("spark.sql.defaultCatalog", "local")
+    .set("spark.sql.catalog.spark_catalog.type", "hadoop")
+    .set("spark.sql.catalog.spark_catalog.warehouse", s"$testDir")
+    .set("spark.sql.defaultCatalog", s"$catalog")
+    .set("spark.sql.catalog.spark_catalog.default-namespace", "spark_catalog")
 
   private def createExpectedDataFrame(): DataFrame = {
+    spark.sql(s"CREATE DATABASE IF NOT EXISTS $database")
     spark.createDataFrame(
       spark.sparkContext.parallelize(expectedData),
       StructType(expectedSchema)
@@ -62,14 +64,15 @@ class IcebergWriterTest extends AnyFunSuite
 
   test("Iceberg batch append") {
     cleanUpTestDir()
     val table = "letters_append"
-    val database = "data_lake"
-    val fqn = s"local.$database.$table"
+    val fqn = s"$catalog.$database.$table"
     val inputDF = createExpectedDataFrame()
+
     val wm = WriteMode.Append
     val cpl = ""
 
     val iceberg = new IcebergWriter(fqn, wm, cpl)(spark)
+
     iceberg.write(inputDF, EngineMode.Batch)
     iceberg.write(inputDF, EngineMode.Batch)
 
@@ -84,11 +87,9 @@ class IcebergWriterTest extends AnyFunSuite
 
   test("Iceberg batch overwrite") {
     cleanUpTestDir()
     val table = "letters_overwrite"
-    val database = "data_lake"
-    val fqn = s"local.$database.$table"
+    val fqn = s"$catalog.$database.$table"
     val inputDF = createExpectedDataFrame()
-    spark.sql("SELECT current_catalog();").show()
 
     val wm = WriteMode.Overwrite
     val cpl = ""
@@ -104,9 +105,8 @@ class IcebergWriterTest extends AnyFunSuite
 
   test("Iceberg streaming append") {
     cleanUpTestDir()
-    val database = "data_lake"
-    val expected = s"local.$database.letters"
-    val result = s"local.$database.letters_result"
+    val expected = s"$catalog.$database.letters"
+    val result = s"$catalog.$database.letters_result"
 
     val expectedDf = createExpectedDataFrame()
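
Note for reviewers: below is a minimal, self-contained sketch of the session setup this patch converges on, useful for reproducing the test environment outside the suite. It assumes iceberg-spark-runtime and delta-spark are on the classpath; the object name, master, and app name are illustrative, not part of the patch.

import org.apache.spark.sql.SparkSession

object IcebergCatalogConfSketch {
  def main(args: Array[String]): Unit = {
    val testDir = "./src/test/tmp/iw_test/"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("iceberg-writer-conf-sketch")
      // Both session extensions, matching the patched spark.sql.extensions value
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," +
          "io.delta.sql.DeltaSparkSessionExtension")
      // spark_catalog is wrapped by Iceberg's SparkSessionCatalog and backed by a
      // Hadoop warehouse under the test dir, replacing the old Hive-typed catalog
      // and the separate "local" Hadoop catalog
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", testDir)
      .config("spark.sql.defaultCatalog", "spark_catalog")
      .getOrCreate()

    // The namespace must exist before tables like spark_catalog.data_lake.<table>
    // can be created; the patch adds the same statement to createExpectedDataFrame()
    spark.sql("CREATE DATABASE IF NOT EXISTS data_lake")
    spark.sql("SHOW NAMESPACES").show()
    spark.stop()
  }
}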