diff --git a/src/main/scala/com/metabolic/data/core/services/spark/reader/file/DeltaReader.scala b/src/main/scala/com/metabolic/data/core/services/spark/reader/file/DeltaReader.scala
index 14b1068f..df53741e 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/reader/file/DeltaReader.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/reader/file/DeltaReader.scala
@@ -3,26 +3,56 @@ package com.metabolic.data.core.services.spark.reader.file

 import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
 import org.apache.spark.sql.{DataFrame, SparkSession}

-class DeltaReader(val input_identifier: String) extends DataframeUnifiedReader {
+import scala.reflect.io.Directory
+import java.io.File
+
+class DeltaReader(val input_identifier: String, historical: Boolean, startTimestamp: String, checkpointPath: String) extends DataframeUnifiedReader {

   import io.delta.implicits._

   override def readBatch(spark: SparkSession): DataFrame = {
-    spark.read
+    val sr = spark.read
+    val osr = historical match {
+      case true => {
+        val directoryPath = new Directory(new File(checkpointPath))
+        directoryPath.deleteRecursively()
+        sr
+      }
+      case false => startTimestamp match {
+        case "" => sr
+        case _ => sr.option("timestampAsOf", startTimestamp)
+      }
+    }
+    osr
       .delta(input_identifier)
   }

   override def readStream(spark: SparkSession): DataFrame = {
-    spark.readStream
-      .delta(input_identifier)
-
+    val sr = spark.readStream
+    val osr = historical match {
+      case true => {
+        val directoryPath = new Directory(new File(checkpointPath))
+        directoryPath.deleteRecursively()
+        sr.option("startingTimestamp", "2000-01-01")
+      }
+      case false => startTimestamp match {
+        case "" => sr
+        case _ => sr.option("startingTimestamp", startTimestamp)}
+    }
+
+    osr
+      .delta(input_identifier)
   }

 }

 object DeltaReader {
-  def apply(input_identifier: String) = new DeltaReader(input_identifier)
+  def apply(input_identifier: String) = new DeltaReader(input_identifier, false, "", "")
+  def apply(input_identifier: String, historical: Boolean, checkpointPath: String) = new DeltaReader(input_identifier, historical, "", checkpointPath)
+  def apply(input_identifier: String, startTimestamp: String) = new DeltaReader(input_identifier, false, startTimestamp, "")
+
+
 }
diff --git a/src/main/scala/com/metabolic/data/core/services/spark/reader/stream/KafkaReader.scala b/src/main/scala/com/metabolic/data/core/services/spark/reader/stream/KafkaReader.scala
index 08f5b523..d5b78559 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/reader/stream/KafkaReader.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/reader/stream/KafkaReader.scala
@@ -5,8 +5,10 @@ import org.apache.spark.sql.functions.{col, schema_of_json}
 import org.apache.spark.sql.streaming.DataStreamReader
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
+import scala.reflect.io.Directory
+import java.io.File

-class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, topic: String)
+class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, topic: String, historical: Boolean, startTimestamp: String, checkpointPath: String)
   extends DataframeUnifiedReader {

   override val input_identifier: String = topic
@@ -58,15 +60,31 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t

   def readStream(spark: SparkSession): DataFrame = {

+    if(historical){
+      val directoryPath = new Directory(new File(checkpointPath))
+      directoryPath.deleteRecursively()
+    }
     val plain = spark
       .readStream
+
+    startTimestamp match {
+      case "" => plain
       .format("kafka")
       .option("kafka.bootstrap.servers", servers.mkString(","))
       .option("subscribe", topic)
       .option("kafka.session.timeout.ms", 45000)
       .option("kafka.client.dns.lookup","use_all_dns_ips")
-      .option("startingOffsets", "latest")
+      .option("startingOffsets", if (historical) "earliest" else "latest")
       .option("failOnDataLoss", false)
+      case _ => plain
+        .format("kafka")
+        .option("kafka.bootstrap.servers", servers.mkString(","))
+        .option("subscribe", topic)
+        .option("kafka.session.timeout.ms", 45000)
+        .option("kafka.client.dns.lookup", "use_all_dns_ips")
+        .option("startingTimestamp", startTimestamp)
+        .option("failOnDataLoss", false)
+    }

     val input = setStreamAuthentication(plain)

@@ -96,5 +114,8 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t
 }

 object KafkaReader {
-  def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String) = new KafkaReader(servers, apiKey, apiSecret, topic)
+  def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String) = new KafkaReader(servers, apiKey, apiSecret, topic, false, "", "")
+  def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String, historical:Boolean, checkpointPath:String) = new KafkaReader(servers, apiKey, apiSecret, topic, historical, "", checkpointPath)
+  def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String, startTimestamp: String) = new KafkaReader(servers, apiKey, apiSecret, topic, false, startTimestamp, "")
+
 }
diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/DataframeUnifiedWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/DataframeUnifiedWriter.scala
index 18f330a6..0f88e99d 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/writer/DataframeUnifiedWriter.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/DataframeUnifiedWriter.scala
@@ -2,6 +2,7 @@ package com.metabolic.data.core.services.spark.writer

 import com.metabolic.data.mapper.domain.io.EngineMode
 import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import io.delta.tables.DeltaTable
 import org.apache.logging.log4j.scala.Logging
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.{DataFrame, SaveMode}
@@ -21,8 +22,14 @@ trait DataframeUnifiedWriter extends Logging {

   def postHook(df: DataFrame, streamingQuery: Option[StreamingQuery] ): Boolean = {

+//    if (DeltaTable.isDeltaTable(output_identifier)){
+//      val deltaTable = DeltaTable.forPath(output_identifier)
+//
+//      logger.info("optimize with z order")
+//      deltaTable.optimize().executeZOrderBy("column")
+//
+//    }
     streamingQuery.flatMap(stream => Option.apply(stream.awaitTermination()))
-
     true
   }
diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala
index 941d4c89..0a77d416 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/file/DeltaWriter.scala
@@ -55,7 +55,6 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,
           .option("mergeSchema", "true")
           .option("checkpointLocation", checkpointLocation)
           .start(output_identifier)
-
       }
       case SaveMode.Overwrite => {
         df.writeStream
@@ -99,28 +98,10 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,

   override def preHook(df: DataFrame): DataFrame = {

-    val tableName: String = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)+ConfigUtilsService.getTableNameFileSink(output_identifier)
-
+    val prefix = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)
+    val tableName: String = prefix + ConfigUtilsService.getTableNameFileSink(output_identifier)
     if (!DeltaTable.isDeltaTable(outputPath)) {
       if (!File(outputPath).exists) {
-        //In this way a table is created which can be read but the schema is not visible.
-        /*
-        //Check if the database has location
-        new GlueCatalogService()
-          .checkDatabase(dbName, ConfigUtilsService.getDataBaseName(outputPath))
-        //Create the delta table
-        val deltaTable = DeltaTable.createIfNotExists()
-          .tableName(dbName + "." + tableName)
-          .location(output_identifier)
-          .addColumns(df.schema)
-          .partitionedBy(partitionColumnNames: _*)
-          .execute()
-        deltaTable.toDF.write.format("delta").mode(SaveMode.Append).save(output_identifier)
-        //Create table in Athena (to see the columns)
-        new AthenaCatalogueService()
-          .createDeltaTable(dbName, tableName, output_identifier, true)*/
-
-        //Redo if the other way works for us
         // create an empty RDD with original schema
         val emptyRDD = spark.sparkContext.emptyRDD[Row]
         val emptyDF = spark.createDataFrame(emptyRDD, df.schema)
@@ -132,6 +113,9 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,
         //Create table in Athena
         new AthenaCatalogueService()
           .createDeltaTable(dbName, tableName, output_identifier)
+        //Create table in Athena separate schema
+        new AthenaCatalogueService()
+          .createDeltaTable(dbName + "_" + prefix.dropRight(1), tableName, output_identifier)

       } else {
         //Convert to delta if parquet
@@ -145,6 +129,8 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,
   override def postHook(df: DataFrame, query: Option[StreamingQuery]): Boolean = {
     //Not for current version
     //deltaTable.optimize().executeCompaction()
+    //deltaTable.optimize().executeZOrderBy("column")
+
     query.flatMap(stream => Option.apply(stream.awaitTermination()))
     true
   }
diff --git a/src/main/scala/com/metabolic/data/core/services/spark/writer/partitioned_file/DeltaPartitionWriter.scala b/src/main/scala/com/metabolic/data/core/services/spark/writer/partitioned_file/DeltaPartitionWriter.scala
index a38739ed..2ace1a21 100644
--- a/src/main/scala/com/metabolic/data/core/services/spark/writer/partitioned_file/DeltaPartitionWriter.scala
+++ b/src/main/scala/com/metabolic/data/core/services/spark/writer/partitioned_file/DeltaPartitionWriter.scala
@@ -86,7 +86,8 @@ class DeltaPartitionWriter(val partitionColumnNames: Seq[String],
   override def preHook(df: DataFrame): DataFrame = {

     if (partitionColumnNames.size > 0) {
-      val tableName: String = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)+ConfigUtilsService.getTableNameFileSink(output_identifier)
+      val prefix = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)
+      val tableName: String = prefix+ConfigUtilsService.getTableNameFileSink(output_identifier)

       if (!DeltaTable.isDeltaTable(outputPath)) {
         if (!File(outputPath).exists) {
@@ -120,7 +121,9 @@ class DeltaPartitionWriter(val partitionColumnNames: Seq[String],
           //Create table in Athena
           new AthenaCatalogueService()
             .createDeltaTable(dbName, tableName, output_identifier)
-
+          //Create table in Athena separate schema
+          new AthenaCatalogueService()
+            .createDeltaTable(dbName+"_"+ prefix.dropRight(1), tableName, output_identifier)
         } else {
           //Convert to delta if parquet
           DeltaTable.convertToDelta(spark, s"parquet.`$outputPath`")
@@ -136,8 +139,14 @@ class DeltaPartitionWriter(val partitionColumnNames: Seq[String],
   }

   override def postHook(df: DataFrame, query: Option[StreamingQuery]): Boolean = {
-    //Not for current version
-    //deltaTable.optimize().executeCompaction()
+
+//    if (DeltaTable.isDeltaTable(output_identifier)) {
+//      val deltaTable = DeltaTable.forPath(output_identifier)
+//      deltaTable.optimize().executeCompaction()
+//
+//      logger.info("optimize with z order")
+//      deltaTable.optimize().executeZOrderBy(column_name)
+//    }
     query.flatMap(stream => Option.apply(stream.awaitTermination()))

     true
diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala
index 674f5390..6ed8762b 100644
--- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala
+++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala
@@ -90,8 +90,12 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {

   def transform(mapping: Config)(implicit spark: SparkSession, region: Regions): Unit = {

+    val checkpointPath = mapping.environment.baseCheckpointLocation + "/checkpoints/" + mapping.sink.name
+      .toLowerCase()
+      .replaceAll("\\W", "_")
+
     mapping.sources.foreach { source =>
-      MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode)
+      MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode, checkpointPath)
     }

     mapping.mappings.foreach { mapping =>
@@ -100,8 +104,9 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {

     val output: DataFrame = spark.table("output")

+
     MetabolicWriter.write(output, mapping.sink, mapping.environment.historical, mapping.environment.autoSchema,
-      mapping.environment.baseCheckpointLocation, mapping.environment.mode, mapping.environment.namespaces)
+      checkpointPath, mapping.environment.mode, mapping.environment.namespaces)

   }

diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
index 12ba3dd8..2ae0c5ee 100644
--- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
+++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicReader.scala
@@ -14,9 +14,9 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

 object MetabolicReader extends Logging {

-  def read(source: Source, historical: Boolean, mode: EngineMode)(implicit spark: SparkSession) = {
+  def read(source: Source, historical: Boolean, mode: EngineMode, checkpointPath: String)(implicit spark: SparkSession) = {

-    val input = readSource(source, mode, spark)
+    val input = readSource(source, historical, mode, spark, checkpointPath)

     val prepared = prepareSource(source, historical, input)

@@ -24,14 +24,14 @@ object MetabolicReader extends Logging {

   }

-  private def readSource(source: Source, mode: EngineMode, spark: SparkSession) = {
+  private def readSource(source: Source, historical: Boolean, mode: EngineMode, spark: SparkSession, checkpointPath: String) = {
     source match {

       case streamSource: StreamSource => {
         logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")

         streamSource.format match {
-          case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic)
+          case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, historical, streamSource.startTimestamp, checkpointPath)
             .read(spark, mode)
         }
       }
@@ -50,7 +50,7 @@ object MetabolicReader extends Logging {
           new ParquetReader(fileSource.inputPath)
             .read(spark, mode)
           case IOFormat.DELTA =>
-            new DeltaReader(fileSource.inputPath)
+            new DeltaReader(fileSource.inputPath, historical, fileSource.startTimestamp, checkpointPath)
               .read(spark, mode)
         }
       }
diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicWriter.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicWriter.scala
index 0593b8cc..bfb4a95f 100644
--- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicWriter.scala
+++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicWriter.scala
@@ -62,15 +62,11 @@ object MetabolicWriter extends Logging {
     }
   }

-  def write(df: DataFrame, sink: Sink, historical: Boolean, autoSchema: Boolean, baseCheckpointLocation: String, mode: EngineMode, namespaces: Seq[String])
+  def write(df: DataFrame, sink: Sink, historical: Boolean, autoSchema: Boolean, checkpointPath: String, mode: EngineMode, namespaces: Seq[String])
            (implicit spark: SparkSession, region: Regions) = {

     val _df = prepareOutput(sink, df)

-    val checkpointPath = baseCheckpointLocation + "/checkpoints/" + sink.name
-      .toLowerCase()
-      .replaceAll("\\W", "_")
-
     sink match {

       case streamSink: StreamSink => {
diff --git a/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala b/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala
index aa0ee1b5..b4e66b95 100644
--- a/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala
+++ b/src/main/scala/com/metabolic/data/mapper/domain/io/FileSource.scala
@@ -7,6 +7,7 @@ case class FileSource(inputPath: String,
                       name: String,
                       format: IOFormat = DELTA,
                       useStringPrimitives: Boolean = false,
-                      ops: Seq[SourceOp] = Seq.empty
+                      ops: Seq[SourceOp] = Seq.empty,
+                      startTimestamp: String = ""
                      )
   extends Source
diff --git a/src/main/scala/com/metabolic/data/mapper/domain/io/StreamSource.scala b/src/main/scala/com/metabolic/data/mapper/domain/io/StreamSource.scala
index 7a53b417..a2af3d26 100644
--- a/src/main/scala/com/metabolic/data/mapper/domain/io/StreamSource.scala
+++ b/src/main/scala/com/metabolic/data/mapper/domain/io/StreamSource.scala
@@ -9,5 +9,6 @@ case class StreamSource(name: String,
                         secret: String,
                         topic: String,
                         format: IOFormat = KAFKA,
-                        ops: Seq[SourceOp] = Seq.empty)
+                        ops: Seq[SourceOp] = Seq.empty,
+                        startTimestamp: String)
   extends Source
diff --git a/src/main/scala/com/metabolic/data/mapper/services/SourceConfigParserService.scala b/src/main/scala/com/metabolic/data/mapper/services/SourceConfigParserService.scala
index a68aa982..1ce7a8bc 100644
--- a/src/main/scala/com/metabolic/data/mapper/services/SourceConfigParserService.scala
+++ b/src/main/scala/com/metabolic/data/mapper/services/SourceConfigParserService.scala
@@ -24,7 +24,6 @@ case class SourceConfigParserService()(implicit val region: Regions) extends Log
       val name = source.getString("name")
       val ops = checkOps(source)

-
       SourceFormatParser().parse(name, source, ops)
     }

diff --git a/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala b/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala
index 973ad518..ae65706f 100644
--- a/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala
+++ b/src/main/scala/com/metabolic/data/mapper/services/SourceFormatParser.scala
@@ -7,6 +7,7 @@ import com.metabolic.data.mapper.domain.io._
 import com.metabolic.data.mapper.domain.ops.SourceOp
 import com.typesafe.config.{Config => HoconConfig}
 import org.apache.logging.log4j.scala.Logging
+import java.time.{LocalDateTime, ZoneOffset}

 case class SourceFormatParser()(implicit val region: Regions) extends FormatParser with Logging {

@@ -30,8 +31,9 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
   private def parseDeltaSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
     val path = if(config.hasPathOrNull("inputPath")) { config.getString("inputPath")}
     else { config.getString("path") }
-
-    FileSource(path, name, IOFormat.DELTA, false, ops)
+    val startTimestamp = if(config.hasPathOrNull("startTimestamp")){config.getString("startTimestamp")}
+    else{""}
+    FileSource(path, name, IOFormat.DELTA, false, ops, startTimestamp)
   }

   private def parseJsonSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
@@ -41,7 +43,7 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
     val useStringPrimitives = if(config.hasPathOrNull("useStringPrimitives")) { config.getBoolean("useStringPrimitives")}
     else { false }

-    FileSource(path, name, IOFormat.JSON, useStringPrimitives, ops)
+    FileSource(path, name, IOFormat.JSON, useStringPrimitives, ops, "")
   }

   private def parseCSVSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
@@ -51,14 +53,14 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
     val useStringPrimitives = if(config.hasPathOrNull("useStringPrimitives")) { config.getBoolean("useStringPrimitives")}
     else { false }

-    FileSource(path, name, IOFormat.CSV, useStringPrimitives, ops)
+    FileSource(path, name, IOFormat.CSV, useStringPrimitives, ops, "")
   }

   private def parseParquetSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
     val path = if(config.hasPathOrNull("inputPath")) { config.getString("inputPath")}
     else { config.getString("path") }

-    FileSource(path, name, IOFormat.PARQUET, false, ops)
+    FileSource(path, name, IOFormat.PARQUET, false, ops, "")
   }

   private def parseKafkaSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
@@ -73,10 +75,16 @@ case class SourceFormatParser()(implicit val region: Regions) extends FormatPars
     val servers = kafkaConfig.servers.get
     val apiKey = kafkaConfig.key.get
     val apiSecret = kafkaConfig.secret.get
-
+    val startTimestamp = if (config.hasPathOrNull("startTimestamp")) {
+      config.getString("startTimestamp")
+    }
+    else {
+      ""
+    }
+    val dateTime = LocalDateTime.parse(startTimestamp)
     val topic = config.getString("topic")

-    StreamSource(name, servers, apiKey, apiSecret, topic, IOFormat.KAFKA, ops)
+    StreamSource(name, servers, apiKey, apiSecret, topic, IOFormat.KAFKA, ops, dateTime.toEpochSecond(ZoneOffset.UTC).toString)
   }

   private def parseMetastoreSource(name: String, config: HoconConfig, ops: Seq[SourceOp]): Source = {
diff --git a/src/test/scala/com/metabolic/data/core/services/spark/reader/DeltaReaderTest.scala b/src/test/scala/com/metabolic/data/core/services/spark/reader/DeltaReaderTest.scala
new file mode 100644
index 00000000..eec0f069
--- /dev/null
+++ b/src/test/scala/com/metabolic/data/core/services/spark/reader/DeltaReaderTest.scala
@@ -0,0 +1,295 @@
+package com.metabolic.data.core.services.spark.writer
+
+import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
+import com.metabolic.data.RegionedTest
+import com.metabolic.data.core.services.spark.reader.file.DeltaReader
+import com.metabolic.data.mapper.domain.io.EngineMode
+import io.delta.implicits.{DeltaDataFrameWriter, DeltaDataStreamReader}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually.eventually
+import org.scalatest.concurrent.Futures.timeout
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.time.{Seconds, Span}
+
+import scala.reflect.io.Directory
+import java.io.File
+
+class DeltaReaderTest extends AnyFunSuite
+  with DataFrameSuiteBase
+  with SharedSparkContext
+  with BeforeAndAfterAll
+  with RegionedTest {
+
+  override def conf: SparkConf = super.conf
+    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+
+  val delta_source_path = "src/test/tmp/delta/letters_5"
+  val path = "src/test/tmp/delta/letters_6"
+  val pathCheckpoint = "src/test/tmp/delta/letters_6_checkpoint"
+
+  val eventA = Seq(
+    Row("A", "a", 2022, 2, 5, "2022-02-05")
+  )
+
+  val someSchema = List(
+    StructField("name", StringType, true),
+    StructField("data", StringType, true),
+    StructField("yyyy", IntegerType, true),
+    StructField("mm", IntegerType, true),
+    StructField("dd", IntegerType, true),
+    StructField("date", StringType, true),
+  )
+
+  def write_into_source(df: DataFrame, path: String, savemode: SaveMode): Unit = {
+    df
+      .write
+      .mode(savemode)
+      .option("overwriteSchema", "true")
+      .option("mergeSchema", "true")
+      .delta(path)
+  }
+
+  test("Create Delta source for streaming with event A"){
+    val sqlCtx = sqlContext
+
+    val inputDF = spark.createDataFrame(
+      spark.sparkContext.parallelize(eventA),
+      StructType(someSchema)
+    )
+
+    val directoryPath = new Directory(new File(delta_source_path))
+    directoryPath.deleteRecursively()
+
+    write_into_source(inputDF, delta_source_path, SaveMode.Overwrite)
+    val outputDf = DeltaReader(delta_source_path)
+      .read(sqlCtx.sparkSession, EngineMode.Batch)
+    assertDataFrameNoOrderEquals(inputDF, outputDf)
+
+  }
+
+  test("Read and Write event A into sink - hist mode") {
+    val sqlCtx = sqlContext
+
+    val directoryPath = new Directory(new File(path))
+    directoryPath.deleteRecursively()
+
+    val directoryCheckpoint = new Directory(new File(pathCheckpoint))
+    directoryCheckpoint.deleteRecursively()
+
+    val eventAdf = spark.createDataFrame(
+      spark.sparkContext.parallelize(eventA),
+      StructType(someSchema)
+    )
+    //Create table
+    val emptyRDD = spark.sparkContext.emptyRDD[Row]
+    val emptyDF = spark.createDataFrame(emptyRDD, eventAdf.schema)
+    emptyDF
+      .write
+      .format("delta")
+      .mode(SaveMode.Append)
+      .save(path)
+
+    val outputDf = spark.readStream
+      .option("startingTimestamp", "2000-01-01")
+      .delta(delta_source_path)
+      .writeStream
+      .format("delta")
+      .outputMode("append")
+      .option("mergeSchema", "true")
+      .option("checkpointLocation", pathCheckpoint)
+      .trigger(Trigger.ProcessingTime("5 seconds"))
+      .start(path)
+    outputDf.awaitTermination(20000)
+
+    eventually(timeout(Span(30, Seconds))) {
+      val outputDf2 = DeltaReader(path)
+        .read(sqlCtx.sparkSession, EngineMode.Batch)
+      assertDataFrameNoOrderEquals(eventAdf, outputDf2)
+    }
+
+  }
+
+  test("Add event B to the source") {
+    val sqlCtx = sqlContext
+
+    val eventB = Seq(
+      Row("B", "b", 2022, 2, 6, "2022-02-06")
+    )
+
+    val sourceData = Seq(
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06")
+    )
+
+    val eventBdf = spark.createDataFrame(
+      spark.sparkContext.parallelize(eventB),
+      StructType(someSchema)
+    )
+
+    val sourceDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(sourceData),
+      StructType(someSchema)
+    )
+
+    write_into_source(eventBdf, delta_source_path, SaveMode.Append)
+    val outputDf = DeltaReader(delta_source_path)
+      .read(sqlCtx.sparkSession, EngineMode.Batch)
+    assertDataFrameNoOrderEquals(sourceDf, outputDf)
+  }
+
+  test("Read and Write event B into sink - without historical") {
+    val sqlCtx = sqlContext
+
+    val expectedData = Seq(
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06")
+    )
+    val expectedDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedData),
+      StructType(someSchema)
+    )
+
+    val outputDf = spark.readStream
+      .delta(delta_source_path)
+      .writeStream
+      .format("delta")
+      .outputMode("append")
+      .option("mergeSchema", "true")
+      .option("checkpointLocation", pathCheckpoint)
+      .trigger(Trigger.ProcessingTime("5 seconds"))
+      .start(path)
+    outputDf.awaitTermination(20000)
+
+    eventually(timeout(Span(30, Seconds))) {
+      val outputDf3 = DeltaReader(path)
+        .read(sqlCtx.sparkSession, EngineMode.Batch)
+      assertDataFrameNoOrderEquals(expectedDf, outputDf3)
+    }
+  }
+
+  test("Add event C to the source") {
+    val sqlCtx = sqlContext
+
+    val eventC = Seq(
+      Row("C", "c", 2022, 2, 7, "2022-02-07")
+    )
+
+    val sourceData = Seq(
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06"),
+      Row("C", "c", 2022, 2, 7, "2022-02-07")
+    )
+    val eventCdf = spark.createDataFrame(
+      spark.sparkContext.parallelize(eventC),
+      StructType(someSchema)
+    )
+    val sourceDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(sourceData),
+      StructType(someSchema)
+    )
+    write_into_source(eventCdf, delta_source_path, SaveMode.Append)
+    val outputDf = DeltaReader(delta_source_path)
+      .read(sqlCtx.sparkSession, EngineMode.Batch)
+    assertDataFrameNoOrderEquals(sourceDf, outputDf)
+  }
+
+  test("Read and Write event C into sink - without historical") {
+    val sqlCtx = sqlContext
+
+    val expectedData = Seq(
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06"),
+      Row("C", "c", 2022, 2, 7, "2022-02-07"),
+    )
+    val expectedDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedData),
+      StructType(someSchema)
+    )
+
+    val outputDf = spark.readStream
+      .delta(delta_source_path)
+      .writeStream
+      .format("delta")
+      .outputMode("append")
+      .option("mergeSchema", "true")
+      .option("checkpointLocation", pathCheckpoint)
+      .trigger(Trigger.ProcessingTime("5 seconds"))
+      .start(path)
+    outputDf.awaitTermination(20000)
+
+    assertDataFrameNoOrderEquals(expectedDf, DeltaReader(path)
+      .read(sqlCtx.sparkSession, EngineMode.Batch))
+
+  }
+
+  test("Read table with historical and checkpoint") {
+    val sqlCtx = sqlContext
+
+    val expectedData = Seq(
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06"),
+      Row("C", "c", 2022, 2, 7, "2022-02-07"),
+    )
+    val expectedDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedData),
+      StructType(someSchema)
+    )
+
+    val outputDf = spark.readStream
+      .option("startingTimestamp", "2000-01-01")
+      .delta(delta_source_path)
+      .writeStream
+      .format("delta")
+      .outputMode("append")
+      .option("mergeSchema", "true")
+      .option("checkpointLocation", pathCheckpoint)
+      .trigger(Trigger.ProcessingTime("5 seconds"))
+      .start(path)
+    outputDf.awaitTermination(20000)
+
+    val outputDf3 = DeltaReader(path)
+      .read(sqlCtx.sparkSession, EngineMode.Batch)
+    assertDataFrameNoOrderEquals(expectedDf, outputDf3)
+  }
+
+  test("Read table with historical and no checkpoint") {
+    val sqlCtx = sqlContext
+
+    val directoryCheckpoint = new Directory(new File(pathCheckpoint))
+    directoryCheckpoint.deleteRecursively()
+
+    val expectedData = Seq(
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("A", "a", 2022, 2, 5, "2022-02-05"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06"),
+      Row("B", "b", 2022, 2, 6, "2022-02-06"),
+      Row("C", "c", 2022, 2, 7, "2022-02-07"),
+      Row("C", "c", 2022, 2, 7, "2022-02-07"),
+    )
+    val expectedDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedData),
+      StructType(someSchema)
+    )
+
+    val outputDf = spark.readStream
+      .option("startingTimestamp", "2000-01-01")
+      .delta(delta_source_path)
+      .writeStream
+      .format("delta")
+      .outputMode("append")
+      .option("mergeSchema", "true")
+      .option("checkpointLocation", pathCheckpoint)
+      .trigger(Trigger.ProcessingTime("5 seconds"))
+      .start(path)
+    outputDf.awaitTermination(20000)
+
+    val outputDf3 = DeltaReader(path)
+      .read(sqlCtx.sparkSession, EngineMode.Batch)
+    assertDataFrameNoOrderEquals(expectedDf, outputDf3)
+  }
+}
diff --git a/src/test/scala/com/metabolic/data/core/services/spark/writer/DeltaWriterTest.scala b/src/test/scala/com/metabolic/data/core/services/spark/writer/DeltaWriterTest.scala
index b86bc5cb..920a3ce1 100644
--- a/src/test/scala/com/metabolic/data/core/services/spark/writer/DeltaWriterTest.scala
+++ b/src/test/scala/com/metabolic/data/core/services/spark/writer/DeltaWriterTest.scala
@@ -129,7 +129,7 @@ class DeltaWriterTest extends AnyFunSuite
       StructType(someSchema)
     )

-    val outputDf = new DeltaReader(path)
+    val outputDf = new DeltaReader(path,false,"","")
       .read(sqlCtx.sparkSession, EngineMode.Batch)

     outputDf.show(20, false)
@@ -203,7 +203,7 @@ class DeltaWriterTest extends AnyFunSuite
       StructType(someSchema)
     )

-    val outputDf = new DeltaReader(path)
+    val outputDf = new DeltaReader(path, false, "","")
       .read(sqlCtx.sparkSession,EngineMode.Batch)

     outputDf.show(20, false)
@@ -275,7 +275,7 @@ class DeltaWriterTest extends AnyFunSuite
       StructType(someSchema)
     )

-    val outputDf = new DeltaReader(path)
+    val outputDf = new DeltaReader(path,false,"","")
       .read(sqlCtx.sparkSession, EngineMode.Batch)

     outputDf.show(20, false)
@@ -352,7 +352,7 @@ class DeltaWriterTest extends AnyFunSuite
       StructType(someSchema)
     )

-    val outputDf = new DeltaReader(path)
+    val outputDf = new DeltaReader(path,false,"","")
       .read(sqlCtx.sparkSession, EngineMode.Batch)

     outputDf.show(20, false)
@@ -397,7 +397,7 @@ class DeltaWriterTest extends AnyFunSuite
       .save(path)
      */

-    val outputDf = new DeltaReader(path)
+    val outputDf = new DeltaReader(path,false,"","")
       .read(sqlCtx.sparkSession, EngineMode.Batch)

     println("SCHEMA: " + outputDf.schema.json)
@@ -420,7 +420,7 @@ class DeltaWriterTest extends AnyFunSuite
     firstWriter.write(inputDF, EngineMode.Batch)

     eventually(timeout(Span(5, Seconds))) {
-      val outputDf = new DeltaReader(path)
+      val outputDf = new DeltaReader(path,false,"","")
         .read(sqlCtx.sparkSession, EngineMode.Batch)
       assertDataFrameNoOrderEquals(inputDF, outputDf)
     }
@@ -520,7 +520,7 @@ class DeltaWriterTest extends AnyFunSuite
     )

     eventually(timeout(Span(5, Seconds))) {
-      val outputDf = new DeltaReader(path)
+      val outputDf = new DeltaReader(path,false,"","")
        .read(sqlCtx.sparkSession, EngineMode.Batch)
       assertDataFrameNoOrderEquals(inputDF, outputDf)
     }
@@ -646,7 +646,7 @@ class DeltaWriterTest extends AnyFunSuite
     )

     eventually(timeout(Span(30, Seconds))) {
-      val outputDf = new DeltaReader(path)
+      val outputDf = new DeltaReader(path,false,"","")
        .read(sqlCtx.sparkSession, EngineMode.Batch)
       assertDataFrameNoOrderEquals(inputDF, outputDf)
     }
@@ -766,7 +766,7 @@ class DeltaWriterTest extends AnyFunSuite
     )

     eventually(timeout(Span(30, Seconds))) {
-      val outputDf = new DeltaReader(path)
+      val outputDf = new DeltaReader(path,false,"","")
        .read(sqlCtx.sparkSession, EngineMode.Batch)
       assertDataFrameNoOrderEquals(inputDF, outputDf)
     }
diff --git a/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala b/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
index b3031eac..299c163d 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/MetabolicReaderIT.scala
@@ -78,7 +78,7 @@ class MetabolicReaderIT extends AnyFunSuite

     val source = getFileSource(inputPath, tableName, IOFormat.PARQUET.toString).head

-    MetabolicReader.read(source, true, EngineMode.Batch)(spark)
+    MetabolicReader.read(source, true, EngineMode.Batch, "")(spark)

     val result = spark.table(tableName)

@@ -103,7 +103,7 @@ class MetabolicReaderIT extends AnyFunSuite

     val source = getFileSource(inputPath, tableName, IOFormat.JSON.toString).head

-    MetabolicReader.read(source, true, EngineMode.Batch)(spark)
+    MetabolicReader.read(source, true, EngineMode.Batch, "")(spark)

     val result = spark.table(tableName)

@@ -129,7 +129,7 @@ class MetabolicReaderIT extends AnyFunSuite

     val source = getFileSource(inputPath, tableName, IOFormat.CSV.toString).head

-    MetabolicReader.read(source, true, EngineMode.Batch)(spark)
+    MetabolicReader.read(source, true, EngineMode.Batch, "")(spark)

     val result = spark.table(tableName)

@@ -153,7 +153,7 @@ class MetabolicReaderIT extends AnyFunSuite

     val source = getFileSource(inputPath, tableName, IOFormat.DELTA.toString).head

-    MetabolicReader.read(source, true, EngineMode.Batch)(spark)
+    MetabolicReader.read(source, true, EngineMode.Batch, "")(spark)

     val result = spark.table(tableName)

diff --git a/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala b/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala
index 5442a976..d1722373 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala
@@ -12,7 +12,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SaveMode}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll}
 import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
 import org.scalatest.funsuite.AnyFunSuite
diff --git a/src/test/scala/com/metabolic/data/mapper/app/ProdConfigsMetabolicAppIT.scala b/src/test/scala/com/metabolic/data/mapper/app/ProdConfigsMetabolicAppIT.scala
index 43ca7cd0..d75421f5 100644
--- a/src/test/scala/com/metabolic/data/mapper/app/ProdConfigsMetabolicAppIT.scala
+++ b/src/test/scala/com/metabolic/data/mapper/app/ProdConfigsMetabolicAppIT.scala
@@ -36,7 +36,7 @@ class ProdConfigsMetabolicAppIT extends AnyFunSuite
       .parseConfig(config)

     parsedConfig.head.sources.foreach { source =>
-      MetabolicReader.read(source, true, EngineMode.Batch)(spark)
+      MetabolicReader.read(source, true, EngineMode.Batch, "")(spark)
     }