From 11425400c9f024aec143773816298a5a1907b95f Mon Sep 17 00:00:00 2001 From: fei Date: Wed, 23 Aug 2023 15:36:27 +0200 Subject: [PATCH 1/6] add atlan metadata and description --- build.sbt | 3 +- .../services/catalogue/AtlanService.scala | 100 +++++++++++++++++- .../data/mapper/app/MetabolicApp.scala | 8 +- 3 files changed, 107 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 38412ac5..2abac265 100644 --- a/build.sbt +++ b/build.sbt @@ -28,7 +28,8 @@ libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk-glue" % awsVersion % Provided, "com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion % Provided, "com.amazonaws" % "aws-java-sdk-athena" % awsVersion % Provided, - "org.scalaj" %% "scalaj-http" % "2.3.0", + "org.scalaj" %% "scalaj-http" % "2.4.2", + "com.typesafe.play" %% "play-json" % "2.9.4", "io.starburst.openx.data" % "json-serde" % "1.3.9-e.10" ) diff --git a/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala b/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala index 14238018..ae255add 100644 --- a/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala +++ b/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala @@ -2,16 +2,25 @@ package com.metabolic.data.core.services.catalogue import com.metabolic.data.core.services.util.ConfigUtilsService import com.metabolic.data.mapper.domain.Config +import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode import com.metabolic.data.mapper.domain.io._ +import com.metabolic.data.mapper.domain.ops.SQLMapping +import org.apache.hadoop.shaded.com.google.gson.JsonParseException import org.apache.logging.log4j.scala.Logging import java.math.BigInteger import java.security.MessageDigest import scala.collection.mutable +import play.api.libs.json._ + +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter + class AtlanService(token: String) extends Logging { val versionRegex = """version=(\d)+/""".r + private val baseUrl = "default/athena/1659962653/AwsDataCatalog/" def setLineage(mapping: Config): String = { val body = generateBodyJson(mapping) @@ -19,10 +28,68 @@ class AtlanService(token: String) extends Logging { HttpRequestHandler.sendHttpPostRequest("https://factorial.atlan.com/api/meta/entity/bulk#createProcesses", body, token) } + def setMetadata(mapping: Config): String = { + val outputTable = getOutputTableName(mapping) + val dbName = mapping.environment.dbName + val qualifiedName = s"${baseUrl}${dbName}/${outputTable}" + val guid = getGUI(qualifiedName).stripPrefix("\"").stripSuffix("\"") + guid match{ + case "" => "" + case _ => { + val last_synced = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + val mode = mapping.environment.mode + val body = + s""" + |{ + | "Data Quality": { + | "last_synced_at" : "${last_synced}", + | "engine_type":"${mode.toString}" + | } + |} + |""".stripMargin + logger.info(s"Atlan Metadata Json Body ${body}") + HttpRequestHandler.sendHttpPostRequest(s"https://factorial.atlan.com/api/meta/entity/guid/$guid/businessmetadata/displayName?isOverwrite=false", body, token) + } + } + } + + def setDescription(mapping: Config): String = { + val outputTable = getOutputTableName(mapping) + val dbName = mapping.environment.dbName + val qualifiedName = s"${baseUrl}${dbName}/${outputTable}" + val sql = mapping.mappings.head match { + case sqlmapping: SQLMapping => { + sqlmapping.sqlContents + } + case _ => "" + } + sql match { + case "" => "" + case _ => { + val body = + s""" + { + | "entities": [ + | { + | "typeName": "Table", + | "attributes": { + | "name": "$outputTable", + | "qualifiedName": "$qualifiedName", + | "description": "$sql" + | } + | } + | ] + |} + |""".stripMargin + logger.info(s"Atlan Description Json Body ${body}") + HttpRequestHandler.sendHttpPostRequest(s"https://factorial.atlan.com/api/meta/entity/bulk#changeDescriptione", body, token) + } + } + } + def generateBodyJson(mapping: Config): String = { val inputTables = getSourceTableNameList(mapping) val outputTable = getOutputTableName(mapping) - val baseUrl = "default/athena/1659962653/AwsDataCatalog/" val dbName = mapping.environment.dbName val name = inputTables.mkString(",") + " -> " + s"${dbName}/${outputTable}" val qualifiedName = baseUrl + md5Hash(name) @@ -119,4 +186,35 @@ class AtlanService(token: String) extends Logging { ConfigUtilsService.getTableName(mapping) } + def getGUI(qualifiedName: String): String = { + try { + val response = HttpRequestHandler.sendHttpGetRequest(s"https://factorial.atlan.com/api/meta/entity/uniqueAttribute/type/Table?attr:qualifiedName=${qualifiedName}", token) + isValidJson(response) match { + case true => { + val json = Json.parse(response) + json.\("entity").get("guid").toString() + } + case false => { + logger.info(s"can not find obj ${qualifiedName}") + "" + } + } + } + catch + { + case e: Exception => + println(s"An error occurred: ${e.getMessage}") + "" + } + } + + private def isValidJson(jsonString: String): Boolean = { + try { + Json.parse(jsonString) + true + } catch { + case _: JsonParseException => false + } + } + } \ No newline at end of file diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala index 4fc4f101..d4d8ceb4 100644 --- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala +++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala @@ -114,8 +114,12 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging { if (mapping.sink.isInstanceOf[FileSink]) { logger.info(s"Done with ${mapping.name}, pushing lineage to Atlan") mapping.environment.atlanToken match { - case Some(token) => new AtlanService(token) - .setLineage(mapping) + case Some(token) => { + val atlan = new AtlanService(token) + atlan.setLineage(mapping) + atlan.setMetadata(mapping) + atlan.setDescription(mapping) + } case _ => "" } } else { From 127a5224af86e0684671a342cec5343a1f4383b1 Mon Sep 17 00:00:00 2001 From: fei Date: Wed, 23 Aug 2023 15:57:02 +0200 Subject: [PATCH 2/6] add test --- .../services/catalogue/AtlanService.scala | 42 +++++++++---------- .../mapper/services/AtlanServiceTest.scala | 36 ++++++++++++++++ 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala b/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala index ae255add..3386e842 100644 --- a/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala +++ b/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala @@ -57,34 +57,32 @@ class AtlanService(token: String) extends Logging { val outputTable = getOutputTableName(mapping) val dbName = mapping.environment.dbName val qualifiedName = s"${baseUrl}${dbName}/${outputTable}" + val body = generateDescriptionBodyJson(mapping, outputTable, qualifiedName) + logger.info(s"Atlan Description Json Body ${body}") + HttpRequestHandler.sendHttpPostRequest(s"https://factorial.atlan.com/api/meta/entity/bulk#changeDescriptione", body, token) + } + + def generateDescriptionBodyJson(mapping: Config, outputTable: String, qualifiedName: String): String = { val sql = mapping.mappings.head match { case sqlmapping: SQLMapping => { sqlmapping.sqlContents } case _ => "" } - sql match { - case "" => "" - case _ => { - val body = - s""" - { - | "entities": [ - | { - | "typeName": "Table", - | "attributes": { - | "name": "$outputTable", - | "qualifiedName": "$qualifiedName", - | "description": "$sql" - | } - | } - | ] - |} - |""".stripMargin - logger.info(s"Atlan Description Json Body ${body}") - HttpRequestHandler.sendHttpPostRequest(s"https://factorial.atlan.com/api/meta/entity/bulk#changeDescriptione", body, token) - } - } + s""" + |{ + | "entities": [ + | { + | "typeName": "Table", + | "attributes": { + | "name": "$outputTable", + | "qualifiedName": "$qualifiedName", + | "description": "$sql" + | } + | } + | ] + |} + |""".stripMargin } def generateBodyJson(mapping: Config): String = { diff --git a/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala b/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala index 44c46f05..6f7e2e88 100644 --- a/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala +++ b/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala @@ -106,4 +106,40 @@ class AtlanServiceTest extends AnyFunSuite print(calculatedJson) assert(expectedJson.trim.equalsIgnoreCase(calculatedJson.trim)) } + + test("Test fake asset GUI - should not stop execution") { + val response = new AtlanService("foo") + .getGUI("test") + assert(response.trim.equalsIgnoreCase("")) + } + + test("Test description body") { + val testingConfig = Config( + "", + List(io.FileSource("raw/stripe/fake_employee/version=3/", "employees", IOFormat.PARQUET), io.FileSource("clean/fake_employee_s/version=123/", "employeesss", IOFormat.PARQUET), io.FileSource("raw/hubspot/owners/", "owners", IOFormat.PARQUET), io.FileSource("clean/hubspot_owners/", "clean_owners", IOFormat.PARQUET)), + List(new SQLFileMapping("src/test/resources/simple.sql", region)), + io.FileSink("test", "src/test/tmp/gold/stripe_f_fake_employee_t/version=4/", SaveMode.Overwrite, IOFormat.PARQUET), + Defaults(ConfigFactory.load()), + Environment("", EngineMode.Batch, "", false, "test", "", Option(""), false, false, Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot")) + ) + val calculatedJson = new AtlanService("foo") + .generateDescriptionBodyJson(testingConfig, "gold_stripe_f_fake_employee_t", "foo/test/gold_stripe_f_fake_employee_t") + + val expectedJson = + """ + |{ + | "entities": [ + | { + | "typeName": "Table", + | "attributes": { + | "name": "gold_stripe_f_fake_employee_t", + | "qualifiedName": "foo/test/gold_stripe_f_fake_employee_t", + | "description": "select * from employees where age < 40" + | } + | } + | ] + |} + |""".stripMargin + assert(expectedJson.trim.equalsIgnoreCase(calculatedJson.trim)) + } } From b0d9be2f38149f36f1b7b392b82183d76b3200d9 Mon Sep 17 00:00:00 2001 From: fei Date: Thu, 24 Aug 2023 12:38:09 +0200 Subject: [PATCH 3/6] add baseurl as param --- .../com/metabolic/data/core/domain/Environment.scala | 1 + .../data/core/services/catalogue/AtlanService.scala | 3 +-- .../com/metabolic/data/mapper/app/MetabolicApp.scala | 2 +- .../data/mapper/services/ConfigParserService.scala | 8 +++++++- .../data/mapper/services/AtlanServiceTest.scala | 10 +++++----- 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/metabolic/data/core/domain/Environment.scala b/src/main/scala/com/metabolic/data/core/domain/Environment.scala index 95348d49..c3b5afd6 100644 --- a/src/main/scala/com/metabolic/data/core/domain/Environment.scala +++ b/src/main/scala/com/metabolic/data/core/domain/Environment.scala @@ -8,6 +8,7 @@ case class Environment(name: String, dbName: String, iamRole: String, atlanToken: Option[String], + atlanBaseUrl: Option[String], historical: Boolean = false, autoSchema: Boolean = false, namespaces: Seq[String] = Seq.empty, diff --git a/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala b/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala index 3386e842..434245a6 100644 --- a/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala +++ b/src/main/scala/com/metabolic/data/core/services/catalogue/AtlanService.scala @@ -17,10 +17,9 @@ import java.time.LocalDateTime import java.time.format.DateTimeFormatter -class AtlanService(token: String) extends Logging { +class AtlanService(token: String, baseUrl: String) extends Logging { val versionRegex = """version=(\d)+/""".r - private val baseUrl = "default/athena/1659962653/AwsDataCatalog/" def setLineage(mapping: Config): String = { val body = generateBodyJson(mapping) diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala index d4d8ceb4..046aba66 100644 --- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala +++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala @@ -115,7 +115,7 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging { logger.info(s"Done with ${mapping.name}, pushing lineage to Atlan") mapping.environment.atlanToken match { case Some(token) => { - val atlan = new AtlanService(token) + val atlan = new AtlanService(token, mapping.environment.atlanBaseUrl.get) atlan.setLineage(mapping) atlan.setMetadata(mapping) atlan.setDescription(mapping) diff --git a/src/main/scala/com/metabolic/data/mapper/services/ConfigParserService.scala b/src/main/scala/com/metabolic/data/mapper/services/ConfigParserService.scala index 9277d481..953e7121 100644 --- a/src/main/scala/com/metabolic/data/mapper/services/ConfigParserService.scala +++ b/src/main/scala/com/metabolic/data/mapper/services/ConfigParserService.scala @@ -62,6 +62,12 @@ class ConfigParserService(implicit region: Regions) extends Logging { Option.empty } + val atlanBaseUrl = if (config.hasPathOrNull("atlan_url")) { + Option.apply(config.getString("atlan_url")) + } else { + Option.empty + } + val autoSchema = if (config.hasPathOrNull("autoSchema")){ config.getBoolean("autoSchema") } else { @@ -80,7 +86,7 @@ class ConfigParserService(implicit region: Regions) extends Logging { Seq.empty } - Environment(envPrefix, engineMode, baseCheckpointLocation, crawl, dbname, iamrole, atlanToken, historical, autoSchema, namespaces, infix_namespaces) + Environment(envPrefix, engineMode, baseCheckpointLocation, crawl, dbname, iamrole, atlanToken, atlanBaseUrl,historical, autoSchema, namespaces, infix_namespaces) } private def parseDefaults(config: HoconConfig) = { diff --git a/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala b/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala index 6f7e2e88..4c1fc3f3 100644 --- a/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala +++ b/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala @@ -49,7 +49,7 @@ class AtlanServiceTest extends AnyFunSuite List(new SQLFileMapping("src/test/resources/simple.sql", region)), io.FileSink("test", "src/test/tmp/gold/stripe_f_fake_employee_t/version=4/", SaveMode.Overwrite, IOFormat.PARQUET), Defaults(ConfigFactory.load()), - Environment("", EngineMode.Batch, "", false, "test", "", Option(""), false, false,Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot")) + Environment("", EngineMode.Batch, "", false, "test", "", Option(""), Option(""), false, false,Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot")) ) val expectedJson = @@ -101,14 +101,14 @@ class AtlanServiceTest extends AnyFunSuite | ] |}""".stripMargin - val calculatedJson = new AtlanService("foo") + val calculatedJson = new AtlanService("foo", "foo") .generateBodyJson(testingConfig) print(calculatedJson) assert(expectedJson.trim.equalsIgnoreCase(calculatedJson.trim)) } test("Test fake asset GUI - should not stop execution") { - val response = new AtlanService("foo") + val response = new AtlanService("foo", "foo") .getGUI("test") assert(response.trim.equalsIgnoreCase("")) } @@ -120,9 +120,9 @@ class AtlanServiceTest extends AnyFunSuite List(new SQLFileMapping("src/test/resources/simple.sql", region)), io.FileSink("test", "src/test/tmp/gold/stripe_f_fake_employee_t/version=4/", SaveMode.Overwrite, IOFormat.PARQUET), Defaults(ConfigFactory.load()), - Environment("", EngineMode.Batch, "", false, "test", "", Option(""), false, false, Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot")) + Environment("", EngineMode.Batch, "", false, "test", "", Option(""),Option(""), false, false, Seq("raw", "clean", "gold", "bronze"), Seq("raw_stripe", "raw_hubspot")) ) - val calculatedJson = new AtlanService("foo") + val calculatedJson = new AtlanService("foo", "foo") .generateDescriptionBodyJson(testingConfig, "gold_stripe_f_fake_employee_t", "foo/test/gold_stripe_f_fake_employee_t") val expectedJson = From 49aee31d028381a45d52279a053c16bc712c3221 Mon Sep 17 00:00:00 2001 From: fei Date: Thu, 24 Aug 2023 13:03:16 +0200 Subject: [PATCH 4/6] add new param --- .../scala/com/metabolic/data/core/domain/CoreConfig.scala | 2 +- src/main/scala/com/metabolic/data/mapper/domain/Config.scala | 2 +- .../scala/com/metabolic/data/mapper/app/MetabolicAppIT.scala | 4 ++-- .../com/metabolic/data/mapper/app/MetabolicWriterIT.scala | 2 +- .../data/mapper/services/GlueCatalogServiceTest.scala | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/metabolic/data/core/domain/CoreConfig.scala b/src/main/scala/com/metabolic/data/core/domain/CoreConfig.scala index c58bf5ca..018ee6ef 100644 --- a/src/main/scala/com/metabolic/data/core/domain/CoreConfig.scala +++ b/src/main/scala/com/metabolic/data/core/domain/CoreConfig.scala @@ -4,4 +4,4 @@ import com.metabolic.data.mapper.domain.io.EngineMode import com.typesafe.config.ConfigFactory abstract class CoreConfig(val defaults: Defaults = Defaults(ConfigFactory.load()), - val environment: Environment = Environment("", EngineMode.Batch, "", false, "","", Option.empty)) \ No newline at end of file + val environment: Environment = Environment("", EngineMode.Batch, "", false, "","", Option.empty, Option.empty)) \ No newline at end of file diff --git a/src/main/scala/com/metabolic/data/mapper/domain/Config.scala b/src/main/scala/com/metabolic/data/mapper/domain/Config.scala index 75505e4d..dbdb3493 100644 --- a/src/main/scala/com/metabolic/data/mapper/domain/Config.scala +++ b/src/main/scala/com/metabolic/data/mapper/domain/Config.scala @@ -27,7 +27,7 @@ object Config { def apply(name: String, sources: Seq[Source], mappings: Seq[Mapping], sink: Sink): Config = { val defaults: Defaults = Defaults(ConfigFactory.load()) - val environment: Environment = Environment("", EngineMode.Batch, "", false,"test","",Option.empty) + val environment: Environment = Environment("", EngineMode.Batch, "", false,"test","",Option.empty, Option.empty) new Config(name, sources, mappings, sink, defaults, environment) } } diff --git a/src/test/scala/com/metabolic/data/mapper/app/MetabolicAppIT.scala b/src/test/scala/com/metabolic/data/mapper/app/MetabolicAppIT.scala index 4666a6be..a2ac6b4a 100644 --- a/src/test/scala/com/metabolic/data/mapper/app/MetabolicAppIT.scala +++ b/src/test/scala/com/metabolic/data/mapper/app/MetabolicAppIT.scala @@ -388,7 +388,7 @@ class MetabolicAppIT extends AnyFunSuite List(SQLStatmentMapping("WITH a as (SELECT * from vowels) SELECT *, make_date(yyyy,mm,dd) as date from a")), io.FileSink("test", "src/test/tmp/vowelsv1_o", SaveMode.Overwrite, IOFormat.PARQUET, eventTimeColumnName = Option("date"), ops = Seq(ManageSchemaSinkOp())), Defaults(ConfigFactory.load()), - Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty,false, true) + Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty, Option.empty,false, true) ) @@ -464,7 +464,7 @@ class MetabolicAppIT extends AnyFunSuite List(SQLStatmentMapping("WITH a as (SELECT * from vowels) SELECT *, CAST(make_date(yyyy,mm,dd) as string) as date from a")), io.FileSink("test", "src/test/tmp/vowels_o", SaveMode.Overwrite, IOFormat.PARQUET, eventTimeColumnName = Option("date")), Defaults(ConfigFactory.load()), - Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty) + Environment("", EngineMode.Batch, "", false,"dbName","",Option.empty, Option.empty) ) diff --git a/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala b/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala index 5442a976..bfaede11 100644 --- a/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala +++ b/src/test/scala/com/metabolic/data/mapper/app/MetabolicWriterIT.scala @@ -88,7 +88,7 @@ class MetabolicWriterIT extends AnyFunSuite .loadConfig(genericSourceHOCON, "") SinkConfigParserService() - .parseSink(fileSinkConfig, Environment("",EngineMode.Batch, "", false,"dbName","",Option.empty, false, autoSchema = true)) + .parseSink(fileSinkConfig, Environment("",EngineMode.Batch, "", false,"dbName","",Option.empty, Option.empty, false, autoSchema = true)) } diff --git a/src/test/scala/com/metabolic/data/mapper/services/GlueCatalogServiceTest.scala b/src/test/scala/com/metabolic/data/mapper/services/GlueCatalogServiceTest.scala index 395d96e6..77dfd334 100644 --- a/src/test/scala/com/metabolic/data/mapper/services/GlueCatalogServiceTest.scala +++ b/src/test/scala/com/metabolic/data/mapper/services/GlueCatalogServiceTest.scala @@ -13,7 +13,7 @@ class GlueCatalogServiceTest extends AnyFunSuite ignore("Works") { val s3Path = Seq("s3://factorial-etl/entity_mapper/dl/clean/subs/") - val config = Environment("[Test] ", EngineMode.Batch, "", true, "test_data_lake", "AWSGlueServiceRoleDefault", Option.apply("fooBarAtlan")) + val config = Environment("[Test] ", EngineMode.Batch, "", true, "test_data_lake", "AWSGlueServiceRoleDefault", Option.apply("fooBarAtlan"), Option.apply("fooBarAtlan")) //GlueCatalogService.register(config, "subsy", s3Path, "clean_") From 1f2cb3df90e87b6136a8bc9394085fd925797419 Mon Sep 17 00:00:00 2001 From: fei Date: Thu, 24 Aug 2023 13:23:09 +0200 Subject: [PATCH 5/6] fix tests --- .../metabolic/data/mapper/app/MetabolicApp.scala | 2 +- .../data/mapper/services/AtlanServiceTest.scala | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala index 046aba66..f3f74761 100644 --- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala +++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala @@ -115,7 +115,7 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging { logger.info(s"Done with ${mapping.name}, pushing lineage to Atlan") mapping.environment.atlanToken match { case Some(token) => { - val atlan = new AtlanService(token, mapping.environment.atlanBaseUrl.get) + val atlan = new AtlanService(token, mapping.environment.atlanBaseUrl.getOrElse("")) atlan.setLineage(mapping) atlan.setMetadata(mapping) atlan.setDescription(mapping) diff --git a/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala b/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala index 4c1fc3f3..1cd65526 100644 --- a/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala +++ b/src/test/scala/com/metabolic/data/mapper/services/AtlanServiceTest.scala @@ -59,17 +59,17 @@ class AtlanServiceTest extends AnyFunSuite | "typeName": "Process", | "attributes": { | "name": "test/raw_stripe_fake_employee,test/clean_fake_employee_s,test/raw_hubspot_owners,test/clean_hubspot_owners -> test/gold_stripe_f_fake_employee_t", - | "qualifiedName": "default/athena/1659962653/AwsDataCatalog/47b83f3425f72bfe7cbf3d966f9bda4b", + | "qualifiedName": "foo47b83f3425f72bfe7cbf3d966f9bda4b", | "connectorName": "athena", | "connectionName": "athena", - | "connectionQualifiedName": "default/athena/1659962653/AwsDataCatalog" + | "connectionQualifiedName": "fo" | }, | "relationshipAttributes": { | "outputs": [ | { | "typeName": "Table", | "uniqueAttributes": { - | "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/gold_stripe_f_fake_employee_t" + | "qualifiedName": "footest/gold_stripe_f_fake_employee_t" | } | } | ], @@ -77,22 +77,22 @@ class AtlanServiceTest extends AnyFunSuite | { | "typeName": "Table", | "uniqueAttributes": { - | "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/raw_stripe_fake_employee" + | "qualifiedName": "footest/raw_stripe_fake_employee" | } | }, { | "typeName": "Table", | "uniqueAttributes": { - | "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/clean_fake_employee_s" + | "qualifiedName": "footest/clean_fake_employee_s" | } | }, { | "typeName": "Table", | "uniqueAttributes": { - | "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/raw_hubspot_owners" + | "qualifiedName": "footest/raw_hubspot_owners" | } | }, { | "typeName": "Table", | "uniqueAttributes": { - | "qualifiedName": "default/athena/1659962653/AwsDataCatalog/test/clean_hubspot_owners" + | "qualifiedName": "footest/clean_hubspot_owners" | } | } | ] From 648c25690aa61035a0d9a4e47b257af25be9daf9 Mon Sep 17 00:00:00 2001 From: fei Date: Thu, 24 Aug 2023 13:48:21 +0200 Subject: [PATCH 6/6] run ci