diff --git a/.gitignore b/.gitignore index 4c44cdd..bcba1aa 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,7 @@ project/project/target .settings/ .cache .aws + + +.bloop +.metals diff --git a/project/Publish.scala b/project/Publish.scala index 60709de..9df2352 100644 --- a/project/Publish.scala +++ b/project/Publish.scala @@ -12,7 +12,7 @@ object Publish extends AutoPlugin { val defaultPublishTo = settingKey[File]("Default publish directory") - override def trigger = allRequirements + override def trigger = allRequirements override def requires = sbtrelease.ReleasePlugin override lazy val projectSettings = Seq( @@ -27,8 +27,7 @@ object Publish extends AutoPlugin { homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")), publishMavenStyle := true, pomIncludeRepository := { x => false }, - defaultPublishTo := crossTarget.value / "repository", - ) + defaultPublishTo := crossTarget.value / "repository") def akkaPomExtra = { @@ -41,15 +40,16 @@ object Publish extends AutoPlugin { } - private def akkaPublishTo = Def.setting { - sonatypeRepo(version.value) orElse localRepo(defaultPublishTo.value) - } + private def akkaPublishTo = + Def.setting { + sonatypeRepo(version.value).orElse(localRepo(defaultPublishTo.value)) + } private def sonatypeRepo(version: String): Option[Resolver] = - Option(sys.props("publish.maven.central")) filter (_.toLowerCase == "true") map { _ => + Option(sys.props("publish.maven.central")).filter(_.toLowerCase == "true").map { _ => val nexus = "https://oss.sonatype.org/" - if (version endsWith "-SNAPSHOT") "snapshots" at nexus + "content/repositories/snapshots" - else "releases" at nexus + "service/local/staging/deploy/maven2" + if (version.endsWith("-SNAPSHOT")) "snapshots".at(nexus + "content/repositories/snapshots") + else "releases".at(nexus + "service/local/staging/deploy/maven2") } private def localRepo(repository: File) = diff --git a/project/Whitesource.scala b/project/Whitesource.scala index a88841a..5aac5e6 100644 --- a/project/Whitesource.scala +++ b/project/Whitesource.scala @@ -14,15 +14,15 @@ object Whitesource extends AutoPlugin { whitesourceProduct := "Lightbend Reactive Platform", whitesourceAggregateProjectName := { val projectName = (moduleName in LocalRootProject).value.replace("-root", "") - projectName + "-" + ( - if (isSnapshot.value) - if (gitCurrentBranch.value == "master") "master" - else "adhoc" - else CrossVersion.partialVersion((version in LocalRootProject).value) - .map { case (major,minor) => s"$major.$minor-stable" } - .getOrElse("adhoc")) + projectName + "-" + (if (isSnapshot.value) + if (gitCurrentBranch.value == "master") "master" + else "adhoc" + else + CrossVersion + .partialVersion((version in LocalRootProject).value) + .map { case (major, minor) => s"$major.$minor-stable" } + .getOrElse("adhoc")) }, whitesourceForceCheckAllDependencies := true, - whitesourceFailOnError := true - ) + whitesourceFailOnError := true) } diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 72125b0..5c87219 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -120,6 +120,28 @@ dynamodb-journal { parallelism-max = 8 } } + + fixes { + # Bug akka/akka-persistence-dynamodb#98 can cause an event source which is written successfully + # and is valid, but where the High Sequence number marker is missing. This causes incomplete + # replay of the event source. A root cause fix prevents this from occurring, but does not + # repair existing event sources. Writing new events to incompletely replayed persistent actors + # will corrupt the event source. If write aligns with an atomic write this will be hidden. + # This fix cannot recover corrupted event sources, but it attempts to avoid corruption from + # taking place where this hasn't happened yet. It does so by not fully trusting the high mark. + # If the last event in a partition is 99, it will attempt to chase the tail of the event + # source. This guards valid event sources with hidden tails from incomplete replay. + # For event sources not suffering from this problem there's 1% chance that this leads to a + # useless query. + # This is a performance/consistency tradeoff to be made. It should not be required for newly + # created event sources that have the root cause fix, hence it is off by default. + # NOTE: With current implementation of the high mark being written after the events, + # there is a slim chance that a valid event source is written, but a network error occurs + # before the high mark is written. In this case the write would be reported as failed to the + # writing party. Whether you want to "keep the event source" is up for discussion. But this fix + # would also recover from that situation. + high-distrust = false + } } dynamodb-snapshot-store { diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala index 074a128..51deb5f 100644 --- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala +++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala @@ -25,6 +25,10 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig { val MaxBatchWrite = c.getInt("aws-api-limits.max-batch-write") val MaxItemSize = c.getInt("aws-api-limits.max-item-size") + object Fixes { + val HighDistrust = c.getBoolean("fixes.high-distrust") + } + val client = new DynamoDBClientConfig(c) override def toString: String = "DynamoDBJournalConfig(" + @@ -40,6 +44,7 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig { ",MaxBatchGet:" + MaxBatchGet + ",MaxBatchWrite:" + MaxBatchWrite + ",MaxItemSize:" + MaxItemSize + + ",Fixes.HighDistrust:" + Fixes.HighDistrust + ",client.config:" + client + ")" } diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala index 04f6a31..8aef289 100644 --- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala +++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala @@ -96,7 +96,7 @@ trait DynamoDBJournalRequests extends DynamoDBRequests { item.put(AtomIndex, N(index)) item.put(AtomEnd, size) putReq(item) - } ++ (if (low / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None) + } ++ (if ((low - 1) / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None) val futures = writes.grouped(MaxBatchWrite).map { batch => dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(r => sendUnprocessedItems(r)) diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala index 34fdce5..43b58b6 100644 --- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala +++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala @@ -272,8 +272,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal => * for which it was written, all other entries do not update the highest value. Therefore we * must scan the partition of this Sort=0 entry and find the highest occupied number. */ - val request = eventQuery(persistenceId, start) - dynamo.query(request).flatMap(getRemainingQueryItems(request, _)).flatMap { result => + getAllPartitionSequenceNrs(persistenceId, start).flatMap { result => if (result.getItems.isEmpty) { /* * If this comes back empty then that means that all events have been deleted. The only @@ -285,6 +284,42 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal => log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret) ret } + } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting + // this function will keep on chasing the event source tail + // if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1 + def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = { + if (nextResults.getItems.isEmpty) { + // first iteraton will not pass here, as the query result is not empty + // if the new query result is empty the highest observed is partition -1 + Future.successful(partitionStart - 1) + } else { + /* + * `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key. + */ + val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max + val ret = partitionStart + partitionMax + + if (partitionMax == PartitionSize - 1) { + val nextStart = ret + 1 + getAllPartitionSequenceNrs(persistenceId, nextStart) + .map { logResult => + if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results + log.warning( + "readSequenceNr(highest=true persistenceId={}) tail found after {}", + persistenceId, + ret) + logResult + } + .flatMap(tailChase(nextStart, _)) + } else + Future.successful(ret) + } + } + + tailChase(start, result).map { ret => + log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret) + ret + } } else { /* * `start` is the Sort=0 entry’s sequence number, so add the maximum sort key. @@ -438,15 +473,17 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal => } } - def getRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = { + private[dynamodb] def getAllRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = { val last = result.getLastEvaluatedKey if (last == null || last.isEmpty || last.get(Sort).getN.toLong == 99) Future.successful(result) else { - dynamo.query(request.withExclusiveStartKey(last)).map { next => + dynamo.query(request.withExclusiveStartKey(last)).flatMap { next => val merged = new ArrayList[Item](result.getItems.size + next.getItems.size) merged.addAll(result.getItems) merged.addAll(next.getItems) - next.withItems(merged) + + // need to keep on reading until there's nothing more to read + getAllRemainingQueryItems(request, next.withItems(merged)) } } } @@ -460,6 +497,11 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal => .withProjectionExpression("num") .withConsistentRead(true) + private[dynamodb] def getAllPartitionSequenceNrs(persistenceId: String, sequenceNr: Long) = { + val request = eventQuery(persistenceId, sequenceNr) + dynamo.query(request).flatMap(getAllRemainingQueryItems(request, _)) + } + def batchGetReq(items: JMap[String, KeysAndAttributes]) = new BatchGetItemRequest().withRequestItems(items).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL) } diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf index ae661a6..99949c8 100644 --- a/src/test/resources/application.conf +++ b/src/test/resources/application.conf @@ -6,6 +6,7 @@ my-dynamodb-journal { aws-access-key-id = "AWS_ACCESS_KEY_ID" aws-secret-access-key = "AWS_SECRET_ACCESS_KEY" tracing = off + fixes.high-distrust = true } my-dynamodb-snapshot-store = ${dynamodb-snapshot-store} diff --git a/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala index 183c0b4..f476347 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala @@ -36,8 +36,8 @@ class DeletionSpec * noisy logging, and I like my build output clean and green. */ Thread.sleep(500) - system.terminate().futureValue client.shutdown() + system.terminate().futureValue super.afterAll() } diff --git a/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala index b01ee19..38f3f2a 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala @@ -10,6 +10,8 @@ import org.scalatest.WordSpecLike class DynamoPartitionGroupedSpec extends TestKit(ActorSystem("DynamoPartitionGroupedSpec")) with WordSpecLike { implicit val materializer = ActorMaterializer() + assert(PartitionSize == 100, "This test is only valid with PartitionSize == 100") + "A DynamoPartitionGroup should create the correct PartitionKey outputs" when { "events 1 thru 250 are presented" in { val sourceUnderTest = diff --git a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala new file mode 100644 index 0000000..93d6c7b --- /dev/null +++ b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2021 Typesafe Inc. + */ +package akka.persistence.dynamodb.journal + +import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest._ +import org.scalatest.concurrent.ScalaFutures +import akka.actor.ActorSystem +import akka.persistence._ +import akka.persistence.JournalProtocol._ +import akka.testkit._ +import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess +import com.amazonaws.services.dynamodbv2.model._ +import java.util.{ HashMap => JHMap } +import akka.persistence.dynamodb._ + +class PersistAllConsistencySpec + extends TestKit(ActorSystem("PersistAllConsistencySpec")) + with ImplicitSender + with WordSpecLike + with BeforeAndAfterAll + with Matchers + with ScalaFutures + with ConversionCheckedTripleEquals + with DynamoDBUtils + with IntegSpec { + + override def beforeAll(): Unit = { + super.beforeAll() + ensureJournalTableExists() + } + + override def afterAll(): Unit = { + client.shutdown() + system.terminate().futureValue + super.afterAll() + } + + override val persistenceId = "PersistAllConsistencySpec" + lazy val journal = Persistence(system).journalFor("") + + import settings._ + + "DynamoDB Journal (persistAll)" must { + + "recover correctly if the first write is a batch" in { + journal ! Purge(persistenceId, testActor) + expectMsg(Purged(persistenceId)) + + val start = nextSeqNr + val end = 10 + println(s"start: ${start}; end: ${end}") + val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! WriteMessages(padding, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + + journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor) + (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i)))) + expectMsg(RecoverySuccess(end)) + } + + for (t <- Seq(("last", 3), ("middle", 2), ("first", 1))) + s"correctly cross page boundaries with AtomicWrite position ${t._1}" in { + val start1 = nextSeqNr + val end1 = ((start1 / PartitionSize) + 1) * PartitionSize - t._2 + println(s"start: ${start1}; end: ${end1}") + val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! WriteMessages(padding, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start1 to end1).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + + val start2 = nextSeqNr + val end2 = start2 + 2 + println(s"start: ${start2}; end: ${end2}") + val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! WriteMessages(subject, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start2 to end2).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + + journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor) + (start1 to end2).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i)))) + expectMsg(RecoverySuccess(end2)) + } + + s"recover correctly when the last partition event ends on ${PartitionSize - 1}" in { + val start = nextSeqNr + val end = ((start / PartitionSize) + 1) * PartitionSize - 1 + println(s"start: ${start}; end: ${end}") + val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil + + journal ! WriteMessages(padding, testActor, 1) + expectMsg(WriteMessagesSuccessful) + (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1))) + + journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor) + (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i)))) + expectMsg(RecoverySuccess(end)) + } + + } + +} diff --git a/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala index ac4dd15..4b21d91 100644 --- a/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala +++ b/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala @@ -88,7 +88,7 @@ class RecoveryConsistencySpec "read correct highest sequence number even if a Sort=0 entry is lost" in { val start = messages + 19 - val end = (start / 100 + 1) * 100 + val end = (start / PartitionSize + 1) * PartitionSize val more = (start to end).map(i => AtomicWrite(persistentRepr(f"e-$i"))) journal ! WriteMessages(more, testActor, 1) expectMsg(WriteMessagesSuccessful) @@ -98,7 +98,8 @@ class RecoveryConsistencySpec journal ! ListAll(persistenceId, testActor) val ids = ((1L to (end - 1)).toSet -- Set[Long](2, 4, 12, 15).map(_ + messages)).toSeq.sorted - expectMsg(ListAllResult(persistenceId, Set.empty, (1L to (end / 100)).map(_ * 100).toSet, ids)) + expectMsg( + ListAllResult(persistenceId, Set.empty, (1L to (end / PartitionSize)).map(_ * PartitionSize).toSet, ids)) journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor) expectMsg(RecoverySuccess(end)) @@ -157,8 +158,8 @@ class RecoveryConsistencySpec private def delete(num: Long) = { val key: Item = new JHMap - key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}")) - key.put(Sort, N(num % 100)) + key.put(Key, S(s"$JournalName-P-$persistenceId-${num / PartitionSize}")) + key.put(Sort, N(num % PartitionSize)) client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue } }