diff --git a/src/main/scala/com/metabolic/data/core/services/glue/AthenaCatalogueService.scala b/src/main/scala/com/metabolic/data/core/services/glue/AthenaCatalogueService.scala index d1ebffe..0f71ba2 100644 --- a/src/main/scala/com/metabolic/data/core/services/glue/AthenaCatalogueService.scala +++ b/src/main/scala/com/metabolic/data/core/services/glue/AthenaCatalogueService.scala @@ -32,16 +32,20 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging { s"$dbName.$tableName" } - def dropViewStatement(dbName: String, viewName: String): String = { + def dropView(dbName: String, viewName: String) = { - s"DROP VIEW IF EXISTS " + + val statement = s"DROP VIEW IF EXISTS " + s"$dbName.$viewName" + + dropTable(dbName, viewName, statement) + } def createDeltaTable(dbName:String, tableName:String, location: String, recreate: Boolean = false) = { if(recreate) { - dropDeltaTable(dbName, tableName) + val delete_statement = dropTableStatement(dbName, tableName) + dropTable(dbName, tableName, delete_statement) } val statement = createTableStatement(dbName, tableName, location) @@ -59,9 +63,8 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging { } - private def dropDeltaTable(dbName: String, tableName: String) = { + private def dropTable(dbName: String, tableName: String, statement: String) = { - val statement = dropTableStatement(dbName, tableName) logger.info(s"Drop table statement for ${dbName}.${tableName} is ${statement}") val queryExecutionContext = new QueryExecutionContext().withDatabase(dbName) //val resultConfiguration = new ResultConfiguration().withOutputLocation(path) @@ -71,8 +74,8 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging { .withQueryExecutionContext(queryExecutionContext) //.withResultConfiguration(resultConfiguration) val queryExecutionId = athenaClient.startQueryExecution(startQueryExecutionRequest).getQueryExecutionId - logger.info(s"Table ${dbName}.${tableName} has been created") val getQueryResultsRequest = new GetQueryResultsRequest().withQueryExecutionId(queryExecutionId) + logger.info(s"Query result is: ${getQueryResultsRequest}") } diff --git a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala index cd7d40b..95b1cb8 100644 --- a/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala +++ b/src/main/scala/com/metabolic/data/mapper/app/MetabolicApp.scala @@ -138,7 +138,7 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging { val prefix = ConfigUtilsService.getTablePrefix(options.namespaces, s3Path) val tableName = prefix+ConfigUtilsService.getTableName(config) - new AthenaCatalogueService().dropViewStatement(dbName, tableName) + new AthenaCatalogueService().dropView(dbName, tableName) config.sink.format match { case IOFormat.DELTA => new AthenaCatalogueService().createDeltaTable(dbName, tableName, s3Path)