From 909925363874f5c772bfb48d404f9f7afbc80a2d Mon Sep 17 00:00:00 2001 From: Anton Parkhomenko Date: Mon, 18 Oct 2021 18:01:26 +0300 Subject: [PATCH] RDB Loader: drop a root folder from folders monitoring (close #602) --- .../rdbloader/dsl/FolderMonitoring.scala | 49 ++++++++++--------- .../rdbloader/dsl/FolderMonitoringSpec.scala | 8 +-- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala index 0ecfe5ccc..18f90e8f3 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoring.scala @@ -76,15 +76,14 @@ object FolderMonitoring { * (no trailing slash); keys of a wrong format won't be filtered out * @return false if folder is old enough, true otherwise */ - def isRecent(since: Option[FiniteDuration], now: Instant)(key: S3.Key): Boolean = + def isRecent(since: Option[FiniteDuration], now: Instant)(folder: S3.Folder): Boolean = since match { case Some(duration) => - Either.catchOnly[DateTimeParseException](LogTimeFormatter.parse(key.takeRight(TimePattern.size))) match { + Either.catchOnly[DateTimeParseException](LogTimeFormatter.parse(folder.stripSuffix("/").takeRight(TimePattern.size))) match { case Right(accessor) => val oldest = now.minusMillis(duration.toMillis) accessor.query(Instant.from).isAfter(oldest) case Left(_) => - // TODO: check if list returns keys without trailing slash true } case None => true @@ -96,18 +95,21 @@ object FolderMonitoring { * @param since optional duration to ignore old folders * @param input shredded archive * @param output temp staging path to store the list + * @return whether the list was non-empty (true) or empty (false) */ - def sinkFolders[F[_]: Sync: Timer: Logging: AWS](since: Option[FiniteDuration], input: S3.Folder, output: S3.Folder): F[Unit] = - Stream.eval((Ref.of(0), Timer[F].clock.instantNow).tupled).flatMap { case (ref, now) => - AWS[F].listS3(input, recursive = false) - .map(_.key) - .filter(isRecent(since, now)) - .evalTap(_ => ref.update(size => size + 1)) - .intersperse("\n") - .through(utf8Encode[F]) - .through(AWS[F].sinkS3(output.withKey("keys"), true)) - .onFinalize(ref.get.flatMap(size => Logging[F].info(s"Saved $size folders from $input in $output"))) - }.compile.drain + def sinkFolders[F[_]: Sync: Timer: Logging: AWS](since: Option[FiniteDuration], input: S3.Folder, output: S3.Folder): F[Boolean] = + Ref.of[F, Int](0).flatMap { ref => + Stream.eval(Timer[F].clock.instantNow).flatMap { now => + AWS[F].listS3(input, recursive = false) + .mapFilter(blob => if (blob.key.endsWith("/") && blob.key != input) S3.Folder.parse(blob.key).toOption else None) // listS3 returns the root dir as well + .filter(isRecent(since, now)) + .evalTap(_ => ref.update(size => size + 1)) + .intersperse("\n") + .through(utf8Encode[F]) + .through(AWS[F].sinkS3(output.withKey("keys"), true)) + .onFinalize(ref.get.flatMap(size => Logging[F].info(s"Saved $size folders from $input in $output"))) + }.compile.drain *> ref.get.map(size => size != 0) + } /** @@ -189,14 +191,17 @@ object FolderMonitoring { archive: S3.Folder): Stream[F, Unit] = Stream.eval(Ref.of(false)).flatMap { failed => getOutputKeys[F](folders).evalMap { outputFolder => - val sinkAndCheck = sinkFolders[F](folders.since, archive, outputFolder) *> - check[F](outputFolder, storage) - .rethrowT - .flatMap { alerts => - alerts.traverse_ { payload => - Monitoring[F].alert(payload) *> Logging[F].warning(s"${payload.message} ${payload.base}") - } - } *> failed.set(false) + val sinkAndCheck = + sinkFolders[F](folders.since, archive, outputFolder).ifM( + check[F](outputFolder, storage) + .rethrowT + .flatMap { alerts => + alerts.traverse_ { payload => + Monitoring[F].alert(payload) *> Logging[F].warning(s"${payload.message} ${payload.base}") + } + }, + Logging[F].info(s"No folders were found in ${archive}. Skipping manifest check") + ) *> failed.set(false) Logging[F].info("Monitoring shredded folders") *> sinkAndCheck.handleErrorWith { error => diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala index f51b11b90..0ee26daa8 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/FolderMonitoringSpec.scala @@ -102,21 +102,21 @@ class FolderMonitoringSpec extends Specification { "isRecent" should { "return true if no duration is provided" in { - val input = S3.Key.parse("s3://bucket/key").getOrElse(throw new RuntimeException("Wrong key")) + val input = S3.Folder.parse("s3://bucket/key/").getOrElse(throw new RuntimeException("Wrong key")) val result = FolderMonitoring.isRecent(None, Instant.now())(input) result must beTrue } "return true if invalid key is provided" in { val duration = FiniteDuration.apply(1, "day") - val input = S3.Key.parse("s3://bucket/key").getOrElse(throw new RuntimeException("Wrong key")) + val input = S3.Folder.parse("s3://bucket/key/").getOrElse(throw new RuntimeException("Wrong key")) val result = FolderMonitoring.isRecent(Some(duration), Instant.now())(input) result must beTrue } "return false if key is old enough" in { val duration = FiniteDuration.apply(1, "day") - val input = S3.Key.parse("s3://bucket/run=2020-09-01-00-00-00").getOrElse(throw new RuntimeException("Wrong key")) + val input = S3.Folder.parse("s3://bucket/run=2020-09-01-00-00-00/").getOrElse(throw new RuntimeException("Wrong key")) val result = FolderMonitoring.isRecent(Some(duration), Instant.now())(input) result must beFalse } @@ -124,7 +124,7 @@ class FolderMonitoringSpec extends Specification { "return true if key is fresh enough" in { val duration = FiniteDuration.apply(1, "day") val now = Instant.parse("2021-10-30T18:35:24.00Z") - val input = S3.Key.parse("s3://bucket/run=2021-10-30-00-00-00").getOrElse(throw new RuntimeException("Wrong key")) + val input = S3.Folder.parse("s3://bucket/run=2021-10-30-00-00-00/").getOrElse(throw new RuntimeException("Wrong key")) val result = FolderMonitoring.isRecent(Some(duration), now)(input) result must beTrue }