From e3bc6ad2ce40444b9730b0a820131205ebedd974 Mon Sep 17 00:00:00 2001 From: spenes Date: Fri, 2 Sep 2022 16:43:45 +0300 Subject: [PATCH] Loader: feature flag to disable adding load_tstamp column (close #1041) --- .../loader/databricks/DatabricksSpec.scala | 3 +- .../src/main/resources/application.conf | 3 ++ .../snowplow/rdbloader/Loader.scala | 49 ++++++++++--------- .../snowplow/rdbloader/config/Config.scala | 7 ++- .../snowplow/rdbloader/ConfigSpec.scala | 4 +- .../snowplow/rdbloader/SpecHelpers.scala | 3 +- .../snowplow/loader/redshift/ConfigSpec.scala | 6 ++- .../snowplow/loader/snowflake/Snowflake.scala | 18 +++++-- .../loader/snowflake/ConfigSpec.scala | 6 ++- 9 files changed, 65 insertions(+), 34 deletions(-) diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala index e31eac365..be2cb912e 100644 --- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -140,7 +140,8 @@ object DatabricksSpec { Config.Timeouts(1.minute, 1.minute, 1.minute), Config.Retries(Config.Strategy.Constant, None, 1.minute, None), Config.Retries(Config.Strategy.Constant, None, 1.minute, None), - Config.Retries(Config.Strategy.Constant, None, 1.minute, None) + Config.Retries(Config.Strategy.Constant, None, 1.minute, None), + Config.FeatureFlags(addLoadTstampColumn = true) )).right.get } diff --git a/modules/loader/src/main/resources/application.conf b/modules/loader/src/main/resources/application.conf index 4dd4ddfa4..7fef75dbe 100644 --- a/modules/loader/src/main/resources/application.conf +++ b/modules/loader/src/main/resources/application.conf @@ -26,5 +26,8 @@ "loading": "1 hour", "nonLoading": "10 minutes", "sqsVisibility": "5 minutes" + }, + "featureFlags": { + "addLoadTstampColumn": true } } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala index 01e5f2bd6..31d4e828e 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala @@ -70,13 +70,15 @@ object Loader { val periodicMetrics: Stream[F, Unit] = Monitoring[F].periodicMetrics.report - val blockUntilReady = TargetCheck.blockUntilReady[F, C](config.readyCheck, config.storage) *> + def initRetry(f: F[Unit]) = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(f) + + val blockUntilReady = initRetry(TargetCheck.blockUntilReady[F, C](config.readyCheck, config.storage)) *> Logging[F].info("Target check is completed") val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *> Logging[F].info("No operation prepare step is completed") - val manifestInit = retryingOnAllErrors(Retry.getRetryPolicy[F](config.initRetries), initRetryLog[F])(Manifest.initialize[F, C](config.storage)) *> + val manifestInit = initRetry(Manifest.initialize[F, C](config.storage)) *> Logging[F].info("Manifest initialization is completed") - val addLoadTstamp = addLoadTstampColumn[F, C](config.storage) *> + val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *> Logging[F].info("Adding load_tstamp column is completed") val init: F[Unit] = blockUntilReady *> noOperationPrepare *> manifestInit *> addLoadTstamp @@ -157,25 +159,28 @@ object Loader { } private def addLoadTstampColumn[F[_]: Transaction[*[_], C]: Monitoring: Logging: MonadThrow, - C[_]: DAO: Monad: Logging](targetConfig: StorageTarget): F[Unit] = { - val f = targetConfig match { - // Adding load_tstamp column explicitly is not needed due to merge schema - // feature of Databricks. It will create missing column itself. - case _: StorageTarget.Databricks => Monad[C].unit - case _ => - for { - allColumns <- DbControl.getColumns[C](EventsTable.MainName) - _ <- if (loadTstampColumnExist(allColumns)) - Logging[C].info("load_tstamp column already exists") - else - DAO[C].executeUpdate(Statement.AddLoadTstampColumn, DAO.Purpose.NonLoading).void *> - Logging[C].info("load_tstamp column is added successfully") - } yield () - } - Transaction[F, C].transact(f).recoverWith { - case e: Throwable => - val err = s"Error while adding load_tstamp column: $e" - Logging[F].error(err) *> Monitoring[F].alert(AlertPayload.error(err)) + C[_]: DAO: Monad: Logging](shouldAdd: Boolean, targetConfig: StorageTarget): F[Unit] = { + if (!shouldAdd) Logging[F].info("Adding load_tstamp is skipped") + else { + val f = targetConfig match { + // Adding load_tstamp column explicitly is not needed due to merge schema + // feature of Databricks. It will create missing column itself. + case _: StorageTarget.Databricks => Monad[C].unit + case _ => + for { + allColumns <- DbControl.getColumns[C](EventsTable.MainName) + _ <- if (loadTstampColumnExist(allColumns)) + Logging[C].info("load_tstamp column already exists") + else + DAO[C].executeUpdate(Statement.AddLoadTstampColumn, DAO.Purpose.NonLoading).void *> + Logging[C].info("load_tstamp column is added successfully") + } yield () + } + Transaction[F, C].transact(f).recoverWith { + case e: Throwable => + val err = s"Error while adding load_tstamp column: ${getErrorMessage(e)}" + Logging[F].error(err) *> Monitoring[F].alert(AlertPayload.error(err)) + } } } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala index 14609e095..31613aac6 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/Config.scala @@ -47,7 +47,8 @@ final case class Config[+D <: StorageTarget](region: Region, timeouts: Timeouts, retries: Retries, readyCheck: Retries, - initRetries: Retries) + initRetries: Retries, + featureFlags: FeatureFlags) object Config { @@ -81,6 +82,7 @@ object Config { final case class RetryQueue(period: FiniteDuration, size: Int, maxAttempts: Int, interval: FiniteDuration) final case class Timeouts(loading: FiniteDuration, nonLoading: FiniteDuration, sqsVisibility: FiniteDuration) final case class Retries(strategy: Strategy, attempts: Option[Int], backoff: FiniteDuration, cumulativeBound: Option[FiniteDuration]) + final case class FeatureFlags(addLoadTstampColumn: Boolean) sealed trait Strategy object Strategy { @@ -168,6 +170,9 @@ object Config { implicit val configDecoder: Decoder[Config[StorageTarget]] = deriveDecoder[Config[StorageTarget]].ensure(validateConfig) + + implicit val featureFlagsConfigDecoder: Decoder[FeatureFlags] = + deriveDecoder[FeatureFlags] } /** Post-decoding validation, making sure different parts are consistent */ diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala index 9bdb1eabf..03de27d68 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala @@ -133,6 +133,7 @@ object ConfigSpec { val exampleReadyCheck: Config.Retries = Config.Retries(Config.Strategy.Constant, None, 15.seconds, None) val exampleTempCreds = StorageTarget.LoadAuthMethod.TempCreds("test_role_arn", "test_role_session_name") val exampleInitRetries: Config.Retries = Config.Retries(Config.Strategy.Exponential, Some(3), 30.seconds, Some(1.hour)) + val exampleFeatureFlags: Config.FeatureFlags = Config.FeatureFlags(addLoadTstampColumn = true) val exampleConfig = Config( exampleRegion, None, @@ -144,7 +145,8 @@ object ConfigSpec { exampleTimeouts, exampleRetries, exampleReadyCheck, - exampleInitRetries + exampleInitRetries, + exampleFeatureFlags ) def getConfig[A](confPath: String, parse: String => EitherT[IO, String, A]): Either[String, A] = diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala index 7fdff4ed6..b482a5458 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/SpecHelpers.scala @@ -41,7 +41,8 @@ object SpecHelpers { ConfigSpec.exampleTimeouts, ConfigSpec.exampleRetries, ConfigSpec.exampleReadyCheck, - ConfigSpec.exampleInitRetries + ConfigSpec.exampleInitRetries, + ConfigSpec.exampleFeatureFlags ) val validCliConfig: CliConfig = CliConfig(validConfig, false, resolverJson) diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala index c19ab7fc5..451b964bd 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/ConfigSpec.scala @@ -38,7 +38,8 @@ class ConfigSpec extends Specification { exampleTimeouts, exampleRetries, exampleReadyCheck, - exampleInitRetries + exampleInitRetries, + exampleFeatureFlags ) result must beRight(expected) } @@ -56,7 +57,8 @@ class ConfigSpec extends Specification { exampleTimeouts, exampleRetries.copy(cumulativeBound = None), exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds), - exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)) + exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)), + exampleFeatureFlags ) result must beRight(expected) } diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala index 181d1d7a7..82299ac67 100644 --- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala +++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala @@ -244,11 +244,21 @@ object Snowflake { private def qualify(tableName: String): String = s"$schema.$tableName" - private def columnsForCopy(columns: ColumnsToCopy): String = - columns.names.map(_.value).mkString(",") + ",load_tstamp" + private def columnsForCopy(columns: ColumnsToCopy): String = { + val columnNames = columns.names.map(_.value).mkString(",") + if (config.featureFlags.addLoadTstampColumn) + columnNames + ",load_tstamp" + else + columnNames + } - private def columnsForSelect(columns: ColumnsToCopy): String = - columns.names.map(c => s"$$1:${c.value}").mkString(",") + ",current_timestamp()" + private def columnsForSelect(columns: ColumnsToCopy): String = { + val columnNames = columns.names.map(c => s"$$1:${c.value}").mkString(",") + if (config.featureFlags.addLoadTstampColumn) + columnNames + ",current_timestamp()" + else + columnNames + } } Right(result) diff --git a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala index b27d4abc1..5e6866fc1 100644 --- a/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala +++ b/modules/snowflake-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/snowflake/ConfigSpec.scala @@ -44,7 +44,8 @@ class ConfigSpec extends Specification { exampleTimeouts, exampleRetries, exampleReadyCheck, - exampleInitRetries + exampleInitRetries, + exampleFeatureFlags ) result must beRight(expected) } @@ -65,7 +66,8 @@ class ConfigSpec extends Specification { exampleTimeouts, exampleRetries.copy(cumulativeBound = None), exampleReadyCheck.copy(strategy = Config.Strategy.Constant, backoff = 15.seconds), - exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)) + exampleInitRetries.copy(attempts = None, cumulativeBound = Some(10.minutes)), + exampleFeatureFlags ) result must beRight(expected) }