Skip to content

Commit

Permalink
Merge pull request #13 from metabolicdata/feature/add_atlan_metadata
Browse files Browse the repository at this point in the history
feature/add_atlan_metadata
  • Loading branch information
browniecode93 authored Aug 24, 2023
2 parents aeaad64 + 648c256 commit b3d0c6c
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 21 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-glue" % awsVersion % Provided,
"com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion % Provided,
"com.amazonaws" % "aws-java-sdk-athena" % awsVersion % Provided,
"org.scalaj" %% "scalaj-http" % "2.3.0",
"org.scalaj" %% "scalaj-http" % "2.4.2",
"com.typesafe.play" %% "play-json" % "2.9.4",
"io.starburst.openx.data" % "json-serde" % "1.3.9-e.10"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ import com.metabolic.data.mapper.domain.io.EngineMode
import com.typesafe.config.ConfigFactory

abstract class CoreConfig(val defaults: Defaults = Defaults(ConfigFactory.load()),
val environment: Environment = Environment("", EngineMode.Batch, "", false, "","", Option.empty))
val environment: Environment = Environment("", EngineMode.Batch, "", false, "","", Option.empty, Option.empty))
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ case class Environment(name: String,
dbName: String,
iamRole: String,
atlanToken: Option[String],
atlanBaseUrl: Option[String],
historical: Boolean = false,
autoSchema: Boolean = false,
namespaces: Seq[String] = Seq.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package com.metabolic.data.core.services.catalogue

import com.metabolic.data.core.services.util.ConfigUtilsService
import com.metabolic.data.mapper.domain.Config
import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode
import com.metabolic.data.mapper.domain.io._
import com.metabolic.data.mapper.domain.ops.SQLMapping
import org.apache.hadoop.shaded.com.google.gson.JsonParseException
import org.apache.logging.log4j.scala.Logging

import java.math.BigInteger
import java.security.MessageDigest
import scala.collection.mutable
import play.api.libs.json._

class AtlanService(token: String) extends Logging {
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter


class AtlanService(token: String, baseUrl: String) extends Logging {

val versionRegex = """version=(\d)+/""".r

Expand All @@ -19,10 +27,66 @@ class AtlanService(token: String) extends Logging {
HttpRequestHandler.sendHttpPostRequest("https://factorial.atlan.com/api/meta/entity/bulk#createProcesses", body, token)
}

def setMetadata(mapping: Config): String = {
val outputTable = getOutputTableName(mapping)
val dbName = mapping.environment.dbName
val qualifiedName = s"${baseUrl}${dbName}/${outputTable}"
val guid = getGUI(qualifiedName).stripPrefix("\"").stripSuffix("\"")
guid match{
case "" => ""
case _ => {
val last_synced = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
val mode = mapping.environment.mode
val body =
s"""
|{
| "Data Quality": {
| "last_synced_at" : "${last_synced}",
| "engine_type":"${mode.toString}"
| }
|}
|""".stripMargin
logger.info(s"Atlan Metadata Json Body ${body}")
HttpRequestHandler.sendHttpPostRequest(s"https://factorial.atlan.com/api/meta/entity/guid/$guid/businessmetadata/displayName?isOverwrite=false", body, token)
}
}
}

def setDescription(mapping: Config): String = {
val outputTable = getOutputTableName(mapping)
val dbName = mapping.environment.dbName
val qualifiedName = s"${baseUrl}${dbName}/${outputTable}"
val body = generateDescriptionBodyJson(mapping, outputTable, qualifiedName)
logger.info(s"Atlan Description Json Body ${body}")
HttpRequestHandler.sendHttpPostRequest(s"https://factorial.atlan.com/api/meta/entity/bulk#changeDescriptione", body, token)
}

def generateDescriptionBodyJson(mapping: Config, outputTable: String, qualifiedName: String): String = {
val sql = mapping.mappings.head match {
case sqlmapping: SQLMapping => {
sqlmapping.sqlContents
}
case _ => ""
}
s"""
|{
| "entities": [
| {
| "typeName": "Table",
| "attributes": {
| "name": "$outputTable",
| "qualifiedName": "$qualifiedName",
| "description": "$sql"
| }
| }
| ]
|}
|""".stripMargin
}

def generateBodyJson(mapping: Config): String = {
val inputTables = getSourceTableNameList(mapping)
val outputTable = getOutputTableName(mapping)
val baseUrl = "default/athena/1659962653/AwsDataCatalog/"
val dbName = mapping.environment.dbName
val name = inputTables.mkString(",") + " -> " + s"${dbName}/${outputTable}"
val qualifiedName = baseUrl + md5Hash(name)
Expand Down Expand Up @@ -119,4 +183,35 @@ class AtlanService(token: String) extends Logging {
ConfigUtilsService.getTableName(mapping)
}

def getGUI(qualifiedName: String): String = {
try {
val response = HttpRequestHandler.sendHttpGetRequest(s"https://factorial.atlan.com/api/meta/entity/uniqueAttribute/type/Table?attr:qualifiedName=${qualifiedName}", token)
isValidJson(response) match {
case true => {
val json = Json.parse(response)
json.\("entity").get("guid").toString()
}
case false => {
logger.info(s"can not find obj ${qualifiedName}")
""
}
}
}
catch
{
case e: Exception =>
println(s"An error occurred: ${e.getMessage}")
""
}
}

private def isValidJson(jsonString: String): Boolean = {
try {
Json.parse(jsonString)
true
} catch {
case _: JsonParseException => false
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,12 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {
if (mapping.sink.isInstanceOf[FileSink]) {
logger.info(s"Done with ${mapping.name}, pushing lineage to Atlan")
mapping.environment.atlanToken match {
case Some(token) => new AtlanService(token)
.setLineage(mapping)
case Some(token) => {
val atlan = new AtlanService(token, mapping.environment.atlanBaseUrl.getOrElse(""))
atlan.setLineage(mapping)
atlan.setMetadata(mapping)
atlan.setDescription(mapping)
}
case _ => ""
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object Config {

def apply(name: String, sources: Seq[Source], mappings: Seq[Mapping], sink: Sink): Config = {
val defaults: Defaults = Defaults(ConfigFactory.load())
val environment: Environment = Environment("", EngineMode.Batch, "", false,"test","",Option.empty)
val environment: Environment = Environment("", EngineMode.Batch, "", false,"test","",Option.empty, Option.empty)
new Config(name, sources, mappings, sink, defaults, environment)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ class ConfigParserService(implicit region: Regions) extends Logging {
Option.empty
}

val atlanBaseUrl = if (config.hasPathOrNull("atlan_url")) {
Option.apply(config.getString("atlan_url"))
} else {
Option.empty
}

val autoSchema = if (config.hasPathOrNull("autoSchema")){
config.getBoolean("autoSchema")
} else {
Expand All @@ -80,7 +86,7 @@ class ConfigParserService(implicit region: Regions) extends Logging {
Seq.empty
}

Environment(envPrefix, engineMode, baseCheckpointLocation, crawl, dbname, iamrole, atlanToken, historical, autoSchema, namespaces, infix_namespaces)
Environment(envPrefix, engineMode, baseCheckpointLocation, crawl, dbname, iamrole, atlanToken, atlanBaseUrl,historical, autoSchema, namespaces, infix_namespaces)
}

private def parseDefaults(config: HoconConfig) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ class MetabolicAppIT extends AnyFunSuite
List(SQLStatmentMapping("WITH a as (SELECT * from vowels) SELECT *, make_date(yyyy,mm,dd) as date from a")),
io.FileSink("test", "src/test/tmp/vowelsv1_o", SaveMode.Overwrite, IOFormat.PARQUET, eventTimeColumnName = Option("date"), ops = Seq(ManageSchemaSinkOp())),
Defaults(ConfigFactory.load()),
Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty,false, true)
Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty, Option.empty,false, true)

)

Expand Down Expand Up @@ -464,7 +464,7 @@ class MetabolicAppIT extends AnyFunSuite
List(SQLStatmentMapping("WITH a as (SELECT * from vowels) SELECT *, CAST(make_date(yyyy,mm,dd) as string) as date from a")),
io.FileSink("test", "src/test/tmp/vowels_o", SaveMode.Overwrite, IOFormat.PARQUET, eventTimeColumnName = Option("date")),
Defaults(ConfigFactory.load()),
Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty)
Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty, Option.empty)

)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class MetabolicWriterIT extends AnyFunSuite
.loadConfig(genericSourceHOCON, "")

SinkConfigParserService()
.parseSink(fileSinkConfig, Environment("",EngineMode.Batch, "", false,"dbName","",Option.empty, false, autoSchema = true))
.parseSink(fileSinkConfig, Environment("",EngineMode.Batch, "", false,"dbName","",Option.empty, Option.empty, false, autoSchema = true))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AtlanServiceTest extends AnyFunSuite
List(new SQLFileMapping("src/test/resources/simple.sql", region)),
io.FileSink("test", "src/test/tmp/gold/stripe_f_fake_employee_t/version=4/", SaveMode.Overwrite, IOFormat.PARQUET),
Defaults(ConfigFactory.load()),
Environment("", EngineMode.Batch, "", false, "test", "", Option(""), false, false,Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot"))
Environment("", EngineMode.Batch, "", false, "test", "", Option(""), Option(""), false, false,Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot"))
)

val expectedJson =
Expand All @@ -59,40 +59,40 @@ class AtlanServiceTest extends AnyFunSuite
| "typeName": "Process",
| "attributes": {
| "name": "test/raw_stripe_fake_employee,test/clean_fake_employee_s,test/raw_hubspot_owners,test/clean_hubspot_owners -> test/gold_stripe_f_fake_employee_t",
| "qualifiedName": "default/athena/1659962653/AwsDataCatalog/47b83f3425f72bfe7cbf3d966f9bda4b",
| "qualifiedName": "foo47b83f3425f72bfe7cbf3d966f9bda4b",
| "connectorName": "athena",
| "connectionName": "athena",
| "connectionQualifiedName": "default/athena/1659962653/AwsDataCatalog"
| "connectionQualifiedName": "fo"
| },
| "relationshipAttributes": {
| "outputs": [
| {
| "typeName": "Table",
| "uniqueAttributes": {
| "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/gold_stripe_f_fake_employee_t"
| "qualifiedName": "footest/gold_stripe_f_fake_employee_t"
| }
| }
| ],
| "inputs": [
| {
| "typeName": "Table",
| "uniqueAttributes": {
| "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/raw_stripe_fake_employee"
| "qualifiedName": "footest/raw_stripe_fake_employee"
| }
| }, {
| "typeName": "Table",
| "uniqueAttributes": {
| "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/clean_fake_employee_s"
| "qualifiedName": "footest/clean_fake_employee_s"
| }
| }, {
| "typeName": "Table",
| "uniqueAttributes": {
| "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/raw_hubspot_owners"
| "qualifiedName": "footest/raw_hubspot_owners"
| }
| }, {
| "typeName": "Table",
| "uniqueAttributes": {
| "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/clean_hubspot_owners"
| "qualifiedName": "footest/clean_hubspot_owners"
| }
| }
| ]
Expand All @@ -101,9 +101,45 @@ class AtlanServiceTest extends AnyFunSuite
| ]
|}""".stripMargin

val calculatedJson = new AtlanService("foo")
val calculatedJson = new AtlanService("foo", "foo")
.generateBodyJson(testingConfig)
print(calculatedJson)
assert(expectedJson.trim.equalsIgnoreCase(calculatedJson.trim))
}

test("Test fake asset GUI - should not stop execution") {
val response = new AtlanService("foo", "foo")
.getGUI("test")
assert(response.trim.equalsIgnoreCase(""))
}

test("Test description body") {
val testingConfig = Config(
"",
List(io.FileSource("raw/stripe/fake_employee/version=3/", "employees", IOFormat.PARQUET), io.FileSource("clean/fake_employee_s/version=123/", "employeesss", IOFormat.PARQUET), io.FileSource("raw/hubspot/owners/", "owners", IOFormat.PARQUET), io.FileSource("clean/hubspot_owners/", "clean_owners", IOFormat.PARQUET)),
List(new SQLFileMapping("src/test/resources/simple.sql", region)),
io.FileSink("test", "src/test/tmp/gold/stripe_f_fake_employee_t/version=4/", SaveMode.Overwrite, IOFormat.PARQUET),
Defaults(ConfigFactory.load()),
Environment("", EngineMode.Batch, "", false, "test", "", Option(""),Option(""), false, false, Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot"))
)
val calculatedJson = new AtlanService("foo", "foo")
.generateDescriptionBodyJson(testingConfig, "gold_stripe_f_fake_employee_t", "foo/test/gold_stripe_f_fake_employee_t")

val expectedJson =
"""
|{
| "entities": [
| {
| "typeName": "Table",
| "attributes": {
| "name": "gold_stripe_f_fake_employee_t",
| "qualifiedName": "foo/test/gold_stripe_f_fake_employee_t",
| "description": "select * from employees where age < 40"
| }
| }
| ]
|}
|""".stripMargin
assert(expectedJson.trim.equalsIgnoreCase(calculatedJson.trim))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class GlueCatalogServiceTest extends AnyFunSuite
ignore("Works") {

val s3Path = Seq("s3://factorial-etl/entity_mapper/dl/clean/subs/")
val config = Environment("[Test] ", EngineMode.Batch, "", true, "test_data_lake", "AWSGlueServiceRoleDefault", Option.apply("fooBarAtlan"))
val config = Environment("[Test] ", EngineMode.Batch, "", true, "test_data_lake", "AWSGlueServiceRoleDefault", Option.apply("fooBarAtlan"), Option.apply("fooBarAtlan"))

//GlueCatalogService.register(config, "subsy", s3Path, "clean_")

Expand Down

0 comments on commit b3d0c6c

Please sign in to comment.