Merge pull request #15 from metabolicdata/feature/drop_view
- read Athena view with jdbc
- drop view before creation
browniecode93 authored Oct 16, 2023
2 parents a5e697b + 1a33634 commit 8bbdb7d
Showing 8 changed files with 64 additions and 23 deletions.
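The two new `Environment` fields (`enableJDBC` and `queryOutputLocation`) thread from the parsed config through `MetabolicReader` into `TableReader`. A rough sketch of the batch read path after this merge; the `mapping` and `spark` values stand in for what `MetabolicApp.transform` already provides:

```scala
// Sketch only: `mapping` and `spark` are assumed to exist as in MetabolicApp.transform.
mapping.sources.foreach { source =>
  MetabolicReader.read(
    source,
    mapping.environment.historical,
    mapping.environment.mode,
    mapping.environment.enableJDBC,          // new flag; defaults to false
    mapping.environment.queryOutputLocation  // new S3 prefix for Athena query results; defaults to ""
  )(spark)
}
```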
@@ -12,4 +12,6 @@ case class Environment(name: String,
historical: Boolean = false,
autoSchema: Boolean = false,
namespaces: Seq[String] = Seq.empty,
infix_namespaces: Seq[String] = Seq.empty)
infix_namespaces: Seq[String] = Seq.empty,
enableJDBC: Boolean = false,
queryOutputLocation: String = "")
@@ -32,10 +32,21 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging {
s"$dbName.$tableName"
}

def dropView(dbName: String, viewName: String) = {

val statement = s"DROP VIEW IF EXISTS " +
s"$dbName.$viewName"

dropTable(dbName, viewName, statement)

}

def createDeltaTable(dbName:String, tableName:String, location: String, recreate: Boolean = false) = {

if(recreate)
dropDeltaTable(dbName, tableName)
if(recreate) {
val delete_statement = dropTableStatement(dbName, tableName)
dropTable(dbName, tableName, delete_statement)
}

val statement = createTableStatement(dbName, tableName, location)
logger.info(s"Create table statement for ${dbName}.${tableName} is ${statement}")
@@ -52,9 +63,8 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging {

}

private def dropDeltaTable(dbName: String, tableName: String) = {
private def dropTable(dbName: String, tableName: String, statement: String) = {

val statement = dropTableStatement(dbName, tableName)
logger.info(s"Drop table statement for ${dbName}.${tableName} is ${statement}")
val queryExecutionContext = new QueryExecutionContext().withDatabase(dbName)
//val resultConfiguration = new ResultConfiguration().withOutputLocation(path)
@@ -64,8 +74,8 @@
.withQueryExecutionContext(queryExecutionContext)
//.withResultConfiguration(resultConfiguration)
val queryExecutionId = athenaClient.startQueryExecution(startQueryExecutionRequest).getQueryExecutionId
logger.info(s"Table ${dbName}.${tableName} has been created")
val getQueryResultsRequest = new GetQueryResultsRequest().withQueryExecutionId(queryExecutionId)
logger.info(s"Query result is: ${getQueryResultsRequest}")

}

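A hedged usage sketch of the new `dropView` helper together with the reworked `createDeltaTable`. Database, view, and S3 names are placeholders, and the import for `AthenaCatalogueService` itself is omitted because its package is not shown in this diff; the implicit `Regions` value mirrors the class signature above:

```scala
import com.amazonaws.regions.Regions

// Implicit AWS region required by the service's constructor.
implicit val region: Regions = Regions.EU_CENTRAL_1
val catalog = new AthenaCatalogueService()

// Drop a stale view if it exists, then (re)create the Delta table that backs it.
catalog.dropView("analytics_db", "orders_view")
catalog.createDeltaTable("analytics_db", "orders", "s3://example-bucket/orders/", recreate = true)
```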
@@ -2,14 +2,26 @@ package com.metabolic.data.core.services.spark.reader.table

import com.metabolic.data.core.services.spark.reader.DataframeUnifiedReader
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random

class TableReader(fqn : String) extends DataframeUnifiedReader {
class TableReader(fqn : String, enableJDBC: Boolean, queryOutputLocation: String) extends DataframeUnifiedReader {

override val input_identifier: String = fqn

override def readBatch(spark: SparkSession): DataFrame = {
spark.read
.table(input_identifier)
if(enableJDBC){
spark.read
.format("jdbc")
.option("driver", "com.simba.athena.jdbc.Driver")
.option("AwsCredentialsProviderClass", "com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
.option("url", "jdbc:awsathena://athena.eu-central-1.amazonaws.com:443")
.option("dbtable", s"AwsDataCatalog.${input_identifier}")
.option("S3OutputLocation", s"${queryOutputLocation}/${input_identifier}-${Random.nextInt(100000)}")
.load()
}else {
spark.read
.table(input_identifier)
}
}

override def readStream(spark: SparkSession): DataFrame = {
@@ -20,5 +32,6 @@ class TableReader(fqn : String) extends DataframeUnifiedReader {
}

object TableReader {
def apply(fqn: String) = new TableReader(fqn)
def apply(fqn: String, enableJDBC: Boolean, queryOutputLocation: String) = new TableReader(fqn, enableJDBC, queryOutputLocation)

}
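A short sketch of both read paths the updated `TableReader` supports. The table names and S3 output prefix are made up; the Simba Athena JDBC driver must be on the classpath for the first branch, and the endpoint is hard-coded to eu-central-1 as in the code above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("table-reader-demo").master("local[*]").getOrCreate()

// JDBC path: Athena runs the query and stages results under queryOutputLocation
// (a random suffix is appended per read, as in readBatch above).
val viaJdbc = TableReader("my_db.my_view", enableJDBC = true, queryOutputLocation = "s3://example-bucket/athena-results")
  .readBatch(spark)

// Default path: a plain metastore table read through Spark.
val viaSpark = TableReader("my_db.my_table", enableJDBC = false, queryOutputLocation = "")
  .readBatch(spark)
```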
10 changes: 7 additions & 3 deletions src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala
@@ -93,7 +93,7 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {
def transform(mapping: Config)(implicit spark: SparkSession, region: Regions): Unit = {

mapping.sources.foreach { source =>
MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode)
MetabolicReader.read(source, mapping.environment.historical, mapping.environment.mode, mapping.environment.enableJDBC, mapping.environment.queryOutputLocation)
}

mapping.mappings.foreach { mapping =>
@@ -134,11 +134,15 @@
val s3Path = config.sink.asInstanceOf[FileSink].path
.replace("version=1/", "")

val dbName = options.dbName
val prefix = ConfigUtilsService.getTablePrefix(options.namespaces, s3Path)
val tableName = prefix+ConfigUtilsService.getTableName(config)

new AthenaCatalogueService().dropView(dbName, tableName)

config.sink.format match {
case IOFormat.DELTA => new AthenaCatalogueService().createDeltaTable(options.dbName, prefix+ConfigUtilsService.getTableName(config), s3Path)
case _ => new GlueCrawlerService().register(options.dbName, options.iamRole, name, Seq(s3Path), prefix)
case IOFormat.DELTA => new AthenaCatalogueService().createDeltaTable(dbName, tableName, s3Path)
case _ => new GlueCrawlerService().register(dbName, options.iamRole, name, Seq(s3Path), prefix)
}
// Schema Separation
// if(options.name.contains("production")){
@@ -14,17 +14,17 @@ import org.apache.spark.sql.{DataFrame, SparkSession}

object MetabolicReader extends Logging {

def read(source: Source, historical: Boolean, mode: EngineMode)(implicit spark: SparkSession) = {
def read(source: Source, historical: Boolean, mode: EngineMode, enableJDBC: Boolean, queryOutputLocation: String)(implicit spark: SparkSession) = {

val input = readSource(source, mode, spark)
val input = readSource(source, mode, spark, enableJDBC, queryOutputLocation)

val prepared = prepareSource(source, historical, input)

prepared.createOrReplaceTempView(source.name)

}

private def readSource(source: Source, mode: EngineMode, spark: SparkSession) = {
private def readSource(source: Source, mode: EngineMode, spark: SparkSession, enableJDBC: Boolean, queryOutputLocation: String) = {
source match {

case streamSource: StreamSource => {
@@ -58,7 +58,7 @@
case meta: MetastoreSource => {
logger.info(s"Reading source ${meta.fqn} already in metastore")

new TableReader(meta.fqn)
new TableReader(meta.fqn, enableJDBC, queryOutputLocation)
.read(spark, mode)

}
@@ -86,7 +86,19 @@ class ConfigParserService(implicit region: Regions) extends Logging {
Seq.empty
}

Environment(envPrefix, engineMode, baseCheckpointLocation, crawl, dbname, iamrole, atlanToken, atlanBaseUrl,historical, autoSchema, namespaces, infix_namespaces)
val enableJDBC = if (config.hasPathOrNull("enableJDBC")) {
config.getBoolean("enableJDBC")
} else {
false
}

val queryOutputLocation = if (config.hasPathOrNull("queryOutputLocation")) {
config.getString("queryOutputLocation")
} else {
""
}

Environment(envPrefix, engineMode, baseCheckpointLocation, crawl, dbname, iamrole, atlanToken, atlanBaseUrl,historical, autoSchema, namespaces, infix_namespaces, enableJDBC, queryOutputLocation)
}

private def parseDefaults(config: HoconConfig) = {
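A minimal sketch of the fallback behaviour of the two new keys, assuming the Typesafe Config (HOCON) API that `ConfigParserService` already uses; the key names match the diff, everything else is illustrative:

```scala
import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.parseString(
  """
    |enableJDBC = true
    |queryOutputLocation = "s3://example-bucket/athena-results"
  """.stripMargin)

// Mirrors the parser's fallback logic above.
val enableJDBC =
  if (conf.hasPathOrNull("enableJDBC")) conf.getBoolean("enableJDBC") else false               // true
val queryOutputLocation =
  if (conf.hasPathOrNull("queryOutputLocation")) conf.getString("queryOutputLocation") else "" // set

// With an empty config both keys fall back to the defaults: false and "".
```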
@@ -78,7 +78,7 @@ class MetabolicReaderIT extends AnyFunSuite

val source = getFileSource(inputPath, tableName, IOFormat.PARQUET.toString).head

MetabolicReader.read(source, true, EngineMode.Batch)(spark)
MetabolicReader.read(source, true, EngineMode.Batch, false, "")(spark)

val result = spark.table(tableName)

@@ -103,7 +103,7 @@

val source = getFileSource(inputPath, tableName, IOFormat.JSON.toString).head

MetabolicReader.read(source, true, EngineMode.Batch)(spark)
MetabolicReader.read(source, true, EngineMode.Batch, false, "")(spark)

val result = spark.table(tableName)

@@ -129,7 +129,7 @@

val source = getFileSource(inputPath, tableName, IOFormat.CSV.toString).head

MetabolicReader.read(source, true, EngineMode.Batch)(spark)
MetabolicReader.read(source, true, EngineMode.Batch, false, "")(spark)

val result = spark.table(tableName)

@@ -153,7 +153,7 @@

val source = getFileSource(inputPath, tableName, IOFormat.DELTA.toString).head

MetabolicReader.read(source, true, EngineMode.Batch)(spark)
MetabolicReader.read(source, true, EngineMode.Batch, false, "")(spark)

val result = spark.table(tableName)

@@ -36,7 +36,7 @@ class ProdConfigsMetabolicAppIT extends AnyFunSuite
.parseConfig(config)

parsedConfig.head.sources.foreach { source =>
MetabolicReader.read(source, true, EngineMode.Batch)(spark)
MetabolicReader.read(source, true, EngineMode.Batch, false, "")(spark)
}


