From 60d38d267e131da5dbd939c64872c122c68e87a6 Mon Sep 17 00:00:00 2001 From: Jean-Luc Deprez Date: Tue, 12 Oct 2021 16:08:05 +0200 Subject: [PATCH 01/10] make bloop compilable --- .gitignore | 4 ++++ .vscode/settings.json | 8 ++++++++ build.sbt | 4 ++-- metals.sbt | 1 + project/Publish.scala | 5 +---- project/build.properties | 2 +- project/esko.sbt | 1 + project/plugins.sbt | 8 ++++---- 8 files changed, 22 insertions(+), 11 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 metals.sbt create mode 100644 project/esko.sbt diff --git a/.gitignore b/.gitignore index 4c44cdd..9be9696 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,7 @@ project/project/target .settings/ .cache .aws + + +.bloop +.metals \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..0603b4a --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "files.watcherExclude": { + "**/target": true + }, + "metals.bloopSbtAlreadyInstalled": true, + // "editor.formatOnSave": true, + "java.server.launchMode": "LightWeight", +} \ No newline at end of file diff --git a/build.sbt b/build.sbt index 9c39f4d..7e4ba25 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ name := "akka-persistence-dynamodb" -scalaVersion := "2.11.8" -crossScalaVersions := Seq("2.11.8", "2.12.3") +scalaVersion := "2.11.12" +crossScalaVersions := Seq("2.11.12", "2.12.10") crossVersion := CrossVersion.binary val akkaVersion = "2.4.20" diff --git a/metals.sbt b/metals.sbt new file mode 100644 index 0000000..6fd9f2c --- /dev/null +++ b/metals.sbt @@ -0,0 +1 @@ +bloopExportJarClassifiers in Global := Some(Set("sources")) \ No newline at end of file diff --git a/project/Publish.scala b/project/Publish.scala index 01d0d85..a75c703 100644 --- a/project/Publish.scala +++ b/project/Publish.scala @@ -7,7 +7,6 @@ import sbt._ import sbt.Keys._ import java.io.File import sbtrelease.ReleasePlugin.autoImport.releasePublishArtifactsAction -import com.typesafe.sbt.pgp.PgpKeys object Publish extends AutoPlugin { @@ -28,9 +27,7 @@ object Publish extends AutoPlugin { homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")), publishMavenStyle := true, pomIncludeRepository := { x => false }, - defaultPublishTo := crossTarget.value / "repository", - releasePublishArtifactsAction := PgpKeys.publishSigned.value - ) + defaultPublishTo := crossTarget.value / "repository") def akkaPomExtra = { diff --git a/project/build.properties b/project/build.properties index 35c88ba..64cf32f 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.12 +sbt.version=1.1.4 diff --git a/project/esko.sbt b/project/esko.sbt new file mode 100644 index 0000000..1995ef9 --- /dev/null +++ b/project/esko.sbt @@ -0,0 +1 @@ +addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.6") \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index 88c353c..e030d11 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,10 +1,10 @@ -resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/" +//resolvers += "Typesafe repository" at "https://repo.typesafe.com/typesafe/maven-releases/" -addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.6.0") +addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.0") -addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3") +addSbtPlugin("com.github.sbt" % "sbt-release" % "1.0.15") -addSbtPlugin("com.typesafe.sbt" % "sbt-pgp" % "0.8.3") 
+addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.1.2") addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.7") From 5705670f66b4e0f6c4f97800236883b98f9cce41 Mon Sep 17 00:00:00 2001 From: Jean-Luc Deprez Date: Tue, 12 Oct 2021 20:45:52 +0200 Subject: [PATCH 02/10] disable scalariform autoformat --- .scalariform.conf | 1 + 1 file changed, 1 insertion(+) create mode 100644 .scalariform.conf diff --git a/.scalariform.conf b/.scalariform.conf new file mode 100644 index 0000000..5c109e9 --- /dev/null +++ b/.scalariform.conf @@ -0,0 +1 @@ +autoformat=false \ No newline at end of file From 2567a6b87a2316b73f3826d94f0a7e9a385b6d52 Mon Sep 17 00:00:00 2001 From: Jean-Luc Deprez Date: Tue, 12 Oct 2021 22:07:34 +0200 Subject: [PATCH 03/10] akka/akka-persistence-dynamodb#98 test that fails on issue --- .../journal/PersistAllConsistencySpec.scala | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala diff --git a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala new file mode 100644 index 0000000..29d1a79 --- /dev/null +++ b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2021 Typesafe Inc. + */ +package akka.persistence.dynamodb.journal + +import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest._ +import org.scalatest.concurrent.ScalaFutures +import akka.actor.ActorSystem +import akka.persistence._ +import akka.persistence.JournalProtocol._ +import akka.testkit._ +import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess +import com.amazonaws.services.dynamodbv2.model._ +import java.util.{ HashMap => JHMap } +import akka.persistence.dynamodb._ + +class PersistAllConsistencySpec extends TestKit(ActorSystem("PersistAllConsistencySpec")) + with ImplicitSender + with WordSpecLike + with BeforeAndAfterAll + with Matchers + with ScalaFutures + with ConversionCheckedTripleEquals + with DynamoDBUtils { + + override def beforeAll(): Unit = ensureJournalTableExists() + override def afterAll(): Unit = { + client.shutdown() + system.terminate().futureValue + } + + override val persistenceId = "PersistAllConsistencySpec" + lazy val journal = Persistence(system).journalFor("") + + import settings._ + + "DynamoDB Journal (persistAll)" must { + + "recover correctly if the first write is a batch" in { + journal ! Purge(persistenceId, testActor) + expectMsg(Purged(persistenceId)) + + val start = nextSeqNr + val end = 10 + println(s"start: ${start}; end: ${end}") + val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! WriteMessages(padding, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start to end) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + + journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor) + (start to end) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i)))) + expectMsg(RecoverySuccess(end)) + } + + for (t <- Seq(("last", 3), ("middle", 2), ("first", 1))) s"correctly cross page boundaries with AtomicWrite position ${t._1}" in { + val start1 = nextSeqNr + val end1 = ((start1 / 100) + 1) * 100 - t._2 + println(s"start: ${start1}; end: ${end1}") + val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! 
WriteMessages(padding, testActor, 1)
+      expectMsg(WriteMessagesSuccessful)
+      (start1 to end1) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
+
+      val start2 = nextSeqNr
+      val end2 = start2 + 2
+      println(s"start: ${start2}; end: ${end2}")
+      val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil
+
+      journal ! WriteMessages(subject, testActor, 1)
+      expectMsg(WriteMessagesSuccessful)
+      (start2 to end2) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
+
+      journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
+      (start1 to end2) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i))))
+      expectMsg(RecoverySuccess(end2))
+    }
+
+  }
+
+}

From 0fd638335d2e265d8cebedb0c1fd1ed0cc25f751 Mon Sep 17 00:00:00 2001
From: Jean-Luc Deprez
Date: Tue, 12 Oct 2021 17:02:23 +0200
Subject: [PATCH 04/10] akka/akka-persistence-dynamodb#98 minimalistic fix

---
 .../persistence/dynamodb/journal/DynamoDBJournalRequests.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
index b91e531..5bb477a 100644
--- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
+++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
@@ -85,7 +85,7 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
         item.put(AtomIndex, N(index))
         item.put(AtomEnd, size)
         putReq(item)
-      } ++ (if (low / 100 != high / 100) Some(putReq(toHSItem(id, high))) else None)
+      } ++ (if ((low - 1) / 100 != high / 100) Some(putReq(toHSItem(id, high))) else None)

     val futures = writes.grouped(MaxBatchWrite).map { batch =>

From 9bb07fa6d4671a9aea30f387add3c96dfe9ce0e2 Mon Sep 17 00:00:00 2001
From: Jean-Luc Deprez
Date: Tue, 12 Oct 2021 20:57:33 +0200
Subject: [PATCH 05/10] akka/akka-persistence-dynamodb#98 tail chasing fix

---
 src/main/resources/reference.conf           | 22 ++++++++
 .../journal/DynamoDBJournalConfig.scala     |  5 ++
 .../dynamodb/journal/DynamoDBRecovery.scala | 52 ++++++++++++++++---
 src/test/resources/application.conf         |  1 +
 4 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 72125b0..5c87219 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -120,6 +120,28 @@ dynamodb-journal {
       parallelism-max = 8
     }
   }
+
+  fixes {
+    # Bug akka/akka-persistence-dynamodb#98 can cause an event source that is written successfully
+    # and is valid, but where the High Sequence number marker is missing. This causes incomplete
+    # replay of the event source. A root cause fix prevents this from occurring, but does not
+    # repair existing event sources. Writing new events to incompletely replayed persistent actors
+    # will corrupt the event source. If the write aligns with an atomic write this will be hidden.
+    # This fix cannot recover corrupted event sources, but it attempts to avoid corruption from
+    # taking place where this hasn't happened yet. It does so by not fully trusting the high mark.
+    # If the last event in a partition is 99, it will attempt to chase the tail of the event
+    # source. This guards valid event sources with hidden tails from incomplete replay.
+    # For event sources not suffering from this problem there's a 1% chance that this leads to a
+    # useless query.
+    # This is a performance/consistency tradeoff to be made. It should not be required for newly
+    # created event sources that have the root cause fix, hence it is off by default.
+    # NOTE: With the current implementation of the high mark being written after the events,
+    # there is a slim chance that a valid event source is written, but a network error occurs
+    # before the high mark is written. In this case the write would be reported as failed to the
+    # writing party. Whether you want to "keep the event source" is up for discussion. But this fix
+    # would also recover from that situation.
+    high-distrust = false
+  }
 }

 dynamodb-snapshot-store {

diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala
index 0b580ad..7d78858 100644
--- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala
+++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala
@@ -25,6 +25,10 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
   val MaxBatchWrite = c getInt "aws-api-limits.max-batch-write"
   val MaxItemSize = c getInt "aws-api-limits.max-item-size"

+  object Fixes {
+    val HighDistrust = c getBoolean "fixes.high-distrust"
+  }
+
   val client = new DynamoDBClientConfig(c)

   override def toString: String = "DynamoDBJournalConfig(" +
     "JournalTable:" + JournalTable +
@@ -39,5 +43,6 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
     ",MaxBatchGet:" + MaxBatchGet +
     ",MaxBatchWrite:" + MaxBatchWrite +
     ",MaxItemSize:" + MaxItemSize +
+    ",Fixes.HighDistrust:" + Fixes.HighDistrust +
     ",client.config:" + client
 }

diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
index 022d546..be915dc 100644
--- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
+++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
@@ -170,9 +170,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
    * for which it was written, all other entries do not update the highest value. Therefore we
    * must scan the partition of this Sort=0 entry and find the highest occupied number.
    */
-    val request = eventQuery(persistenceId, start)
-    dynamo.query(request)
-      .flatMap(getRemainingQueryItems(request, _))
+    makeEventQuery(persistenceId, start)
       .flatMap { result =>
         if (result.getItems.isEmpty) {
           /*
@@ -185,6 +183,40 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
             log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
             ret
           }
+        } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
+          // this function will keep on chasing the event source tail
+          // if HighDistrust is enabled and as long as the partitionMax == 99
+          def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
+            if (nextResults.getItems.isEmpty) {
+              // the first iteration will not pass here, as the query result is not empty
+              // if the new query result is empty the highest observed is partitionStart - 1
+              Future.successful(partitionStart - 1)
+            } else {
+              /*
+               * `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
+               */
+              val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
+              val ret = partitionStart + partitionMax
+
+              if (partitionMax == 99) {
+                val nextStart = ret + 1
+                makeEventQuery(persistenceId, nextStart)
+                  .map { logResult =>
+                    if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
+                      log.warning("readSequenceNr(highest=true persistenceId={}) tail found after {}", persistenceId, ret)
+                    logResult
+                  }
+                  .flatMap(tailChase(nextStart, _))
+              } else
+                Future.successful(ret)
+            }
+          }
+
+          tailChase(start, result)
+            .map { ret =>
+              log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
+              ret
+            }
         } else {
           /*
            * `start` is the Sort=0 entry’s sequence number, so add the maximum sort key.
           */
          val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max
@@ -276,15 +308,17 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
     }
   }

-  def getRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
+  private[dynamodb] def getAllRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
     val last = result.getLastEvaluatedKey
     if (last == null || last.isEmpty || last.get(Sort).getN.toLong == 99) Future.successful(result)
     else {
-      dynamo.query(request.withExclusiveStartKey(last)).map { next =>
+      dynamo.query(request.withExclusiveStartKey(last)).flatMap { next =>
         val merged = new ArrayList[Item](result.getItems.size + next.getItems.size)
         merged.addAll(result.getItems)
         merged.addAll(next.getItems)
-        next.withItems(merged)
+
+        // need to keep on reading until there's nothing more to read
+        getAllRemainingQueryItems(request, next.withItems(merged))
       }
     }
   }
@@ -297,6 +331,12 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
       .withProjectionExpression("num")
       .withConsistentRead(true)

+  private[dynamodb] def makeEventQuery(persistenceId: String, sequenceNr: Long) = {
+    val request = eventQuery(persistenceId, sequenceNr)
+    dynamo.query(request)
+      .flatMap(getAllRemainingQueryItems(request, _))
+  }
+
   def batchGetReq(items: JMap[String, KeysAndAttributes]) =
     new BatchGetItemRequest()
       .withRequestItems(items)

diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index c18d55a..f081446 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -6,6 +6,7 @@ my-dynamodb-journal {
   aws-access-key-id = "set something in case no real creds are there"
   aws-secret-access-key = "set something in case no real creds are there"
   tracing = off
+  fixes.high-distrust = true
 }

 my-dynamodb-snapshot-store = ${dynamodb-snapshot-store}

From a9e6d077ad39c20c3a17453933bbae1c786ff110 Mon Sep 17 00:00:00 2001
From: Jean-Luc Deprez
Date: Wed, 13 Oct 2021 16:52:28 +0200
Subject: [PATCH 06/10] akka/akka-persistence-dynamodb#98 additional test that
 verifies pages ending in 99 still work

---
 .../journal/PersistAllConsistencySpec.scala | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala
index 29d1a79..1f84ff0 100644
--- a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala
+++ b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala
@@ -79,6 +79,21 @@ class PersistAllConsistencySpec extends TestKit(ActorSystem("PersistAllConsisten
       expectMsg(RecoverySuccess(end2))
     }

+    "recover correctly when the last partition event ends on 99" in {
+      
val start = nextSeqNr + val end = ((start / 100) + 1) * 100 - 1 + println(s"start: ${start}; end: ${end}") + val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! WriteMessages(padding, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start to end) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + + journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor) + (start to end) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i)))) + expectMsg(RecoverySuccess(end)) + } + } } From a1bf40b98f89acbedce8b9729dead58bebb66dd4 Mon Sep 17 00:00:00 2001 From: Jean-Luc Deprez Date: Fri, 15 Oct 2021 11:38:04 +0200 Subject: [PATCH 07/10] adapt to PR remarks iteration 1 - new line - remove 3 files --- .gitignore | 2 +- .scalariform.conf | 1 - .vscode/settings.json | 8 -------- project/esko.sbt | 1 - 4 files changed, 1 insertion(+), 11 deletions(-) delete mode 100644 .scalariform.conf delete mode 100644 .vscode/settings.json delete mode 100644 project/esko.sbt diff --git a/.gitignore b/.gitignore index 9be9696..bcba1aa 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,4 @@ project/project/target .bloop -.metals \ No newline at end of file +.metals diff --git a/.scalariform.conf b/.scalariform.conf deleted file mode 100644 index 5c109e9..0000000 --- a/.scalariform.conf +++ /dev/null @@ -1 +0,0 @@ -autoformat=false \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 0603b4a..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "files.watcherExclude": { - "**/target": true - }, - "metals.bloopSbtAlreadyInstalled": true, - // "editor.formatOnSave": true, - "java.server.launchMode": "LightWeight", -} \ No newline at end of file diff --git a/project/esko.sbt b/project/esko.sbt deleted file mode 100644 index 1995ef9..0000000 --- a/project/esko.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.6") \ No newline at end of file From 748208bff5f0965379348f7caa5e8477ddd97b3e Mon Sep 17 00:00:00 2001 From: Jean-Luc Deprez Date: Wed, 9 Feb 2022 11:57:51 +0100 Subject: [PATCH 08/10] retrigger checks From 2d30d817e0125889ce62ca5a874093e4895c5175 Mon Sep 17 00:00:00 2001 From: Corey O'Connor Date: Wed, 16 Feb 2022 16:08:11 -0800 Subject: [PATCH 09/10] Delete metals.sbt --- metals.sbt | 1 - 1 file changed, 1 deletion(-) delete mode 100644 metals.sbt diff --git a/metals.sbt b/metals.sbt deleted file mode 100644 index 6fd9f2c..0000000 --- a/metals.sbt +++ /dev/null @@ -1 +0,0 @@ -bloopExportJarClassifiers in Global := Some(Set("sources")) \ No newline at end of file From c659dfb5f32c8d1e6246e6c6bb87f047f426b231 Mon Sep 17 00:00:00 2001 From: Corey O'Connor Date: Wed, 16 Feb 2022 17:31:25 -0800 Subject: [PATCH 10/10] Fixup merge --- project/Publish.scala | 18 +-- project/Whitesource.scala | 18 +-- .../journal/DynamoDBJournalConfig.scala | 2 +- .../dynamodb/journal/DynamoDBRecovery.scala | 106 +++++++++--------- .../dynamodb/journal/DeletionSpec.scala | 2 +- .../journal/DynamoPartitionGroupedSpec.scala | 2 + .../journal/PersistAllConsistencySpec.scala | 68 ++++++----- .../journal/RecoveryConsistencySpec.scala | 9 +- 8 files changed, 118 insertions(+), 107 deletions(-) diff --git a/project/Publish.scala b/project/Publish.scala index 60709de..9df2352 100644 --- a/project/Publish.scala +++ b/project/Publish.scala @@ -12,7 +12,7 @@ object Publish extends AutoPlugin { val defaultPublishTo = 
settingKey[File]("Default publish directory") - override def trigger = allRequirements + override def trigger = allRequirements override def requires = sbtrelease.ReleasePlugin override lazy val projectSettings = Seq( @@ -27,8 +27,7 @@ object Publish extends AutoPlugin { homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")), publishMavenStyle := true, pomIncludeRepository := { x => false }, - defaultPublishTo := crossTarget.value / "repository", - ) + defaultPublishTo := crossTarget.value / "repository") def akkaPomExtra = { @@ -41,15 +40,16 @@ object Publish extends AutoPlugin { } - private def akkaPublishTo = Def.setting { - sonatypeRepo(version.value) orElse localRepo(defaultPublishTo.value) - } + private def akkaPublishTo = + Def.setting { + sonatypeRepo(version.value).orElse(localRepo(defaultPublishTo.value)) + } private def sonatypeRepo(version: String): Option[Resolver] = - Option(sys.props("publish.maven.central")) filter (_.toLowerCase == "true") map { _ => + Option(sys.props("publish.maven.central")).filter(_.toLowerCase == "true").map { _ => val nexus = "https://oss.sonatype.org/" - if (version endsWith "-SNAPSHOT") "snapshots" at nexus + "content/repositories/snapshots" - else "releases" at nexus + "service/local/staging/deploy/maven2" + if (version.endsWith("-SNAPSHOT")) "snapshots".at(nexus + "content/repositories/snapshots") + else "releases".at(nexus + "service/local/staging/deploy/maven2") } private def localRepo(repository: File) = diff --git a/project/Whitesource.scala b/project/Whitesource.scala index a88841a..5aac5e6 100644 --- a/project/Whitesource.scala +++ b/project/Whitesource.scala @@ -14,15 +14,15 @@ object Whitesource extends AutoPlugin { whitesourceProduct := "Lightbend Reactive Platform", whitesourceAggregateProjectName := { val projectName = (moduleName in LocalRootProject).value.replace("-root", "") - projectName + "-" + ( - if (isSnapshot.value) - if (gitCurrentBranch.value == "master") "master" - else "adhoc" - else CrossVersion.partialVersion((version in LocalRootProject).value) - .map { case (major,minor) => s"$major.$minor-stable" } - .getOrElse("adhoc")) + projectName + "-" + (if (isSnapshot.value) + if (gitCurrentBranch.value == "master") "master" + else "adhoc" + else + CrossVersion + .partialVersion((version in LocalRootProject).value) + .map { case (major, minor) => s"$major.$minor-stable" } + .getOrElse("adhoc")) }, whitesourceForceCheckAllDependencies := true, - whitesourceFailOnError := true - ) + whitesourceFailOnError := true) } diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala index 2759578..51deb5f 100644 --- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala +++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala @@ -26,7 +26,7 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig { val MaxItemSize = c.getInt("aws-api-limits.max-item-size") object Fixes { - val HighDistrust = c getBoolean "fixes.high-distrust" + val HighDistrust = c.getBoolean("fixes.high-distrust") } val client = new DynamoDBClientConfig(c) diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala index 5c7f35c..43b58b6 100644 --- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala +++ 
b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
@@ -272,62 +272,63 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
    * for which it was written, all other entries do not update the highest value. Therefore we
    * must scan the partition of this Sort=0 entry and find the highest occupied number.
    */
-    getAllPartitionSequenceNrs(persistenceId, start)
-      .flatMap { result =>
-        if (result.getItems.isEmpty) {
-          /*
-           * If this comes back empty then that means that all events have been deleted. The only
-           * reliable way to obtain the previously highest number is to also read the lowest number
-           * (which is always stored in full), knowing that it will be either highest-1 or zero.
-           */
-          readSequenceNr(persistenceId, highest = false).map { lowest =>
-            val ret = Math.max(start, lowest - 1)
-            log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
-            ret
-          }
-        } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
-          // this function will keep on chasing the event source tail
-          // if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
-          def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
-            if (nextResults.getItems.isEmpty) {
-              // the first iteration will not pass here, as the query result is not empty
-              // if the new query result is empty the highest observed is partitionStart - 1
-              Future.successful(partitionStart - 1)
-            } else {
-              /*
-               * `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
-               */
-              val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
-              val ret = partitionStart + partitionMax
-
-              if (partitionMax == PartitionSize - 1) {
-                val nextStart = ret + 1
-                makeEventQuery(persistenceId, nextStart)
-                  .map { logResult =>
-                    if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
-                      log.warning("readSequenceNr(highest=true persistenceId={}) tail found after {}", persistenceId, ret)
-                    logResult
-                  }
-                  .flatMap(tailChase(nextStart, _))
-              } else
-                Future.successful(ret)
-            }
+    getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
+      if (result.getItems.isEmpty) {
+        /*
+         * If this comes back empty then that means that all events have been deleted. The only
+         * reliable way to obtain the previously highest number is to also read the lowest number
+         * (which is always stored in full), knowing that it will be either highest-1 or zero.
+         */
+        readSequenceNr(persistenceId, highest = false).map { lowest =>
+          val ret = Math.max(start, lowest - 1)
+          log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
+          ret
+        }
+      } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
+        // this function will keep on chasing the event source tail
+        // if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
+        def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
+          if (nextResults.getItems.isEmpty) {
+            // the first iteration will not pass here, as the query result is not empty
+            // if the new query result is empty the highest observed is partitionStart - 1
+            Future.successful(partitionStart - 1)
+          } else {
+            /*
+             * `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
+ */ + val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max + val ret = partitionStart + partitionMax + + if (partitionMax == PartitionSize - 1) { + val nextStart = ret + 1 + getAllPartitionSequenceNrs(persistenceId, nextStart) + .map { logResult => + if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results + log.warning( + "readSequenceNr(highest=true persistenceId={}) tail found after {}", + persistenceId, + ret) + logResult + } + .flatMap(tailChase(nextStart, _)) + } else + Future.successful(ret) } + } - tailChase(start, result) - .map { ret => - log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret) - ret - } - } else { - /* - * `start` is the Sort=0 entry’s sequence number, so add the maximum sort key. - */ - val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max + tailChase(start, result).map { ret => log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret) - Future.successful(ret) + ret } + } else { + /* + * `start` is the Sort=0 entry’s sequence number, so add the maximum sort key. + */ + val ret = start + result.getItems.asScala.map(_.get(Sort).getN.toLong).max + log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret) + Future.successful(ret) } + } } else { log.debug("readSequenceNr(highest=false persistenceId={}) = {}", persistenceId, start) Future.successful(start) @@ -498,8 +499,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal => private[dynamodb] def getAllPartitionSequenceNrs(persistenceId: String, sequenceNr: Long) = { val request = eventQuery(persistenceId, sequenceNr) - dynamo.query(request) - .flatMap(getAllRemainingQueryItems(request, _)) + dynamo.query(request).flatMap(getAllRemainingQueryItems(request, _)) } def batchGetReq(items: JMap[String, KeysAndAttributes]) = diff --git a/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala index 183c0b4..f476347 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala @@ -36,8 +36,8 @@ class DeletionSpec * noisy logging, and I like my build output clean and green. 
*/ Thread.sleep(500) - system.terminate().futureValue client.shutdown() + system.terminate().futureValue super.afterAll() } diff --git a/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala index b01ee19..38f3f2a 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala @@ -10,6 +10,8 @@ import org.scalatest.WordSpecLike class DynamoPartitionGroupedSpec extends TestKit(ActorSystem("DynamoPartitionGroupedSpec")) with WordSpecLike { implicit val materializer = ActorMaterializer() + assert(PartitionSize == 100, "This test is only valid with PartitionSize == 100") + "A DynamoPartitionGroup should create the correct PartitionKey outputs" when { "events 1 thru 250 are presented" in { val sourceUnderTest = diff --git a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala index 1f84ff0..93d6c7b 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala @@ -15,23 +15,30 @@ import com.amazonaws.services.dynamodbv2.model._ import java.util.{ HashMap => JHMap } import akka.persistence.dynamodb._ -class PersistAllConsistencySpec extends TestKit(ActorSystem("PersistAllConsistencySpec")) +class PersistAllConsistencySpec + extends TestKit(ActorSystem("PersistAllConsistencySpec")) with ImplicitSender with WordSpecLike with BeforeAndAfterAll with Matchers with ScalaFutures with ConversionCheckedTripleEquals - with DynamoDBUtils { + with DynamoDBUtils + with IntegSpec { + + override def beforeAll(): Unit = { + super.beforeAll() + ensureJournalTableExists() + } - override def beforeAll(): Unit = ensureJournalTableExists() override def afterAll(): Unit = { client.shutdown() system.terminate().futureValue + super.afterAll() } override val persistenceId = "PersistAllConsistencySpec" - lazy val journal = Persistence(system).journalFor("") + lazy val journal = Persistence(system).journalFor("") import settings._ @@ -42,55 +49,56 @@ class PersistAllConsistencySpec extends TestKit(ActorSystem("PersistAllConsisten expectMsg(Purged(persistenceId)) val start = nextSeqNr - val end = 10 + val end = 10 println(s"start: ${start}; end: ${end}") val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil journal ! WriteMessages(padding, testActor, 1) expectMsg(WriteMessagesSuccessful) - (start to end) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) journal ! 
ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor) - (start to end) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i)))) + (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i)))) expectMsg(RecoverySuccess(end)) } - for (t <- Seq(("last", 3), ("middle", 2), ("first", 1))) s"correctly cross page boundaries with AtomicWrite position ${t._1}" in { - val start1 = nextSeqNr - val end1 = ((start1 / 100) + 1) * 100 - t._2 - println(s"start: ${start1}; end: ${end1}") - val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil + for (t <- Seq(("last", 3), ("middle", 2), ("first", 1))) + s"correctly cross page boundaries with AtomicWrite position ${t._1}" in { + val start1 = nextSeqNr + val end1 = ((start1 / PartitionSize) + 1) * PartitionSize - t._2 + println(s"start: ${start1}; end: ${end1}") + val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil - journal ! WriteMessages(padding, testActor, 1) - expectMsg(WriteMessagesSuccessful) - (start1 to end1) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + journal ! WriteMessages(padding, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start1 to end1).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) - val start2 = nextSeqNr - val end2 = start2 + 2 - println(s"start: ${start2}; end: ${end2}") - val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil + val start2 = nextSeqNr + val end2 = start2 + 2 + println(s"start: ${start2}; end: ${end2}") + val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil - journal ! WriteMessages(subject, testActor, 1) - expectMsg(WriteMessagesSuccessful) - (start2 to end2) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + journal ! WriteMessages(subject, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start2 to end2).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) - journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor) - (start1 to end2) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i)))) - expectMsg(RecoverySuccess(end2)) - } + journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor) + (start1 to end2).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i)))) + expectMsg(RecoverySuccess(end2)) + } - "recover correctly when the last partition event ends on 99" in { + s"recover correctly when the last partition event ends on ${PartitionSize - 1}" in { val start = nextSeqNr - val end = ((start / 100) + 1) * 100 - 1 + val end = ((start / PartitionSize) + 1) * PartitionSize - 1 println(s"start: ${start}; end: ${end}") val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil journal ! WriteMessages(padding, testActor, 1) expectMsg(WriteMessagesSuccessful) - (start to end) foreach (i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) journal ! 
ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor) - (start to end) foreach (i => expectMsg(ReplayedMessage(generatedMessages(i)))) + (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i)))) expectMsg(RecoverySuccess(end)) } diff --git a/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala index ac4dd15..4b21d91 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala @@ -88,7 +88,7 @@ class RecoveryConsistencySpec "read correct highest sequence number even if a Sort=0 entry is lost" in { val start = messages + 19 - val end = (start / 100 + 1) * 100 + val end = (start / PartitionSize + 1) * PartitionSize val more = (start to end).map(i => AtomicWrite(persistentRepr(f"e-$i"))) journal ! WriteMessages(more, testActor, 1) expectMsg(WriteMessagesSuccessful) @@ -98,7 +98,8 @@ class RecoveryConsistencySpec journal ! ListAll(persistenceId, testActor) val ids = ((1L to (end - 1)).toSet -- Set[Long](2, 4, 12, 15).map(_ + messages)).toSeq.sorted - expectMsg(ListAllResult(persistenceId, Set.empty, (1L to (end / 100)).map(_ * 100).toSet, ids)) + expectMsg( + ListAllResult(persistenceId, Set.empty, (1L to (end / PartitionSize)).map(_ * PartitionSize).toSet, ids)) journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor) expectMsg(RecoverySuccess(end)) @@ -157,8 +158,8 @@ class RecoveryConsistencySpec private def delete(num: Long) = { val key: Item = new JHMap - key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}")) - key.put(Sort, N(num % 100)) + key.put(Key, S(s"$JournalName-P-$persistenceId-${num / PartitionSize}")) + key.put(Sort, N(num % PartitionSize)) client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue } }
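
---

For readers who want the recovery rule from patches 05 and 10 outside the diff context, below is a minimal, self-contained Scala sketch of the "high-distrust" tail chase. It reduces the journal to a map from partition number to occupied sort keys (0 to PartitionSize - 1), so the per-partition DynamoDB Query becomes a map lookup; the 0-based sequence numbering is a simplification, and names such as highestSeqNr and occupied are illustrative, not taken from the plugin.

// Sketch of the tail chase: a journal partition holds sort keys
// 0 .. PartitionSize-1, and a partition whose highest occupied sort key is
// PartitionSize-1 may have a continuation, so the next partition must be
// queried before the highest sequence number is trusted.
object TailChaseSketch {
  val PartitionSize = 100L

  // occupied(p) = sort keys present in partition p (stand-in for a DynamoDB Query)
  def highestSeqNr(occupied: Map[Long, Set[Long]], startSeqNr: Long): Long = {
    def chase(partition: Long): Long =
      occupied.get(partition).filter(_.nonEmpty) match {
        case None =>
          // empty partition: the highest event lives in the previous partition
          partition * PartitionSize - 1
        case Some(sorts) =>
          val max = sorts.max
          if (max == PartitionSize - 1) chase(partition + 1) // full partition: distrust the high mark
          else partition * PartitionSize + max
      }
    math.max(startSeqNr - 1, chase(startSeqNr / PartitionSize))
  }

  def main(args: Array[String]): Unit = {
    // 150 events written (seqNrs 0-149) but the high mark after partition 0 was lost:
    val hiddenTail = Map(0L -> (0L until 100L).toSet, 1L -> (0L until 50L).toSet)
    assert(highestSeqNr(hiddenTail, startSeqNr = 0L) == 149L)

    // the event source ends exactly on a partition boundary; the extra query finds nothing:
    val aligned = Map(0L -> (0L until 100L).toSet)
    assert(highestSeqNr(aligned, startSeqNr = 0L) == 99L)

    println("tail chase sketch: ok")
  }
}

The recursion mirrors tailChase in DynamoDBRecovery.scala: each full trailing partition costs one extra query round-trip, which is the roughly 1%-of-reads overhead the reference.conf comment accepts in exchange for not trusting a possibly missing high mark.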