Skip to content

Commit

Permalink
Loader: feature flag to disable adding load_tstamp column (close #1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Sep 5, 2022
1 parent 592cbdb commit e3bc6ad
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ object DatabricksSpec {
Config.Timeouts(1.minute, 1.minute, 1.minute),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None)
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.FeatureFlags(addLoadTstampColumn = true)
)).right.get

}
3 changes: 3 additions & 0 deletions modules/loader/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@
"loading": "1 hour",
"nonLoading": "10 minutes",
"sqsVisibility": "5 minutes"
},
"featureFlags": {
"addLoadTstampColumn": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ object Loader {
val periodicMetrics: Stream[F, Unit] =
Monitoring[F].periodicMetrics.report

val blockUntilReady = TargetCheck.blockUntilReady[F, C](config.readyCheck, config.storage) *>
def initRetry(f: F[Unit]) = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(f)

val blockUntilReady = initRetry(TargetCheck.blockUntilReady[F, C](config.readyCheck, config.storage)) *>
Logging[F].info("Target check is completed")
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
Logging[F].info("No operation prepare step is completed")
val manifestInit = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(Manifest.initialize[F, C](config.storage)) *>
val manifestInit = initRetry(Manifest.initialize[F, C](config.storage)) *>
Logging[F].info("Manifest initialization is completed")
val addLoadTstamp = addLoadTstampColumn[F, C](config.storage) *>
val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *>
Logging[F].info("Adding load_tstamp column is completed")

val init: F[Unit] = blockUntilReady *> noOperationPrepare *> manifestInit *> addLoadTstamp
Expand Down Expand Up @@ -157,25 +159,28 @@ object Loader {
}

private def addLoadTstampColumn[F[_]: Transaction[*[_], C]: Monitoring: Logging: MonadThrow,
C[_]: DAO: Monad: Logging](targetConfig: StorageTarget): F[Unit] = {
val f = targetConfig match {
// Adding load_tstamp column explicitly is not needed due to merge schema
// feature of Databricks. It will create missing column itself.
case _: StorageTarget.Databricks => Monad[C].unit
case _ =>
for {
allColumns <- DbControl.getColumns[C](EventsTable.MainName)
_ <- if (loadTstampColumnExist(allColumns))
Logging[C].info("load_tstamp column already exists")
else
DAO[C].executeUpdate(Statement.AddLoadTstampColumn, DAO.Purpose.NonLoading).void *>
Logging[C].info("load_tstamp column is added successfully")
} yield ()
}
Transaction[F, C].transact(f).recoverWith {
case e: Throwable =>
val err = s"Error while adding load_tstamp column: $e"
Logging[F].error(err) *> Monitoring[F].alert(AlertPayload.error(err))
C[_]: DAO: Monad: Logging](shouldAdd: Boolean, targetConfig: StorageTarget): F[Unit] = {
if (!shouldAdd) Logging[F].info("Adding load_tstamp is skipped")
else {
val f = targetConfig match {
// Adding load_tstamp column explicitly is not needed due to merge schema
// feature of Databricks. It will create missing column itself.
case _: StorageTarget.Databricks => Monad[C].unit
case _ =>
for {
allColumns <- DbControl.getColumns[C](EventsTable.MainName)
_ <- if (loadTstampColumnExist(allColumns))
Logging[C].info("load_tstamp column already exists")
else
DAO[C].executeUpdate(Statement.AddLoadTstampColumn, DAO.Purpose.NonLoading).void *>
Logging[C].info("load_tstamp column is added successfully")
} yield ()
}
Transaction[F, C].transact(f).recoverWith {
case e: Throwable =>
val err = s"Error while adding load_tstamp column: ${getErrorMessage(e)}"
Logging[F].error(err) *> Monitoring[F].alert(AlertPayload.error(err))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ final case class Config[+D <: StorageTarget](region: Region,
timeouts: Timeouts,
retries: Retries,
readyCheck: Retries,
initRetries: Retries)
initRetries: Retries,
featureFlags: FeatureFlags)

object Config {

Expand Down Expand Up @@ -81,6 +82,7 @@ object Config {
final case class RetryQueue(period: FiniteDuration, size: Int, maxAttempts: Int, interval: FiniteDuration)
final case class Timeouts(loading: FiniteDuration, nonLoading: FiniteDuration, sqsVisibility: FiniteDuration)
final case class Retries(strategy: Strategy, attempts: Option[Int], backoff: FiniteDuration, cumulativeBound: Option[FiniteDuration])
final case class FeatureFlags(addLoadTstampColumn: Boolean)

sealed trait Strategy
object Strategy {
Expand Down Expand Up @@ -168,6 +170,9 @@ object Config {

implicit val configDecoder: Decoder[Config[StorageTarget]] =
deriveDecoder[Config[StorageTarget]].ensure(validateConfig)

implicit val featureFlagsConfigDecoder: Decoder[FeatureFlags] =
deriveDecoder[FeatureFlags]
}

/** Post-decoding validation, making sure different parts are consistent */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ object ConfigSpec {
val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, None)
val exampleTempCreds = StorageTarget.LoadAuthMethod.TempCreds("test_role_arn", "test_role_session_name")
val exampleInitRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour))
val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true)
val exampleConfig = Config(
exampleRegion,
None,
Expand All @@ -144,7 +145,8 @@ object ConfigSpec {
exampleTimeouts,
exampleRetries,
exampleReadyCheck,
exampleInitRetries
exampleInitRetries,
exampleFeatureFlags
)

def getConfig[A](confPath: String, parse: String => EitherT[IO, String, A]): Either[String, A] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ object SpecHelpers {
ConfigSpec.exampleTimeouts,
ConfigSpec.exampleRetries,
ConfigSpec.exampleReadyCheck,
ConfigSpec.exampleInitRetries
ConfigSpec.exampleInitRetries,
ConfigSpec.exampleFeatureFlags
)
val validCliConfig: CliConfig = CliConfig(validConfig, false, resolverJson)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class ConfigSpec extends Specification {
exampleTimeouts,
exampleRetries,
exampleReadyCheck,
exampleInitRetries
exampleInitRetries,
exampleFeatureFlags
)
result must beRight(expected)
}
Expand All @@ -56,7 +57,8 @@ class ConfigSpec extends Specification {
exampleTimeouts,
exampleRetries.copy(cumulativeBound = None),
exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes))
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
exampleFeatureFlags
)
result must beRight(expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,21 @@ object Snowflake {
private def qualify(tableName: String): String =
s"$schema.$tableName"

private def columnsForCopy(columns: ColumnsToCopy): String =
columns.names.map(_.value).mkString(",") + ",load_tstamp"
private def columnsForCopy(columns: ColumnsToCopy): String = {
val columnNames = columns.names.map(_.value).mkString(",")
if (config.featureFlags.addLoadTstampColumn)
columnNames + ",load_tstamp"
else
columnNames
}

private def columnsForSelect(columns: ColumnsToCopy): String =
columns.names.map(c => s"$$1:${c.value}").mkString(",") + ",current_timestamp()"
private def columnsForSelect(columns: ColumnsToCopy): String = {
val columnNames = columns.names.map(c => s"$$1:${c.value}").mkString(",")
if (config.featureFlags.addLoadTstampColumn)
columnNames + ",current_timestamp()"
else
columnNames
}
}

Right(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class ConfigSpec extends Specification {
exampleTimeouts,
exampleRetries,
exampleReadyCheck,
exampleInitRetries
exampleInitRetries,
exampleFeatureFlags
)
result must beRight(expected)
}
Expand All @@ -65,7 +66,8 @@ class ConfigSpec extends Specification {
exampleTimeouts,
exampleRetries.copy(cumulativeBound = None),
exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds),
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes))
exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)),
exampleFeatureFlags
)
result must beRight(expected)
}
Expand Down

0 comments on commit e3bc6ad

Please sign in to comment.