Skip to content

Commit

Permalink
RDB Loader: drop a root folder from folders monitoring (close #602)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Oct 20, 2021
1 parent f8a7581 commit 9099253
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ object FolderMonitoring {
* (a trailing slash, if present, is ignored); keys of an unexpected format are never filtered out
* @return false if the folder is older than `since`, true otherwise
*/
def isRecent(since: Option[FiniteDuration], now: Instant)(key: S3.Key): Boolean =
def isRecent(since: Option[FiniteDuration], now: Instant)(folder: S3.Folder): Boolean =
since match {
case Some(duration) =>
Either.catchOnly[DateTimeParseException](LogTimeFormatter.parse(key.takeRight(TimePattern.size))) match {
Either.catchOnly[DateTimeParseException](LogTimeFormatter.parse(folder.stripSuffix("/").takeRight(TimePattern.size))) match {
case Right(accessor) =>
val oldest = now.minusMillis(duration.toMillis)
accessor.query(Instant.from).isAfter(oldest)
case Left(_) =>
// TODO: check if list returns keys without trailing slash
true
}
case None => true
Expand All @@ -96,18 +95,21 @@ object FolderMonitoring {
* @param since optional duration to ignore old folders
* @param input shredded archive
* @param output temp staging path to store the list
* @return whether the list was non-empty (true) or empty (false)
*/
def sinkFolders[F[_]: Sync: Timer: Logging: AWS](since: Option[FiniteDuration], input: S3.Folder, output: S3.Folder): F[Unit] = {
  // Object under the staging folder that receives the newline-separated list
  val destination = output.withKey("keys")

  Stream
    .eval((Ref.of(0), Timer[F].clock.instantNow).tupled)
    .flatMap { case (counter, now) =>
      // Reports how many keys passed the recency filter once the stream terminates
      val logSaved = counter.get.flatMap { saved =>
        Logging[F].info(s"Saved $saved folders from $input in $output")
      }

      AWS[F]
        .listS3(input, recursive = false)
        .map(blob => blob.key)
        .filter(isRecent(since, now))
        .evalTap(_ => counter.update(_ + 1)) // count every key that is about to be written
        .intersperse("\n")
        .through(utf8Encode[F])
        .through(AWS[F].sinkS3(destination, true))
        .onFinalize(logSaved)
    }
    .compile
    .drain
}
def sinkFolders[F[_]: Sync: Timer: Logging: AWS](since: Option[FiniteDuration], input: S3.Folder, output: S3.Folder): F[Boolean] =
  for {
    // Counts the folders that pass the filters below
    counter <- Ref.of[F, Int](0)
    _ <- Stream
      .eval(Timer[F].clock.instantNow)
      .flatMap { now =>
        AWS[F]
          .listS3(input, recursive = false)
          .mapFilter { blob =>
            // listS3 returns the root dir as well, so keep only keys that look
            // like folders (trailing slash) and differ from the input itself
            Some(blob.key)
              .filter(key => key.endsWith("/") && key != input)
              .flatMap(key => S3.Folder.parse(key).toOption)
          }
          .filter(isRecent(since, now))
          .evalTap(_ => counter.update(_ + 1))
          .intersperse("\n")
          .through(utf8Encode[F])
          .through(AWS[F].sinkS3(output.withKey("keys"), true))
          .onFinalize {
            // Reports the final count once the stream terminates
            counter.get.flatMap { saved =>
              Logging[F].info(s"Saved $saved folders from $input in $output")
            }
          }
      }
      .compile
      .drain
    saved <- counter.get
  } yield saved != 0 // true when at least one folder was listed


/**
Expand Down Expand Up @@ -189,14 +191,17 @@ object FolderMonitoring {
archive: S3.Folder): Stream[F, Unit] =
Stream.eval(Ref.of(false)).flatMap { failed =>
getOutputKeys[F](folders).evalMap { outputFolder =>
val sinkAndCheck = sinkFolders[F](folders.since, archive, outputFolder) *>
check[F](outputFolder, storage)
.rethrowT
.flatMap { alerts =>
alerts.traverse_ { payload =>
Monitoring[F].alert(payload) *> Logging[F].warning(s"${payload.message} ${payload.base}")
}
} *> failed.set(false)
val sinkAndCheck =
sinkFolders[F](folders.since, archive, outputFolder).ifM(
check[F](outputFolder, storage)
.rethrowT
.flatMap { alerts =>
alerts.traverse_ { payload =>
Monitoring[F].alert(payload) *> Logging[F].warning(s"${payload.message} ${payload.base}")
}
},
Logging[F].info(s"No folders were found in ${archive}. Skipping manifest check")
) *> failed.set(false)

Logging[F].info("Monitoring shredded folders") *>
sinkAndCheck.handleErrorWith { error =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,29 @@ class FolderMonitoringSpec extends Specification {

"isRecent" should {
"return true if no duration is provided" in {
val input = S3.Key.parse("s3://bucket/key").getOrElse(throw new RuntimeException("Wrong key"))
val input = S3.Folder.parse("s3://bucket/key/").getOrElse(throw new RuntimeException("Wrong key"))
val result = FolderMonitoring.isRecent(None, Instant.now())(input)
result must beTrue
}

"return true if invalid key is provided" in {
val duration = FiniteDuration.apply(1, "day")
val input = S3.Key.parse("s3://bucket/key").getOrElse(throw new RuntimeException("Wrong key"))
val input = S3.Folder.parse("s3://bucket/key/").getOrElse(throw new RuntimeException("Wrong key"))
val result = FolderMonitoring.isRecent(Some(duration), Instant.now())(input)
result must beTrue
}

"return false if key is old enough" in {
val duration = FiniteDuration.apply(1, "day")
val input = S3.Key.parse("s3://bucket/run=2020-09-01-00-00-00").getOrElse(throw new RuntimeException("Wrong key"))
val input = S3.Folder.parse("s3://bucket/run=2020-09-01-00-00-00/").getOrElse(throw new RuntimeException("Wrong key"))
val result = FolderMonitoring.isRecent(Some(duration), Instant.now())(input)
result must beFalse
}

"return true if key is fresh enough" in {
val duration = FiniteDuration.apply(1, "day")
val now = Instant.parse("2021-10-30T18:35:24.00Z")
val input = S3.Key.parse("s3://bucket/run=2021-10-30-00-00-00").getOrElse(throw new RuntimeException("Wrong key"))
val input = S3.Folder.parse("s3://bucket/run=2021-10-30-00-00-00/").getOrElse(throw new RuntimeException("Wrong key"))
val result = FolderMonitoring.isRecent(Some(duration), now)(input)
result must beTrue
}
Expand Down

0 comments on commit 9099253

Please sign in to comment.