Skip to content

Commit

Permalink
Loader: Create the database schema on startup (close #1266)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Jun 7, 2023
1 parent 5e88175 commit 3914d7a
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ object Databricks {
case Statement.VacuumManifest => sql"""
OPTIMIZE ${Fragment.const0(qualify(Manifest.Name))}
ZORDER BY base"""
case Statement.CreateDbSchema =>
val schema = tgt.catalog.map(c => s"$c.${tgt.schema}").getOrElse(tgt.schema)
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""
}

private def qualify(tableName: String): String = tgt.catalog match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.snowplowanalytics.snowplow.rdbloader

import java.sql.SQLException
import scala.concurrent.duration._
import cats.{Apply, Monad, MonadThrow}
import cats.implicits._
Expand Down Expand Up @@ -112,9 +113,15 @@ object Loader {
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
Logging[F].info("No operation prepare step is completed")

val eventsTableInit = initRetry(createEventsTable[F, C](target)).onError { case t: Throwable =>
Monitoring[F].alert(Alert.FailedToCreateEventsTable(t))
} *> Logging[F].info("Events table initialization is completed")
val dbSchemaInit = createDbSchema[F, C]
.onError { case t: Throwable =>
Monitoring[F].alert(Alert.FailedToCreateDatabaseSchema(t))
} *> Logging[F].info("Database schema initialization is completed")

val eventsTableInit = createEventsTable[F, C](target)
.onError { case t: Throwable =>
Monitoring[F].alert(Alert.FailedToCreateEventsTable(t))
} *> Logging[F].info("Events table initialization is completed")

val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)).onError { case t: Throwable =>
Monitoring[F].alert(Alert.FailedToCreateManifestTable(t))
Expand All @@ -123,7 +130,10 @@ object Loader {
val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *>
Logging[F].info("Adding load_tstamp column is completed")

val init: F[I] = blockUntilReady *> noOperationPrepare *> eventsTableInit *> manifestInit *> addLoadTstamp *> initQuery[F, C, I](target)
val init: F[I] =
blockUntilReady *> noOperationPrepare *> dbSchemaInit *> eventsTableInit *> manifestInit *> addLoadTstamp *> initQuery[F, C, I](
target
)

val process = Stream.eval(init).flatMap { initQueryResult =>
loading(initQueryResult)
Expand Down Expand Up @@ -281,8 +291,37 @@ object Loader {
Monitoring[F].alert(Alert.FailedInitialConnection(t))
}

private def createEventsTable[F[_]: Transaction[*[_], C], C[_]: DAO: Monad](target: Target[_]): F[Unit] =
Transaction[F, C].transact(DAO[C].executeUpdate(target.getEventTable, DAO.Purpose.NonLoading).void)
private def isSQLPermissionError(t: Throwable): Boolean =
t match {
case s: SQLException =>
Option(s.getSQLState) match {
case Some("42501") => true // Sql state for insufficient permissions for Databricks, Snowflake and Redshift
case _ => false
}
case _ => false
}

private def createEventsTable[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow: Logging](target: Target[_]): F[Unit] =
Transaction[F, C].transact {
DAO[C]
.executeUpdate(target.getEventTable, DAO.Purpose.NonLoading)
.void
.recoverWith {
case t: Throwable if isSQLPermissionError(t) =>
Logging[C].warning(s"Failed to create events table due to permission error: ${getErrorMessage(t)}")
}
}

private def createDbSchema[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow: Logging]: F[Unit] =
Transaction[F, C].transact {
DAO[C]
.executeUpdate(Statement.CreateDbSchema, DAO.Purpose.NonLoading)
.void
.recoverWith {
case t: Throwable if isSQLPermissionError(t) =>
Logging[C].warning(s"Failed to create database schema due to permission error: ${getErrorMessage(t)}")
}
}

/**
* Last level of failure handling, called when non-loading stream fail. Called on an application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ object Statement {
case class CreateTable(ddl: Fragment) extends Statement
case class AlterTable(ddl: Fragment) extends Statement
case class DdlFile(ddl: Fragment) extends Statement
case object CreateDbSchema extends Statement

// Optimize (housekeeping i.e. vacuum in redshift, optimize in databricks)
case object VacuumManifest extends Statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object Alert {
case class FailedInitialConnection(cause: Throwable) extends Alert
case class FailedToCreateEventsTable(cause: Throwable) extends Alert
case class FailedToCreateManifestTable(cause: Throwable) extends Alert
case class FailedToCreateDatabaseSchema(cause: Throwable) extends Alert
case class FailedHealthCheck(cause: Throwable) extends Alert
case class FailedFolderMonitoring(cause: Throwable, numFailures: Int) extends Alert

Expand All @@ -52,6 +53,7 @@ object Alert {
case FailedInitialConnection(_) => None
case FailedToCreateEventsTable(_) => None
case FailedToCreateManifestTable(_) => None
case FailedToCreateDatabaseSchema(_) => None
case FailedHealthCheck(_) => None
case FailedFolderMonitoring(_, _) => None
}
Expand All @@ -74,6 +76,7 @@ object Alert {
case FailedInitialConnection(_) => Severity.Error
case FailedToCreateEventsTable(_) => Severity.Error
case FailedToCreateManifestTable(_) => Severity.Error
case FailedToCreateDatabaseSchema(_) => Severity.Error
}

def getMessage(am: Alert): String = {
Expand All @@ -88,6 +91,7 @@ object Alert {
case FailedInitialConnection(t) => show"Failed to get connection at startup: $t"
case FailedToCreateEventsTable(t) => show"Failed to create events table: $t"
case FailedToCreateManifestTable(t) => show"Failed to create manifest table: $t"
case FailedToCreateDatabaseSchema(t) => show"Failed to create database schema: $t"
case FailedHealthCheck(t) => show"DB failed health check: $t"
case FailedFolderMonitoring(t, numFailures) => show"Folder monitoring failed $numFailures times in a row: $t"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ object Redshift {
case Statement.AddLoadTstampColumn =>
sql"""ALTER TABLE ${Fragment.const0(EventsTable.withSchema(schema))}
ADD COLUMN load_tstamp TIMESTAMP DEFAULT GETDATE() NULL"""
case Statement.CreateDbSchema =>
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""

case Statement.CreateTable(ddl) =>
ddl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ object Snowflake {
case Statement.AddLoadTstampColumn =>
sql"""ALTER TABLE ${Fragment.const0(EventsTable.withSchema(schema))}
ADD COLUMN load_tstamp TIMESTAMP NULL"""
case Statement.CreateDbSchema =>
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""

case Statement.CreateTable(ddl) =>
ddl
Expand Down

0 comments on commit 3914d7a

Please sign in to comment.