Skip to content

Commit

Permalink
run drop view query
Browse files Browse the repository at this point in the history
  • Loading branch information
browniecode93 committed Oct 16, 2023
1 parent bae2ddf commit 1a33634
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging {
s"$dbName.$tableName"
}

def dropViewStatement(dbName: String, viewName: String): String = {
def dropView(dbName: String, viewName: String) = {

s"DROP VIEW IF EXISTS " +
val statement = s"DROP VIEW IF EXISTS " +
s"$dbName.$viewName"

dropTable(dbName, viewName, statement)

}

def createDeltaTable(dbName:String, tableName:String, location: String, recreate: Boolean = false) = {

if(recreate) {
dropDeltaTable(dbName, tableName)
val delete_statement = dropTableStatement(dbName, tableName)
dropTable(dbName, tableName, delete_statement)
}

val statement = createTableStatement(dbName, tableName, location)
Expand All @@ -59,9 +63,8 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging {

}

private def dropDeltaTable(dbName: String, tableName: String) = {
private def dropTable(dbName: String, tableName: String, statement: String) = {

val statement = dropTableStatement(dbName, tableName)
logger.info(s"Drop table statement for ${dbName}.${tableName} is ${statement}")
val queryExecutionContext = new QueryExecutionContext().withDatabase(dbName)
//val resultConfiguration = new ResultConfiguration().withOutputLocation(path)
Expand All @@ -71,8 +74,8 @@ class AthenaCatalogueService(implicit val region: Regions) extends Logging {
.withQueryExecutionContext(queryExecutionContext)
//.withResultConfiguration(resultConfiguration)
val queryExecutionId = athenaClient.startQueryExecution(startQueryExecutionRequest).getQueryExecutionId
logger.info(s"Table ${dbName}.${tableName} has been created")
val getQueryResultsRequest = new GetQueryResultsRequest().withQueryExecutionId(queryExecutionId)
logger.info(s"Query result is: ${getQueryResultsRequest}")

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class MetabolicApp(sparkBuilder: SparkSession.Builder) extends Logging {
val prefix = ConfigUtilsService.getTablePrefix(options.namespaces, s3Path)
val tableName = prefix+ConfigUtilsService.getTableName(config)

new AthenaCatalogueService().dropViewStatement(dbName, tableName)
new AthenaCatalogueService().dropView(dbName, tableName)

config.sink.format match {
case IOFormat.DELTA => new AthenaCatalogueService().createDeltaTable(dbName, tableName, s3Path)
Expand Down

0 comments on commit 1a33634

Please sign in to comment.