Commit
Implement generic Stream Reader + tests for Iceberg and Delta compatibility
braislchao committed Sep 12, 2024
1 parent 7c40774 commit d7bd30a
Showing 2 changed files with 73 additions and 16 deletions.
@@ -1,27 +1,50 @@
package com.metabolic.data.core.services.spark.reader.table

import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.delta.implicits.stringEncoder
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

class GenericReader(fqn: String) extends DataframeUnifiedReader {
class GenericReader(fqn: String) extends DataframeUnifiedReader with Logging {

  override val input_identifier: String = fqn

  private def getTableProvider(spark: SparkSession): String = {
    spark.sql(s"DESCRIBE FORMATTED $input_identifier")
      .filter(col("col_name").contains("Provider"))
      .select("data_type")
      .as[String]
      .first()
  }
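  // Illustrative note (not part of this commit): for a table created with the
  // Delta or Iceberg source, DESCRIBE FORMATTED emits a row whose col_name is
  // "Provider" and whose data_type holds the provider name (e.g. "delta" or
  // "iceberg"). getTableProvider returns that string, which drives the match
  // in readStream below.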

  override def readBatch(spark: SparkSession): DataFrame = {
    //Generic for Delta Lake and Iceberg tables using fqn
    spark.table(input_identifier)
  }

  override def readStream(spark: SparkSession): DataFrame = {
    //TODO: add format delta readstream
    //format Delta
    //spark.readStream.format("delta").table("campaign_all_delta_stream")

    //format Iceberg
    spark.readStream
      .format("iceberg")
      .option("stream-from-timestamp", (System.currentTimeMillis() - 3600000).toString)
      .load(input_identifier)

    val provider = getTableProvider(spark)
    provider match {
      case "iceberg" =>
        logger.info(s"Reading Iceberg Table source ${input_identifier}")
        spark.readStream
          .format("iceberg")
          .option("stream-from-timestamp", (System.currentTimeMillis() - 3600000).toString)
          .load(input_identifier)

      case "delta" =>
        logger.info(s"Reading Delta Table source $input_identifier")
        spark.readStream
          .format("delta")
          .table(input_identifier)

      case unknown =>
        logger.warn(s"Table source $provider not supported for table $input_identifier")
        spark.emptyDataFrame
    }
  }

}
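A minimal usage sketch of the new reader (illustrative only; the table name and the Spark session setup are assumptions, not part of this commit):

// Sketch: the same GenericReader call works for Delta and Iceberg tables
// registered under a fully qualified name. EngineMode comes from the project's
// domain package (import path not shown in this diff).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("generic-reader-example")
  .getOrCreate()

val reader = new GenericReader("data_lake.letters") // assumed table name

// Batch read: resolved through spark.table(fqn).
val batchDf = reader.read(spark, EngineMode.Batch)

// Stream read: dispatches to the iceberg or delta streaming source
// based on the Provider reported by the catalog.
val streamDf = reader.read(spark, EngineMode.Stream)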
@@ -93,10 +93,6 @@ class GenericReaderTest extends AnyFunSuite
    val delta = new GenericReader(fqn)
    val resultDf = delta.read(spark, EngineMode.Batch)

    print("comparison:")
    expectedDf.show()
    resultDf.show()

    val sortedExpectedDf = expectedDf.orderBy("name")
    val sortedResultDf = resultDf.orderBy("name")
@@ -143,9 +139,47 @@ class GenericReaderTest extends AnyFunSuite

  }

  //TODO: Implement this test
  ignore("Delta stream read") {
  test("Delta stream read") {

    new Directory(new File(testDir)).deleteRecursively()

    val fqn = "data_lake.letters"
    spark.sql("CREATE DATABASE IF NOT EXISTS data_lake")

    val expectedDf = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )

    expectedDf
      .write
      .format("delta")
      .mode("overwrite")
      .saveAsTable(fqn)

    val delta = new GenericReader(fqn)
    val inputDf = delta.read(spark, EngineMode.Stream)

    val checkpointPath = testDir + "checkpoints"

    val query = inputDf.writeStream
      .format("parquet") // or "csv", "json", etc.
      .outputMode("append") // Ensure the output mode is correct for your use case
      .trigger(Trigger.Once()) // Process only one batch
      .option("checkpointLocation", checkpointPath)
      .option("path", testDir + "letters") // Specify the output path for the file
      .start()

    query.awaitTermination()

    val resultDf = spark.read
      .format("parquet")
      .load(testDir + "letters")

    val sortedExpectedDf = expectedDf.orderBy("name")
    val sortedResultDf = resultDf.orderBy("name")

    assertDataFrameEquals(sortedExpectedDf, sortedResultDf)
  }

}
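For reference, a sketch of how the matching Iceberg streaming read could be exercised inside GenericReaderTest, assuming the suite's SparkSession has an Iceberg-enabled catalog and reusing the helpers visible above (testDir, expectedData, expectedSchema, assertDataFrameEquals). This is illustrative only and not part of the commit:

  test("Iceberg stream read (sketch)") {

    // Assumed Iceberg table name; requires an Iceberg catalog in the test SparkSession.
    val fqn = "data_lake.letters_iceberg"
    spark.sql("CREATE DATABASE IF NOT EXISTS data_lake")

    val expectedDf = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )

    // DataFrameWriterV2 create-or-replace with the Iceberg source.
    expectedDf.writeTo(fqn).using("iceberg").createOrReplace()

    val iceberg = new GenericReader(fqn)
    val inputDf = iceberg.read(spark, EngineMode.Stream)

    val query = inputDf.writeStream
      .format("parquet")
      .outputMode("append")
      .trigger(Trigger.Once())
      .option("checkpointLocation", testDir + "iceberg_checkpoints")
      .option("path", testDir + "letters_iceberg")
      .start()

    query.awaitTermination()

    val resultDf = spark.read.parquet(testDir + "letters_iceberg")

    assertDataFrameEquals(expectedDf.orderBy("name"), resultDf.orderBy("name"))
  }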
