Merge pull request #47 from metabolicdata/feature/icerberg
WIP Iceberg integration
braislchao authored Nov 12, 2024
2 parents 276ea86 + 3897d9c commit a7b7c12
Showing 25 changed files with 1,285 additions and 135 deletions.
5 changes: 4 additions & 1 deletion build.sbt
@@ -15,6 +15,9 @@ libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "3.3.2",

"io.delta" %% "delta-core" % "2.3.0",
"org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "1.6.1",
"org.apache.iceberg" % "iceberg-aws-bundle" % "1.6.1",


"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
"com.typesafe" % "config" % "1.4.0",
@@ -71,7 +74,7 @@ libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.49" % "test"

libraryDependencies += "org.mockito" % "mockito-core" % "2.21.0" % "test"

Test / fork := true
//Test / fork := true
Test / coverageEnabled := true
Test / parallelExecution := false

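As a side note, one possible tidy-up (not part of this commit) is to pin both new Iceberg artifacts to a single version value so the Spark runtime and the AWS bundle cannot drift apart; icebergVersion is a hypothetical name:

val icebergVersion = "1.6.1"

libraryDependencies ++= Seq(
  // Spark 3.3 runtime for Iceberg; %% appends the Scala binary version
  "org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % icebergVersion,
  // Bundled AWS clients for the Glue catalog and S3FileIO used in MetabolicApp
  "org.apache.iceberg" % "iceberg-aws-bundle" % icebergVersion
)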
3 changes: 2 additions & 1 deletion project/plugins.sbt
@@ -1,2 +1,3 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.6")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.6")
addDependencyTreePlugin
AtlanService.scala
@@ -14,9 +14,6 @@ import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.collection.mutable




class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: String) extends Logging {

val versionRegex = """version=(\d)+/""".r
@@ -30,7 +27,7 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
def setMetadata(mapping: Config): String = {
val qualifiedName = getQualifiedNameOutput(mapping)
val guid = getGUI(qualifiedName).stripPrefix("\"").stripSuffix("\"")
guid match{
guid match {
case "" => ""
case _ => {
val body: String = generateMetadaBody(mapping)
@@ -73,19 +70,19 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str

private def generateDescriptionBodyJson(outputTable: String, qualifiedName: String, typeName: String): String = {
s"""
|{
| "entities": [
| {
| "typeName": "$typeName",
| "attributes": {
| "name": "$outputTable",
| "qualifiedName": "$qualifiedName",
| "description": ""
| }
| }
| ]
|}
|""".stripMargin
|{
| "entities": [
| {
| "typeName": "$typeName",
| "attributes": {
| "name": "$outputTable",
| "qualifiedName": "$qualifiedName",
| "description": ""
| }
| }
| ]
|}
|""".stripMargin
}

def generateBodyJson(mapping: Config): String = {
@@ -100,7 +97,7 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
val qualifiedNameOutput = getQualifiedNameOutput(mapping)
val qualifiedNameInputs = getQualifiedNameInputs(mapping)

val inputsJson = qualifiedNameInputs.map { case (sourceType, qualifiedName) =>
val inputsJson = qualifiedNameInputs.map { case (sourceType, qualifiedName) =>
s"""
| {
| "typeName": "${sourceType}",
@@ -141,8 +138,6 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str

}



private def md5Hash(pwd: String): String = {
val digest: Array[Byte] = MessageDigest.getInstance("MD5").digest(pwd.getBytes)
val bigInt = new BigInteger(1, digest).toString(16).trim
@@ -155,7 +150,7 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
val infix_namespaces = options.infix_namespaces
val tables = mutable.MutableList[String]()
config.sources.foreach { source =>
tables += getSourceTableName(source, prefix_namespaces,infix_namespaces, options.dbName)
tables += getSourceTableName(source, prefix_namespaces, infix_namespaces, options.dbName)
}
tables.filter(p => p != "")
}
@@ -171,39 +166,39 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
}
}

private def getSourceTableName(source: Source, prefix_namespaces:Seq[String],infix_namespaces:Seq[String], dbName: String): String = {
private def getSourceTableName(source: Source, prefix_namespaces: Seq[String], infix_namespaces: Seq[String], dbName: String): String = {

source match {

case streamSource: StreamSource => streamSource.topic

case fileSource: FileSource => {

val s3Path = versionRegex.replaceAllIn(fileSource.inputPath, "")
val prefix = ConfigUtilsService.getTablePrefix(prefix_namespaces, s3Path)
val infix = ConfigUtilsService.getTableInfix(infix_namespaces, s3Path)
val tableName = ConfigUtilsService.getTableNameFileSink(s3Path)
dbName + "/" + prefix + infix + tableName
}
case meta: MetastoreSource => meta.fqn.replace(".", "/")

case meta: TableSource => meta.fqn.replace(".", "/")
}
}

private def getQualifiedNameInput(source: Source, prefix_namespaces:Seq[String],infix_namespaces:Seq[String], dbName: String): String = {
private def getQualifiedNameInput(source: Source, prefix_namespaces: Seq[String], infix_namespaces: Seq[String], dbName: String): String = {

source match {

case streamSource: StreamSource => baseUrlConfluent + streamSource.topic

case fileSource: FileSource => {

val s3Path = versionRegex.replaceAllIn(fileSource.inputPath, "")
val prefix = ConfigUtilsService.getTablePrefix(prefix_namespaces, s3Path)
val infix = ConfigUtilsService.getTableInfix(infix_namespaces, s3Path)
val tableName = ConfigUtilsService.getTableNameFileSink(s3Path)
baseUrlDataLake + dbName + "/" + prefix + infix + tableName
}
case meta: MetastoreSource => baseUrlDataLake + meta.fqn.replace(".", "/")

case meta: TableSource => baseUrlDataLake + meta.fqn.replace(".", "/")
}
}

@@ -213,18 +208,24 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
case _: StreamSink =>
baseUrlConfluent + md5Hash(name)
case _: FileSink =>
baseUrlDataLake + md5Hash(name) }
baseUrlDataLake + md5Hash(name)
case _: TableSink =>
baseUrlDataLake + md5Hash(name)
}

}

private def getConnectorNameProcess(mapping: Config): String = mapping.sink match {
case _: StreamSink => "confluent-kafka"
case _: FileSink => "athena"
case _: TableSink => "athena"

}

private def getConnectionNameProcess(mapping: Config): String = mapping.sink match {
case _: StreamSink => "production"
case _: FileSink => "athena"
case _: TableSink => "athena"
}

private def getConnectionQualifiedNameProcess(mapping: Config): String = {
@@ -242,7 +243,7 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
mapping.sink match {
case fileSink: FileSink => {
val s3Path = versionRegex.replaceAllIn(fileSink.path, "")
val prefix =ConfigUtilsService.getTablePrefix(mapping.environment.namespaces, s3Path)
val prefix = ConfigUtilsService.getTablePrefix(mapping.environment.namespaces, s3Path)
val infix = ConfigUtilsService.getTableInfix(mapping.environment.infix_namespaces, s3Path)
val tableName = ConfigUtilsService.getTableName(mapping)
val dbName = mapping.environment.dbName
@@ -251,7 +252,9 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
case streamSink: StreamSink => {
ConfigUtilsService.getTableName(mapping)
}
case metastoreSource: MetastoreSource => metastoreSource.fqn.replace(".", "/")
case tableSink: TableSink => {
tableSink.catalog.replace(".", "/")
}
}

}
@@ -263,6 +266,8 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
"KafkaTopic"
case _: FileSink =>
"Table"
case _: TableSink =>
"Table"
}
}

@@ -273,7 +278,7 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
"KafkaTopic"
case _: FileSource =>
"Table"
case _: MetastoreSource =>
case _: TableSource =>
"Table"
}
}
@@ -292,12 +297,11 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str
}
}
}
catch
{
case e: Exception =>
println(s"An error occurred: ${e.getMessage}")
""
}
catch {
case e: Exception =>
println(s"An error occurred: ${e.getMessage}")
""
}
}

private def isValidJson(jsonString: String): Boolean = {
GenericReader.scala
@@ -0,0 +1,21 @@
package com.metabolic.data.core.services.spark.reader.table

import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}

class GenericReader(fqn: String) extends DataframeUnifiedReader with Logging{

override val input_identifier: String = fqn

override def readBatch(spark: SparkSession): DataFrame = {
//Generic for Delta Lake and Iceberg tables using fqn
spark.table(input_identifier)
}

override def readStream(spark: SparkSession): DataFrame = {
//Generic for Delta Lake and Iceberg tables using fqn
spark.readStream.table(input_identifier)
}

}
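A minimal usage sketch for the new reader, assuming a SparkSession whose catalogs are configured as in MetabolicApp.scala below; the table FQN is hypothetical:

import com.metabolic.data.core.services.spark.reader.table.GenericReader
import org.apache.spark.sql.SparkSession

object GenericReaderExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("generic-reader-example")
      .getOrCreate()

    // The FQN is resolved by the session catalog, so the same call
    // works for both Delta Lake and Iceberg tables.
    val reader = new GenericReader("spark_catalog.analytics.events") // hypothetical table

    val batchDf = reader.readBatch(spark)   // spark.table(fqn)
    val streamDf = reader.readStream(spark) // spark.readStream.table(fqn)

    batchDf.show()
  }
}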
TableReader.scala
@@ -4,6 +4,7 @@ import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random

@Deprecated
class TableReader(fqn : String, enableJDBC: Boolean, queryOutputLocation: String) extends DataframeUnifiedReader {

override val input_identifier: String = fqn
@@ -19,6 +20,7 @@ class TableReader(fqn : String, enableJDBC: Boolean, queryOutputLocation: String
.option("S3OutputLocation", s"${queryOutputLocation}/${input_identifier}-${Random.nextInt(100000)}")
.load()
}else {
// Read table from catalog
spark.read
.table(input_identifier)
}
IcebergWriter.scala
@@ -0,0 +1,93 @@
package com.metabolic.data.core.services.spark.writer.file

import com.metabolic.data.core.services.spark.writer.DataframeUnifiedWriter
import com.metabolic.data.mapper.domain.io.WriteMode
import com.metabolic.data.mapper.domain.io.WriteMode.WriteMode
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}

import java.util.concurrent.TimeUnit

class IcebergWriter(
val fqn: String,
val writeMode: WriteMode,
val checkpointLocation: String)
(implicit val spark: SparkSession)
extends DataframeUnifiedWriter {

override val output_identifier: String = fqn

override def writeBatch(df: DataFrame): Unit = {

writeMode match {
case WriteMode.Append =>
try {
df.writeTo(output_identifier).using("iceberg").create()
}catch {
case e: AnalysisException =>
logger.warn("Create table failed: " + e)
df.writeTo(output_identifier).append()
}

case WriteMode.Overwrite =>
try {
df.writeTo(output_identifier).using("iceberg").create()
}catch {
case e: AnalysisException =>
logger.warn("Create table failed: " + e)
df.writeTo(output_identifier).using("iceberg").replace()
}

case WriteMode.Upsert =>
try {
df.writeTo(output_identifier).using("iceberg").create()
}catch {
case e: AnalysisException =>
logger.warn("Create table failed: " + e)
df.writeTo(output_identifier).overwritePartitions()
}


case WriteMode.Delete =>
throw new NotImplementedError("Delete is not supported in Iceberg yet")

case WriteMode.Update =>
throw new NotImplementedError("Update is not supported in Iceberg yet")

}
}

override def writeStream(df: DataFrame): StreamingQuery = {

writeMode match {
case WriteMode.Append =>
df
.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("checkpointLocation", checkpointLocation)
.toTable(output_identifier)

case WriteMode.Complete =>
df
.writeStream
.format("iceberg")
.outputMode("complete")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("checkpointLocation", checkpointLocation)
.toTable(output_identifier)

}
}

//TODO: do we need to do any specific post write operations in Iceberg?
//
// override def postHook(df: DataFrame, query: Seq[StreamingQuery]): Unit = {
//
// if (query.isEmpty) {
// spark.sql(s"CALL local.system.rewrite_data_files('$output_identifier')")
// }
// }

}
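A minimal sketch of driving the writer in batch mode, assuming the Glue/S3 catalog configuration added to MetabolicApp.scala below; the target table, checkpoint path and sample data are hypothetical:

import com.metabolic.data.core.services.spark.writer.file.IcebergWriter
import com.metabolic.data.mapper.domain.io.WriteMode
import org.apache.spark.sql.SparkSession

object IcebergWriterExample {
  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("iceberg-writer-example")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    val writer = new IcebergWriter(
      "spark_catalog.analytics.events",        // hypothetical target table
      WriteMode.Append,
      "s3://example-bucket/checkpoints/events" // checkpoint is only used by writeStream
    )

    // The first run creates the Iceberg table; if the create fails because the
    // table already exists, writeBatch falls back to an append, as implemented above.
    writer.writeBatch(df)
  }
}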
18 changes: 16 additions & 2 deletions src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala
@@ -23,6 +23,10 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {
def run(configPath: String, params: Map[String, String]): Unit = {

implicit val region = Regions.fromName(params("dp.region"))
val client_region = params("dp.region")
val warehouse_global = params("dp.warehouse_global")
val warehouse_environment = params("dp.warehouse_environment")


val rawConfig = new ConfigReaderService()
.getConfig(configPath, params)
@@ -33,11 +37,21 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {
implicit val spark = sparkBuilder
.appName(s" Metabolic Mapper - $configPath")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
.config("spark.databricks.delta.schema.autoMerge.enabled", "true")
.config("spark.databricks.delta.optimize.repartition.enabled", "true")
.config("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")
.config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.prod.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.prod.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.prod.client.region", s"$client_region")
.config("spark.sql.catalog.prod.warehouse", s"$warehouse_global")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.spark_catalog.client.region", s"$client_region")
.config("spark.sql.catalog.spark_catalog.warehouse", s"$warehouse_environment")
.config("spark.sql.defaultCatalog", "spark_catalog")
.getOrCreate()

params.get("configJar") match {
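For context, a hypothetical invocation sketch: run() now expects the two warehouse locations alongside dp.region, and every value below is a placeholder rather than a real bucket or config path:

import com.metabolic.data.mapper.app.MetabolicApp
import org.apache.spark.sql.SparkSession

object RunExample {
  def main(args: Array[String]): Unit = {
    val params = Map(
      "dp.region" -> "eu-west-1",                                        // resolved via Regions.fromName
      "dp.warehouse_global" -> "s3://example-warehouse/global",          // warehouse for the `prod` Iceberg catalog
      "dp.warehouse_environment" -> "s3://example-warehouse/environment" // warehouse for the default spark_catalog
    )

    new MetabolicApp(SparkSession.builder())
      .run("s3://example-config/mapping.conf", params)
  }
}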