feature/read_delta_from_timestamp #4

Open

wants to merge 21 commits into main from feature/read_delta_from_timestamp
@@ -3,26 +3,56 @@ package com.metabolic.data.core.services.spark.reader.file
import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.spark.sql.{DataFrame, SparkSession}

class DeltaReader(val input_identifier: String) extends DataframeUnifiedReader {
import scala.reflect.io.Directory
import java.io.File

class DeltaReader(val input_identifier: String, historical: Boolean, startTimestamp: String, checkpointPath: String) extends DataframeUnifiedReader {

import io.delta.implicits._

override def readBatch(spark: SparkSession): DataFrame = {

spark.read
val sr = spark.read
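    // Batch path: "timestampAsOf" time-travels to the Delta snapshot that was current at
    // startTimestamp, while a historical run wipes the checkpoint directory and reads the whole table.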
    val osr =
      if (historical) {
        new Directory(new File(checkpointPath)).deleteRecursively()
        sr
      } else if (startTimestamp == "") {
        sr
      } else {
        sr.option("timestampAsOf", startTimestamp)
      }
osr
.delta(input_identifier)

}

override def readStream(spark: SparkSession): DataFrame = {

spark.readStream
.delta(input_identifier)

val sr = spark.readStream
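    // Streaming path: Delta's "startingTimestamp" option sets where the stream begins; the
    // hard-coded "2000-01-01" effectively replays the full history once the checkpoint is gone.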
    val osr =
      if (historical) {
        new Directory(new File(checkpointPath)).deleteRecursively()
        sr.option("startingTimestamp", "2000-01-01")
      } else if (startTimestamp == "") {
        sr
      } else {
        sr.option("startingTimestamp", startTimestamp)
      }

osr
.delta(input_identifier)
}

}

object DeltaReader {
def apply(input_identifier: String) = new DeltaReader(input_identifier)
def apply(input_identifier: String) = new DeltaReader(input_identifier, false, "", "")
def apply(input_identifier: String, historical: Boolean, checkpointPath: String) = new DeltaReader(input_identifier, historical, "", checkpointPath)
def apply(input_identifier: String, startTimestamp: String) = new DeltaReader(input_identifier, false, startTimestamp, "")


}
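A minimal usage sketch of the new reader overloads; the paths, timestamp and `spark` session are illustrative and not part of this change:

    // Read the current table, as before.
    val latest = DeltaReader("s3://lake/events").readBatch(spark)
    // Batch time travel to the snapshot current at the given timestamp.
    val asOf = DeltaReader("s3://lake/events", "2023-01-01 00:00:00").readBatch(spark)
    // Historical reprocess: deletes the checkpoint directory, then reads the full table.
    val replay = DeltaReader("s3://lake/events", historical = true, checkpointPath = "s3://lake/checkpoints/events").readBatch(spark)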
@@ -5,8 +5,10 @@ import org.apache.spark.sql.functions.{col, schema_of_json}
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}
import scala.reflect.io.Directory
import java.io.File

class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, topic: String)
class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, topic: String, historical: Boolean, startTimestamp: String, checkpointPath: String)
extends DataframeUnifiedReader {

override val input_identifier: String = topic
@@ -58,15 +60,31 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t

def readStream(spark: SparkSession): DataFrame = {

if(historical){
val directoryPath = new Directory(new File(checkpointPath))
directoryPath.deleteRecursively()
}
val plain = spark
.readStream
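    // DataStreamReader is a mutable builder (format/option return the same instance), so the
    // options set in the match below are carried by `plain` into setStreamAuthentication.
    // Kafka's "startingTimestamp" resolves, per partition, to the earliest offset whose record
    // timestamp is >= the given value.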

startTimestamp match {
case "" => plain
.format("kafka")
.option("kafka.bootstrap.servers", servers.mkString(","))
.option("subscribe", topic)
.option("kafka.session.timeout.ms", 45000)
.option("kafka.client.dns.lookup","use_all_dns_ips")
.option("startingOffsets", "latest")
.option("startingOffsets", if (historical) "earliest" else "latest")
.option("failOnDataLoss", false)
case _ => plain
.format("kafka")
.option("kafka.bootstrap.servers", servers.mkString(","))
.option("subscribe", topic)
.option("kafka.session.timeout.ms", 45000)
.option("kafka.client.dns.lookup", "use_all_dns_ips")
.option("startingTimestamp", startTimestamp)
.option("failOnDataLoss", false)
}


val input = setStreamAuthentication(plain)
@@ -96,5 +114,8 @@ class KafkaReader(val servers: Seq[String], apiKey: String, apiSecret: String, t
}

object KafkaReader {
def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String) = new KafkaReader(servers, apiKey, apiSecret, topic)
def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String) = new KafkaReader(servers, apiKey, apiSecret, topic, false, "", "")
def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String, historical: Boolean, checkpointPath: String) = new KafkaReader(servers, apiKey, apiSecret, topic, historical, "", checkpointPath)
def apply(servers: Seq[String], apiKey: String, apiSecret: String, topic: String, startTimestamp: String) = new KafkaReader(servers, apiKey, apiSecret, topic, false, startTimestamp, "")

}
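A corresponding sketch for the new KafkaReader overloads; brokers, credentials, topic and checkpoint path are placeholders:

    // Stream from the latest offsets, as before.
    val live = KafkaReader(Seq("broker:9092"), "apiKey", "apiSecret", "events")
    // Historical reprocess: wipes the checkpoint and starts from the earliest offsets.
    val backfill = KafkaReader(Seq("broker:9092"), "apiKey", "apiSecret", "events", historical = true, checkpointPath = "s3://lake/checkpoints/events")
    // Start from the first records whose timestamps are at or after the given epoch millis.
    val fromTs = KafkaReader(Seq("broker:9092"), "apiKey", "apiSecret", "events", "1672531200000")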
@@ -2,6 +2,7 @@ package com.metabolic.data.core.services.spark.writer

import com.metabolic.data.mapper.domain.io.EngineMode
import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
import io.delta.tables.DeltaTable
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode}
@@ -21,8 +22,14 @@ trait DataframeUnifiedWriter extends Logging {

def postHook(df: DataFrame, streamingQuery: Option[StreamingQuery] ): Boolean = {

// if (DeltaTable.isDeltaTable(output_identifier)){
// val deltaTable = DeltaTable.forPath(output_identifier)
//
// logger.info("optimize with z order")
// deltaTable.optimize().executeZOrderBy("column")
//
// }
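    // For streaming sinks, block until the query terminates; batch runs pass None and fall through.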
streamingQuery.flatMap(stream => Option.apply(stream.awaitTermination()))

true

}
@@ -55,7 +55,6 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,
.option("mergeSchema", "true")
.option("checkpointLocation", checkpointLocation)
.start(output_identifier)

}
case SaveMode.Overwrite => {
df.writeStream
@@ -99,28 +98,10 @@

override def preHook(df: DataFrame): DataFrame = {

val tableName: String = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)+ConfigUtilsService.getTableNameFileSink(output_identifier)

val prefix = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)
val tableName: String = prefix + ConfigUtilsService.getTableNameFileSink(output_identifier)
if (!DeltaTable.isDeltaTable(outputPath)) {
if (!File(outputPath).exists) {
//In this way a table is created which can be read but the schema is not visible.
/*
//Check if the database has location
new GlueCatalogService()
.checkDatabase(dbName, ConfigUtilsService.getDataBaseName(outputPath))
//Create the delta table
val deltaTable = DeltaTable.createIfNotExists()
.tableName(dbName + "." + tableName)
.location(output_identifier)
.addColumns(df.schema)
.partitionedBy(partitionColumnNames: _*)
.execute()
deltaTable.toDF.write.format("delta").mode(SaveMode.Append).save(output_identifier)
//Create table in Athena (to see the columns)
new AthenaCatalogueService()
.createDeltaTable(dbName, tableName, output_identifier, true)*/

//Redo if the other way works for us
// create an empty RDD with original schema
val emptyRDD = spark.sparkContext.emptyRDD[Row]
val emptyDF = spark.createDataFrame(emptyRDD, df.schema)
@@ -132,6 +113,9 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,
//Create table in Athena
new AthenaCatalogueService()
.createDeltaTable(dbName, tableName, output_identifier)
//Also register the table under a second, prefix-specific Athena schema (<dbName>_<prefix>)
new AthenaCatalogueService()
.createDeltaTable(dbName + "_" + prefix.dropRight(1), tableName, output_identifier)

} else {
//Convert to delta if parquet
@@ -145,6 +129,8 @@ class DeltaWriter(val outputPath: String, val saveMode: SaveMode,
override def postHook(df: DataFrame, query: Option[StreamingQuery]): Boolean = {
//Not for current version
//deltaTable.optimize().executeCompaction()
//deltaTable.optimize().executeZOrderBy("column")

query.flatMap(stream => Option.apply(stream.awaitTermination()))
true
}
@@ -86,7 +86,8 @@ class DeltaPartitionWriter(val partitionColumnNames: Seq[String],
override def preHook(df: DataFrame): DataFrame = {

if (partitionColumnNames.size > 0) {
val tableName: String = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)+ConfigUtilsService.getTableNameFileSink(output_identifier)
val prefix = ConfigUtilsService.getTablePrefix(namespaces, output_identifier)
val tableName: String = prefix+ConfigUtilsService.getTableNameFileSink(output_identifier)

if (!DeltaTable.isDeltaTable(outputPath)) {
if (!File(outputPath).exists) {
@@ -120,7 +121,9 @@
//Create table in Athena
new AthenaCatalogueService()
.createDeltaTable(dbName, tableName, output_identifier)

//Also register the table under a second, prefix-specific Athena schema (<dbName>_<prefix>)
new AthenaCatalogueService()
.createDeltaTable(dbName+"_"+ prefix.dropRight(1), tableName, output_identifier)
} else {
//Convert to delta if parquet
DeltaTable.convertToDelta(spark, s"parquet.`$outputPath`")
@@ -136,8 +139,14 @@ class DeltaPartitionWriter(val partitionColumnNames: Seq[String],
}

override def postHook(df: DataFrame, query: Option[StreamingQuery]): Boolean = {
//Not for current version
//deltaTable.optimize().executeCompaction()

// if (DeltaTable.isDeltaTable(output_identifier)) {
// val deltaTable = DeltaTable.forPath(output_identifier)
// deltaTable.optimize().executeCompaction()
//
// logger.info("optimize with z order")
// deltaTable.optimize().executeZOrderBy(column_name)
// }
query.flatMap(stream => Option.apply(stream.awaitTermination()))

true
@@ -90,8 +90,12 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {

def transform(mapping: Config)(implicit spark: SparkSession, region: Regions): Unit = {
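    // One checkpoint location per sink, derived from its normalised name and shared by the
    // readers (so a historical run can delete it) and by the writer below.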

val checkpointPath = mapping.environment.baseCheckpointLocation + "/checkpoints/" + mapping.sink.name
.toLowerCase()
.replaceAll("\\W", "_")

mapping.sources.foreach { source =>
MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode)
MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode, checkpointPath)
}

mapping.mappings.foreach { mapping =>
@@ -100,8 +104,9 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {

val output: DataFrame = spark.table("output")


MetabolicWriter.write(output, mapping.sink, mapping.environment.historical, mapping.environment.autoSchema,
mapping.environment.baseCheckpointLocation, mapping.environment.mode, mapping.environment.namespaces)
checkpointPath, mapping.environment.mode, mapping.environment.namespaces)

}

@@ -14,24 +14,24 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

object MetabolicReader extends Logging {

def read(source: Source, historical: Boolean, mode: EngineMode)(implicit spark: SparkSession) = {
def read(source: Source, historical: Boolean, mode: EngineMode, checkpointPath: String)(implicit spark: SparkSession) = {

val input = readSource(source, mode, spark)
val input = readSource(source, historical, mode, spark, checkpointPath)

val prepared = prepareSource(source, historical, input)

prepared.createOrReplaceTempView(source.name)

}

private def readSource(source: Source, mode: EngineMode, spark: SparkSession) = {
private def readSource(source: Source, historical: Boolean, mode: EngineMode, spark: SparkSession, checkpointPath: String) = {
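    // historical and checkpointPath are threaded through to the concrete readers so they can
    // reset their checkpoints and pick a starting point (timestamp, or earliest offsets).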
source match {

case streamSource: StreamSource => {
logger.info(s"Reading stream source ${streamSource.name} from ${streamSource.topic}")

streamSource.format match {
case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic)
case IOFormat.KAFKA => new KafkaReader(streamSource.servers, streamSource.key, streamSource.secret, streamSource.topic, historical, streamSource.startTimestamp, checkpointPath)
.read(spark, mode)
}
}
@@ -50,7 +50,7 @@ object MetabolicReader extends Logging {
new ParquetReader(fileSource.inputPath)
.read(spark, mode)
case IOFormat.DELTA =>
new DeltaReader(fileSource.inputPath)
new DeltaReader(fileSource.inputPath, historical, fileSource.startTimestamp, checkpointPath)
.read(spark, mode)
}
}
@@ -62,15 +62,11 @@ object MetabolicWriter extends Logging {
}
}

def write(df: DataFrame, sink: Sink, historical: Boolean, autoSchema: Boolean, baseCheckpointLocation: String, mode: EngineMode, namespaces: Seq[String])
def write(df: DataFrame, sink: Sink, historical: Boolean, autoSchema: Boolean, checkpointPath: String, mode: EngineMode, namespaces: Seq[String])
(implicit spark: SparkSession, region: Regions) = {

val _df = prepareOutput(sink, df)

val checkpointPath = baseCheckpointLocation + "/checkpoints/" + sink.name
.toLowerCase()
.replaceAll("\\W", "_")

sink match {

case streamSink: StreamSink => {
@@ -7,6 +7,7 @@ case class FileSource(inputPath: String,
name: String,
format: IOFormat = DELTA,
useStringPrimitives: Boolean = false,
ops: Seq[SourceOp] = Seq.empty
ops: Seq[SourceOp] = Seq.empty,
startTimestamp: String = ""
)
extends Source
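// The default empty startTimestamp keeps the previous behaviour: no time travel for batch
// reads and no explicit starting point for streams.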
@@ -9,5 +9,6 @@ case class StreamSource(name: String,
secret: String,
topic: String,
format: IOFormat = KAFKA,
ops: Seq[SourceOp] = Seq.empty)
ops: Seq[SourceOp] = Seq.empty,
startTimestamp: String)
extends Source
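// startTimestamp has no default here, so stream sources must set it explicitly ("" keeps the
// previous latest-offsets behaviour).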
@@ -24,7 +24,6 @@ case class SourceConfigParserService()(implicit val region: Regions) extends Log
val name = source.getString("name")

val ops = checkOps(source)

SourceFormatParser().parse(name, source, ops)

}