Refactored Config into Metadata object
margon8 committed Dec 11, 2024
1 parent f0d6d1a commit 01fe6f8
Showing 39 changed files with 312 additions and 263 deletions.
@@ -1,9 +1,9 @@
package com.metabolic.data.core.domain

import com.amazonaws.regions.Regions
-import com.metabolic.data.mapper.domain.io.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode
import com.typesafe.config.ConfigFactory

-abstract class CoreConfig(val defaults: Defaults = Defaults(ConfigFactory.load()),
+abstract class CoreConfig(
val environment: Environment = Environment("", EngineMode.Batch, "", false, "","",
Regions.fromName("eu-central-1"), Option.empty, Option.empty, Option.empty))
15 changes: 0 additions & 15 deletions src/main/scala/com/metabolic/data/core/domain/Defaults.scala

This file was deleted.

@@ -1,6 +1,6 @@
package com.metabolic.data.core.domain
import com.amazonaws.regions.Regions
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode.EngineMode

case class Environment(name: String,
mode: EngineMode,
@@ -1,7 +1,7 @@
package com.metabolic.data.core.services.athena

import com.metabolic.data.core.services.util.ConfigUtilsService
-import com.metabolic.data.mapper.domain.Config
+import com.metabolic.data.mapper.domain.config.Config
import com.metabolic.data.mapper.domain.io.{FileSink, IOFormat}
import com.metabolic.data.mapper.services.AfterAction
import org.apache.logging.log4j.scala.Logging
@@ -13,7 +13,7 @@ class AthenaAction extends AfterAction with Logging {
def run(config: Config): Unit = {
logger.info(f"Running After Action $name")

-val options = config.environment
+val options = config.metadata.environment

val region = options.region
val dbName = options.dbName
@@ -24,7 +24,7 @@
case sink: FileSink =>
sink.format match {
case IOFormat.DELTA =>
logger.info(f"After Action $name: Creating Delta Table for ${config.name}")
logger.info(f"After Action $name: Creating Delta Table for ${config.metadata.name}")
val s3Path = sink.path.replaceAll("version=\\d+", "")
val prefix =
ConfigUtilsService.getTablePrefix(options.namespaces, s3Path)
@@ -33,10 +33,10 @@
athena.createDeltaTable(dbName, tableName, s3Path)

case _ =>
logger.warn(f"After Action: Skipping $name for ${config.name} as it is not a DeltaSink")
logger.warn(f"After Action: Skipping $name for ${config.metadata.name} as it is not a DeltaSink")
}
case _ =>
logger.warn(f"After Action: Skipping $name for ${config.name} as it is not a FileSink")
logger.warn(f"After Action: Skipping $name for ${config.metadata.name} as it is not a FileSink")
}

}
@@ -1,6 +1,6 @@
package com.metabolic.data.core.services.catalogue

-import com.metabolic.data.mapper.domain.Config
+import com.metabolic.data.mapper.domain.config.Config
import com.metabolic.data.mapper.domain.io.FileSink
import com.metabolic.data.mapper.services.AfterAction
import org.apache.logging.log4j.scala.Logging
@@ -11,21 +11,23 @@ class AtlanCatalogueAction extends AfterAction with Logging {

def run(config: Config): Unit = {

-config.environment.atlanToken match {
+val environment = config.metadata.environment
+
+environment.atlanToken match {
case Some(token) =>
-(config.environment.atlanBaseUrlDataLake, config.environment.atlanBaseUrlConfluent) match {
+(environment.atlanBaseUrlDataLake, environment.atlanBaseUrlConfluent) match {
case (Some(_), Some(_)) =>
-val atlan = new AtlanService(token, config.environment.atlanBaseUrlDataLake.get, config.environment.atlanBaseUrlConfluent.get)
+val atlan = new AtlanService(token, environment.atlanBaseUrlDataLake.get, environment.atlanBaseUrlConfluent.get)
atlan.setLineage(config)
atlan.setOwner(config)
atlan.setMetadata(config)
atlan.setResource(config)
logger.info(s"After Action $name: Pushed lineage generated in ${config.name} to Atlan")
logger.info(s"After Action $name: Pushed lineage generated in ${config.metadata.name} to Atlan")
case _ =>
logger.warn(s"After Action: Skipping $name for ${config.name} as Atlan Url is not provided")
logger.warn(s"After Action: Skipping $name for ${config.metadata.name} as Atlan Url is not provided")
}
case None =>
logger.warn(s"After Action: Skipping $name for ${config.name} as Atlan Token is not provided")
logger.warn(s"After Action: Skipping $name for ${config.metadata.name} as Atlan Token is not provided")
}
}
}
@@ -1,7 +1,7 @@
package com.metabolic.data.core.services.catalogue

import com.metabolic.data.core.services.util.ConfigUtilsService
-import com.metabolic.data.mapper.domain.Config
+import com.metabolic.data.mapper.domain.config.Config
import com.metabolic.data.mapper.domain.io._
import com.metabolic.data.mapper.domain.ops.SQLMapping
import org.apache.hadoop.shaded.com.google.gson.JsonParseException
@@ -99,7 +99,7 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str

def generateOwnerBody(mapping: Config): String = {

-val ownerString = mapping.owner
+val ownerString = mapping.metadata.owner

val body = {
s"""
@@ -113,8 +113,8 @@ class AtlanService(token: String, baseUrlDataLake: String, baseUrlConfluent: Str

def generateResourceBody(mapping: Config): String = {

-val urlSQL = mapping.sqlUrl
-val urlConf = mapping.confUrl
+val urlSQL = mapping.metadata.sqlUrl
+val urlConf = mapping.metadata.confUrl

val body = {
s"""
@@ -2,7 +2,7 @@ package com.metabolic.data.core.services.glue

import com.metabolic.data.core.domain.Environment
import com.metabolic.data.core.services.util.ConfigUtilsService
-import com.metabolic.data.mapper.domain.Config
+import com.metabolic.data.mapper.domain.config.Config
import com.metabolic.data.mapper.domain.io.{FileSink, IOFormat}
import com.metabolic.data.mapper.services.AfterAction
import org.apache.logging.log4j.scala.Logging
@@ -14,10 +14,10 @@ class GlueCrawlerAction extends AfterAction with Logging {

override def run(config: Config): Unit = {

-val options = config.environment
+val options = config.metadata.environment

val region = options.region
val crawlerName = s"${options.name} EM ${config.name}"
val crawlerName = s"${options.name} EM ${config.metadata.name}"

val dbName = options.dbName
val iamRole = options.iamRole
@@ -34,16 +34,16 @@
case com.metabolic.data.mapper.domain.io.IOFormat.JSON =>
runCrawler(config, options, crawlerName, dbName, iamRole, glue, sink)
case com.metabolic.data.mapper.domain.io.IOFormat.DELTA =>
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.name} for DeltaSink")
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.metadata.name} for DeltaSink")
case com.metabolic.data.mapper.domain.io.IOFormat.DELTA_PARTITION =>
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.name} for DeltaPartitionSink")
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.metadata.name} for DeltaPartitionSink")
case com.metabolic.data.mapper.domain.io.IOFormat.KAFKA =>
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.name} for KafkaSink")
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.metadata.name} for KafkaSink")
case com.metabolic.data.mapper.domain.io.IOFormat.TABLE =>
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.name} for DeltaSink")
logger.warn(f"After Action $name: Skipping Glue Crawler for ${config.metadata.name} for DeltaSink")
}
case _ =>
logger.warn(f"After Action: Skipping $crawlerName for ${config.name} as it is not a FileSink")
logger.warn(f"After Action: Skipping $crawlerName for ${config.metadata.name} as it is not a FileSink")
}

}
@@ -52,7 +52,7 @@ class GlueCrawlerAction extends AfterAction with Logging {
val s3Path = sink.path.replaceAll("version=\\d+", "")
val prefix = ConfigUtilsService.getTablePrefix(options.namespaces, s3Path)

logger.info(f"After Action $name: Running Glue Crawler for ${config.name}")
logger.info(f"After Action $name: Running Glue Crawler for ${config.metadata.name}")

glue.createAndRunCrawler(iamRole, Seq(s3Path), dbName, crawlerName, prefix)
}
@@ -1,7 +1,7 @@
package com.metabolic.data.core.services.spark.append

import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode.EngineMode
import org.apache.spark.sql.{DataFrame, SparkSession}

object Appender {
@@ -1,7 +1,7 @@
package com.metabolic.data.core.services.spark.reader

-import com.metabolic.data.mapper.domain.io.EngineMode
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode
import org.apache.spark.sql.{DataFrame, SparkSession}

trait DataframeUnifiedReader {
@@ -1,8 +1,8 @@
package com.metabolic.data.core.services.spark.writer

-import com.metabolic.data.mapper.domain.io.EngineMode
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode.EngineMode
import com.metabolic.data.mapper.domain.io.WriteMode.WriteMode
+import com.metabolic.data.mapper.domain.run.EngineMode
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.DataFrame
@@ -1,10 +1,10 @@
package com.metabolic.data.core.services.util


+import com.metabolic.data.mapper.domain.config.Config
import com.typesafe.config.Config
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
-import com.metabolic.data.mapper.domain.Config
import com.metabolic.data.mapper.domain.io.{FileSink, StreamSink}

object ConfigUtilsService {
@@ -74,7 +74,7 @@ object ConfigUtilsService {
}
}.mkString("")
}
-def getTableName(mapping: com.metabolic.data.mapper.domain.Config): String = {
+def getTableName(mapping: Config): String = {

mapping.sink match {
case f: FileSink => {
13 changes: 7 additions & 6 deletions src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala
@@ -7,6 +7,7 @@ import com.metabolic.data.core.services.glue.GlueCrawlerAction
import com.metabolic.data.core.services.spark.udfs.MetabolicUserDefinedFunction
import com.metabolic.data.core.services.util.ConfigReaderService
import com.metabolic.data.mapper.domain._
+import com.metabolic.data.mapper.domain.config.Config
import com.metabolic.data.mapper.services.{AfterAction, ConfigParserService}
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.streaming.StreamingQuery
@@ -100,11 +100,11 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {

configs.foldLeft(Seq[StreamingQuery]()) { (streamingQueries, config) =>
before(config)
logger.info(s"Transforming ${config.name}")
logger.info(s"Transforming ${config.metadata.name}")
val streamingQuery = transform(config)
logger.info(s"Done with ${config.name}")
logger.info(s"Done with ${config.metadata.name}")
after(config, withAction)
logger.info(s"Done registering ${config.name}")
logger.info(s"Done registering ${config.metadata.name}")
streamingQueries ++ streamingQuery
}.par.foreach {
_.awaitTermination
@@ -117,7 +118,7 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {
def transform(mapping: Config)(implicit spark: SparkSession, region: Regions): Seq[StreamingQuery] = {

mapping.sources.foreach { source =>
-MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode, mapping.environment.enableJDBC, mapping.environment.queryOutputLocation, mapping.getCanonicalName)
+MetabolicReader.read(source, mapping.metadata.environment.historical, mapping.metadata.environment.mode, mapping.metadata.environment.enableJDBC, mapping.metadata.environment.queryOutputLocation, mapping.metadata.getCanonicalName)
}

mapping.mappings.foreach { mapping =>
@@ -126,8 +127,8 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {

val output: DataFrame = spark.table("output")

-MetabolicWriter.write(output, mapping.sink, mapping.environment.historical, mapping.environment.autoSchema,
-mapping.environment.baseCheckpointLocation, mapping.environment.mode, mapping.environment.namespaces)
+MetabolicWriter.write(output, mapping.sink, mapping.metadata.environment.historical, mapping.metadata.environment.autoSchema,
+mapping.metadata.environment.baseCheckpointLocation, mapping.metadata.environment.mode, mapping.metadata.environment.namespaces)

}

@@ -5,7 +5,7 @@ import com.metabolic.data.core.services.spark.reader.file.{CSVReader, DeltaReade
import com.metabolic.data.core.services.spark.reader.stream.KafkaReader
import com.metabolic.data.core.services.spark.reader.table.{GenericReader, TableReader}
import com.metabolic.data.core.services.spark.transformations._
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode.EngineMode
import com.metabolic.data.mapper.domain.io._
import com.metabolic.data.mapper.domain.ops._
import com.metabolic.data.mapper.domain.ops.source._
@@ -6,7 +6,7 @@ import com.metabolic.data.core.services.spark.transformations.FlattenTransform
import com.metabolic.data.core.services.spark.writer.file.IcebergWriter
import com.metabolic.data.core.services.spark.writer.partitioned_file._
import com.metabolic.data.core.services.spark.writer.stream.KafkaWriter
-import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
+import com.metabolic.data.mapper.domain.run.EngineMode.EngineMode
import com.metabolic.data.mapper.domain.io._
import com.metabolic.data.mapper.domain.ops.SinkOp
import com.metabolic.data.mapper.domain.ops.sink._
46 changes: 0 additions & 46 deletions src/main/scala/com/metabolic/data/mapper/domain/Config.scala

This file was deleted.

@@ -0,0 +1,17 @@
package com.metabolic.data.mapper.domain.config

import com.metabolic.data.mapper.domain.io.{Sink, Source}
import com.metabolic.data.mapper.domain.ops.Mapping

case class Config(sources: Seq[Source],
mappings: Seq[Mapping],
sink: Sink,
metadata: Metadata)

case object Config {

def apply(sources: Seq[Source], mapping: Mapping, sink: Sink, metadata: Metadata): Config = {
new Config(sources, Seq(mapping), sink, metadata)
}

}
@@ -0,0 +1,21 @@
package com.metabolic.data.mapper.domain.config

import com.metabolic.data.core.domain.Environment


case class Metadata(name: String,
description:String,
owner: String,
sqlUrl: String,
confUrl: String,
environment: Environment) {



def getCanonicalName() = {
name
.toLowerCase()
.replaceAll(" ", "-")
}

}
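
For reference, a minimal sketch (not part of the commit) of how the new Metadata object fits together and how call sites change; the field values below are illustrative placeholders, the Environment arguments mirror the CoreConfig defaults shown above, and the sources/mappings/sink needed for a full Config are omitted.

// Illustrative sketch only; names and values are placeholders.
import com.amazonaws.regions.Regions
import com.metabolic.data.core.domain.Environment
import com.metabolic.data.mapper.domain.config.Metadata
import com.metabolic.data.mapper.domain.run.EngineMode

// Same default construction as CoreConfig above.
val environment = Environment("", EngineMode.Batch, "", false, "", "",
  Regions.fromName("eu-central-1"), Option.empty, Option.empty, Option.empty)

val metadata = Metadata(
  name = "Example Entity",              // formerly Config.name
  description = "Illustrative mapping",
  owner = "data-team",                  // formerly Config.owner
  sqlUrl = "s3://bucket/example.sql",   // formerly Config.sqlUrl
  confUrl = "s3://bucket/example.conf", // formerly Config.confUrl
  environment = environment)            // formerly Config.environment

// Call sites now go through config.metadata, e.g.:
//   logger.info(s"Transforming ${config.metadata.name}")
//   val options = config.metadata.environment
metadata.getCanonicalName() // "example-entity"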