Improve Iceberg Writer test with hadoop file catalog
braislchao committed Oct 29, 2024
1 parent 5a544e3 · commit c28e9fc
Showing 1 changed file with 14 additions and 14 deletions.
@@ -38,17 +38,19 @@ class IcebergWriterTest extends AnyFunSuite
   )
 
   val testDir = "./src/test/tmp/iw_test/"
+  val catalog = "spark_catalog"
+  val database = "data_lake"
 
   override def conf: SparkConf = super.conf
-    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
     .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
-    .set("spark.sql.catalog.spark_catalog.type", "hive")
-    .set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
-    .set("spark.sql.catalog.local.type", "hadoop")
-    .set("spark.sql.catalog.local.warehouse", s"$testDir")
-    .set("spark.sql.defaultCatalog", "local")
+    .set("spark.sql.catalog.spark_catalog.type", "hadoop")
+    .set("spark.sql.catalog.spark_catalog.warehouse", s"$testDir")
+    .set("spark.sql.defaultCatalog", s"$catalog")
+    .set("spark.sql.catalog.spark_catalog.default-namespace", "spark_catalog")
 
   private def createExpectedDataFrame(): DataFrame = {
+    spark.sql(s"CREATE DATABASE IF NOT EXISTS $database")
     spark.createDataFrame(
       spark.sparkContext.parallelize(expectedData),
       StructType(expectedSchema)
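
With spark_catalog switched from Hive to a Hadoop-type catalog, table metadata is stored as files under the warehouse directory, so the test no longer needs a Hive metastore. For context, a minimal standalone session configured the same way might look like this; a sketch, not code from the repository, with an illustrative app name:

import org.apache.spark.sql.SparkSession

// Hadoop-type Iceberg catalog: metadata lives under the warehouse path,
// no Hive metastore required.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("iceberg-hadoop-catalog-sketch")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.sql.catalog.spark_catalog.type", "hadoop")
  .config("spark.sql.catalog.spark_catalog.warehouse", "./src/test/tmp/iw_test/")
  .getOrCreate()

// A Hadoop catalog starts empty, hence the CREATE DATABASE IF NOT EXISTS
// the commit adds to createExpectedDataFrame() above.
spark.sql("CREATE DATABASE IF NOT EXISTS data_lake")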
@@ -62,14 +64,15 @@ class IcebergWriterTest extends AnyFunSuite
   test("Iceberg batch append") {
     cleanUpTestDir()
     val table = "letters_append"
-    val database = "data_lake"
-    val fqn = s"local.$database.$table"
+    val fqn = s"$catalog.$database.$table"
     val inputDF = createExpectedDataFrame()
 
+
     val wm = WriteMode.Append
     val cpl = ""
     val iceberg = new IcebergWriter(fqn, wm, cpl)(spark)
 
+
     iceberg.write(inputDF, EngineMode.Batch)
     iceberg.write(inputDF, EngineMode.Batch)
 
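The append test writes the same DataFrame twice, so the assertions below the visible hunk presumably expect the row count to double. A hedged sketch of that contract using Spark's DataFrameWriterV2, assuming IcebergWriter delegates to it (its actual implementation is not shown in this diff):

// fqn and inputDF as in the test above.
inputDF.writeTo(fqn).create()   // first write: create the Iceberg table
inputDF.writeTo(fqn).append()   // second write: append the same rows
assert(spark.table(fqn).count() == 2 * inputDF.count())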
@@ -84,11 +87,9 @@ class IcebergWriterTest extends AnyFunSuite
   test("Iceberg batch overwrite") {
     cleanUpTestDir()
     val table = "letters_overwrite"
-    val database = "data_lake"
-    val fqn = s"local.$database.$table"
+    val fqn = s"$catalog.$database.$table"
     val inputDF = createExpectedDataFrame()
 
-    spark.sql("SELECT current_catalog();").show()
 
     val wm = WriteMode.Overwrite
     val cpl = ""
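
Overwrite mode, by contrast, replaces the table contents on each write, so repeated writes leave exactly one copy of the data. A sketch of that behavior under the same DataFrameWriterV2 assumption:

inputDF.writeTo(fqn).createOrReplace()   // first write creates the table
inputDF.writeTo(fqn).createOrReplace()   // second write replaces, not appends
assert(spark.table(fqn).count() == inputDF.count())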
@@ -104,9 +105,8 @@ class IcebergWriterTest extends AnyFunSuite
 
   test("Iceberg streaming append") {
     cleanUpTestDir()
-    val database = "data_lake"
-    val expected = s"local.$database.letters"
-    val result = s"local.$database.letters_result"
+    val expected = s"$catalog.$database.letters"
+    val result = s"$catalog.$database.letters_result"
 
     val expectedDf = createExpectedDataFrame()
 
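The streaming test defines two fully qualified table names, suggesting rows flow from `letters` into `letters_result`. A sketch of how such a round trip is commonly wired with Iceberg's structured-streaming source and sink; the checkpoint path is an illustrative assumption, not taken from the commit:

// Stream rows out of one Iceberg table and append them into another.
val streamDf = spark.readStream
  .format("iceberg")
  .load("spark_catalog.data_lake.letters")

val query = streamDf.writeStream
  .format("iceberg")
  .outputMode("append")
  .option("checkpointLocation", "./src/test/tmp/iw_test/checkpoint")
  .toTable("spark_catalog.data_lake.letters_result")

query.processAllAvailable()   // drain everything currently available
query.stop()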