diff --git a/.gitignore b/.gitignore
index 4c44cdd..bcba1aa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,7 @@ project/project/target
.settings/
.cache
.aws
+
+
+.bloop
+.metals
diff --git a/project/Publish.scala b/project/Publish.scala
index 60709de..9df2352 100644
--- a/project/Publish.scala
+++ b/project/Publish.scala
@@ -12,7 +12,7 @@ object Publish extends AutoPlugin {
val defaultPublishTo = settingKey[File]("Default publish directory")
- override def trigger = allRequirements
+ override def trigger = allRequirements
override def requires = sbtrelease.ReleasePlugin
override lazy val projectSettings = Seq(
@@ -27,8 +27,7 @@ object Publish extends AutoPlugin {
homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")),
publishMavenStyle := true,
pomIncludeRepository := { x => false },
- defaultPublishTo := crossTarget.value / "repository",
- )
+ defaultPublishTo := crossTarget.value / "repository")
def akkaPomExtra = {
@@ -41,15 +40,16 @@ object Publish extends AutoPlugin {
}
- private def akkaPublishTo = Def.setting {
- sonatypeRepo(version.value) orElse localRepo(defaultPublishTo.value)
- }
+ private def akkaPublishTo =
+ Def.setting {
+ sonatypeRepo(version.value).orElse(localRepo(defaultPublishTo.value))
+ }
private def sonatypeRepo(version: String): Option[Resolver] =
- Option(sys.props("publish.maven.central")) filter (_.toLowerCase == "true") map { _ =>
+ Option(sys.props("publish.maven.central")).filter(_.toLowerCase == "true").map { _ =>
val nexus = "https://oss.sonatype.org/"
- if (version endsWith "-SNAPSHOT") "snapshots" at nexus + "content/repositories/snapshots"
- else "releases" at nexus + "service/local/staging/deploy/maven2"
+ if (version.endsWith("-SNAPSHOT")) "snapshots".at(nexus + "content/repositories/snapshots")
+ else "releases".at(nexus + "service/local/staging/deploy/maven2")
}
private def localRepo(repository: File) =
diff --git a/project/Whitesource.scala b/project/Whitesource.scala
index a88841a..5aac5e6 100644
--- a/project/Whitesource.scala
+++ b/project/Whitesource.scala
@@ -14,15 +14,15 @@ object Whitesource extends AutoPlugin {
whitesourceProduct := "Lightbend Reactive Platform",
whitesourceAggregateProjectName := {
val projectName = (moduleName in LocalRootProject).value.replace("-root", "")
- projectName + "-" + (
- if (isSnapshot.value)
- if (gitCurrentBranch.value == "master") "master"
- else "adhoc"
- else CrossVersion.partialVersion((version in LocalRootProject).value)
- .map { case (major,minor) => s"$major.$minor-stable" }
- .getOrElse("adhoc"))
+ projectName + "-" + (if (isSnapshot.value)
+ if (gitCurrentBranch.value == "master") "master"
+ else "adhoc"
+ else
+ CrossVersion
+ .partialVersion((version in LocalRootProject).value)
+ .map { case (major, minor) => s"$major.$minor-stable" }
+ .getOrElse("adhoc"))
},
whitesourceForceCheckAllDependencies := true,
- whitesourceFailOnError := true
- )
+ whitesourceFailOnError := true)
}
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 72125b0..5c87219 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -120,6 +120,28 @@ dynamodb-journal {
parallelism-max = 8
}
}
+
+ fixes {
+ # Bug akka/akka-persistence-dynamodb#98 can produce an event source that is written successfully
+ # and is valid, but whose high sequence number marker is missing. This causes incomplete
+ # replay of the event source. A root cause fix prevents this from occurring, but does not
+ # repair existing event sources. Writing new events to incompletely replayed persistent actors
+ # will corrupt the event source. If the write aligns with an atomic write, this will be hidden.
+ # This fix cannot recover corrupted event sources, but it attempts to avoid corruption from
+ # taking place where this hasn't happened yet. It does so by not fully trusting the high mark.
+ # If the last event found in a partition has sort key 99 (the final slot of the partition), it
+ # will attempt to chase the tail of the event source into the next partition. This guards valid
+ # event sources with hidden tails from incomplete replay. For event sources not suffering from
+ # this problem there is a 1% chance that this leads to a useless query.
+ # This is a performance/consistency tradeoff to be made. It should not be required for newly
+ # created event sources that have the root cause fix, hence it is off by default.
+ # NOTE: With the current implementation, in which the high mark is written after the events,
+ # there is a slim chance that a valid event source is written, but a network error occurs
+ # before the high mark is written. In this case the write would be reported as failed to the
+ # writing party. Whether you want to "keep the event source" is up for discussion, but this fix
+ # would also recover from that situation.
+ high-distrust = false
+ }
}
dynamodb-snapshot-store {
diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala
index 074a128..51deb5f 100644
--- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala
+++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalConfig.scala
@@ -25,6 +25,10 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
val MaxBatchWrite = c.getInt("aws-api-limits.max-batch-write")
val MaxItemSize = c.getInt("aws-api-limits.max-item-size")
+ object Fixes {
+ val HighDistrust = c.getBoolean("fixes.high-distrust")
+ }
+
val client = new DynamoDBClientConfig(c)
override def toString: String =
"DynamoDBJournalConfig(" +
@@ -40,6 +44,7 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
",MaxBatchGet:" + MaxBatchGet +
",MaxBatchWrite:" + MaxBatchWrite +
",MaxItemSize:" + MaxItemSize +
+ ",Fixes.HighDistrust:" + Fixes.HighDistrust +
",client.config:" + client +
")"
}
diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
index 04f6a31..8aef289 100644
--- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
+++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBJournalRequests.scala
@@ -96,7 +96,7 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
item.put(AtomIndex, N(index))
item.put(AtomEnd, size)
putReq(item)
- } ++ (if (low / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)
+ } ++ (if ((low - 1) / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)
val futures = writes.grouped(MaxBatchWrite).map { batch =>
dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(r => sendUnprocessedItems(r))
diff --git a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
index 34fdce5..43b58b6 100644
--- a/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
+++ b/src/main/scala/akka/persistence/dynamodb/journal/DynamoDBRecovery.scala
@@ -272,8 +272,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
* for which it was written, all other entries do not update the highest value. Therefore we
* must scan the partition of this Sort=0 entry and find the highest occupied number.
*/
- val request = eventQuery(persistenceId, start)
- dynamo.query(request).flatMap(getRemainingQueryItems(request, _)).flatMap { result =>
+ getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
if (result.getItems.isEmpty) {
/*
* If this comes back empty then that means that all events have been deleted. The only
@@ -285,6 +284,42 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
ret
}
+ } else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
+ // this function will keep on chasing the event source tail
+ // if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
+ def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
+ if (nextResults.getItems.isEmpty) {
+ // the first iteration never reaches this branch, because the initial query result is not empty;
+ // if a follow-up query result is empty, the highest sequence number observed is partitionStart - 1
+ Future.successful(partitionStart - 1)
+ } else {
+ /*
+ * `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
+ */
+ val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
+ val ret = partitionStart + partitionMax
+
+ if (partitionMax == PartitionSize - 1) {
+ val nextStart = ret + 1
+ getAllPartitionSequenceNrs(persistenceId, nextStart)
+ .map { logResult =>
+ if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
+ log.warning(
+ "readSequenceNr(highest=true persistenceId={}) tail found after {}",
+ persistenceId,
+ ret)
+ logResult
+ }
+ .flatMap(tailChase(nextStart, _))
+ } else
+ Future.successful(ret)
+ }
+ }
+
+ tailChase(start, result).map { ret =>
+ log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
+ ret
+ }
} else {
/*
* `start` is the Sort=0 entry’s sequence number, so add the maximum sort key.
@@ -438,15 +473,17 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
}
}
- def getRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
+ private[dynamodb] def getAllRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
val last = result.getLastEvaluatedKey
if (last == null || last.isEmpty || last.get(Sort).getN.toLong == 99) Future.successful(result)
else {
- dynamo.query(request.withExclusiveStartKey(last)).map { next =>
+ dynamo.query(request.withExclusiveStartKey(last)).flatMap { next =>
val merged = new ArrayList[Item](result.getItems.size + next.getItems.size)
merged.addAll(result.getItems)
merged.addAll(next.getItems)
- next.withItems(merged)
+
+ // need to keep on reading until there's nothing more to read
+ getAllRemainingQueryItems(request, next.withItems(merged))
}
}
}
@@ -460,6 +497,11 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
.withProjectionExpression("num")
.withConsistentRead(true)
+ private[dynamodb] def getAllPartitionSequenceNrs(persistenceId: String, sequenceNr: Long) = {
+ val request = eventQuery(persistenceId, sequenceNr)
+ dynamo.query(request).flatMap(getAllRemainingQueryItems(request, _))
+ }
+
def batchGetReq(items: JMap[String, KeysAndAttributes]) =
new BatchGetItemRequest().withRequestItems(items).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
}
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index ae661a6..99949c8 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -6,6 +6,7 @@ my-dynamodb-journal {
aws-access-key-id = "AWS_ACCESS_KEY_ID"
aws-secret-access-key = "AWS_SECRET_ACCESS_KEY"
tracing = off
+ fixes.high-distrust = true
}
my-dynamodb-snapshot-store = ${dynamodb-snapshot-store}
diff --git a/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala
index 183c0b4..f476347 100644
--- a/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala
+++ b/src/test/scala/akka/persistence/dynamodb/journal/DeletionSpec.scala
@@ -36,8 +36,8 @@ class DeletionSpec
* noisy logging, and I like my build output clean and green.
*/
Thread.sleep(500)
- system.terminate().futureValue
client.shutdown()
+ system.terminate().futureValue
super.afterAll()
}
diff --git a/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala
index b01ee19..38f3f2a 100644
--- a/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala
+++ b/src/test/scala/akka/persistence/dynamodb/journal/DynamoPartitionGroupedSpec.scala
@@ -10,6 +10,8 @@ import org.scalatest.WordSpecLike
class DynamoPartitionGroupedSpec extends TestKit(ActorSystem("DynamoPartitionGroupedSpec")) with WordSpecLike {
implicit val materializer = ActorMaterializer()
+ assert(PartitionSize == 100, "This test is only valid with PartitionSize == 100")
+
"A DynamoPartitionGroup should create the correct PartitionKey outputs" when {
"events 1 thru 250 are presented" in {
val sourceUnderTest =
diff --git a/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala
new file mode 100644
index 0000000..93d6c7b
--- /dev/null
+++ b/src/test/scala/akka/persistence/dynamodb/journal/PersistAllConsistencySpec.scala
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2021 Typesafe Inc.
+ */
+package akka.persistence.dynamodb.journal
+
+import org.scalactic.ConversionCheckedTripleEquals
+import org.scalatest._
+import org.scalatest.concurrent.ScalaFutures
+import akka.actor.ActorSystem
+import akka.persistence._
+import akka.persistence.JournalProtocol._
+import akka.testkit._
+import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess
+import com.amazonaws.services.dynamodbv2.model._
+import java.util.{ HashMap => JHMap }
+import akka.persistence.dynamodb._
+
+class PersistAllConsistencySpec
+ extends TestKit(ActorSystem("PersistAllConsistencySpec"))
+ with ImplicitSender
+ with WordSpecLike
+ with BeforeAndAfterAll
+ with Matchers
+ with ScalaFutures
+ with ConversionCheckedTripleEquals
+ with DynamoDBUtils
+ with IntegSpec {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ ensureJournalTableExists()
+ }
+
+ override def afterAll(): Unit = {
+ client.shutdown()
+ system.terminate().futureValue
+ super.afterAll()
+ }
+
+ override val persistenceId = "PersistAllConsistencySpec"
+ lazy val journal = Persistence(system).journalFor("")
+
+ import settings._
+
+ "DynamoDB Journal (persistAll)" must {
+
+ "recover correctly if the first write is a batch" in {
+ journal ! Purge(persistenceId, testActor)
+ expectMsg(Purged(persistenceId))
+
+ val start = nextSeqNr
+ val end = 10
+ println(s"start: ${start}; end: ${end}")
+ val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil
+
+ journal ! WriteMessages(padding, testActor, 1)
+ expectMsg(WriteMessagesSuccessful)
+ (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
+
+ journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
+ (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
+ expectMsg(RecoverySuccess(end))
+ }
+
+ for (t <- Seq(("last", 3), ("middle", 2), ("first", 1)))
+ s"correctly cross page boundaries with AtomicWrite position ${t._1}" in {
+ val start1 = nextSeqNr
+ val end1 = ((start1 / PartitionSize) + 1) * PartitionSize - t._2
+ println(s"start: ${start1}; end: ${end1}")
+ val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil
+
+ journal ! WriteMessages(padding, testActor, 1)
+ expectMsg(WriteMessagesSuccessful)
+ (start1 to end1).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
+
+ val start2 = nextSeqNr
+ val end2 = start2 + 2
+ println(s"start: ${start2}; end: ${end2}")
+ val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil
+
+ journal ! WriteMessages(subject, testActor, 1)
+ expectMsg(WriteMessagesSuccessful)
+ (start2 to end2).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
+
+ journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
+ (start1 to end2).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
+ expectMsg(RecoverySuccess(end2))
+ }
+
+ s"recover correctly when the last partition event ends on ${PartitionSize - 1}" in {
+ val start = nextSeqNr
+ val end = ((start / PartitionSize) + 1) * PartitionSize - 1
+ println(s"start: ${start}; end: ${end}")
+ val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil
+
+ journal ! WriteMessages(padding, testActor, 1)
+ expectMsg(WriteMessagesSuccessful)
+ (start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))
+
+ journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
+ (start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
+ expectMsg(RecoverySuccess(end))
+ }
+
+ }
+
+}
diff --git a/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala b/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala
index ac4dd15..4b21d91 100644
--- a/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala
+++ b/src/test/scala/akka/persistence/dynamodb/journal/RecoveryConsistencySpec.scala
@@ -88,7 +88,7 @@ class RecoveryConsistencySpec
"read correct highest sequence number even if a Sort=0 entry is lost" in {
val start = messages + 19
- val end = (start / 100 + 1) * 100
+ val end = (start / PartitionSize + 1) * PartitionSize
val more = (start to end).map(i => AtomicWrite(persistentRepr(f"e-$i")))
journal ! WriteMessages(more, testActor, 1)
expectMsg(WriteMessagesSuccessful)
@@ -98,7 +98,8 @@ class RecoveryConsistencySpec
journal ! ListAll(persistenceId, testActor)
val ids = ((1L to (end - 1)).toSet -- Set[Long](2, 4, 12, 15).map(_ + messages)).toSeq.sorted
- expectMsg(ListAllResult(persistenceId, Set.empty, (1L to (end / 100)).map(_ * 100).toSet, ids))
+ expectMsg(
+ ListAllResult(persistenceId, Set.empty, (1L to (end / PartitionSize)).map(_ * PartitionSize).toSet, ids))
journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor)
expectMsg(RecoverySuccess(end))
@@ -157,8 +158,8 @@ class RecoveryConsistencySpec
private def delete(num: Long) = {
val key: Item = new JHMap
- key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}"))
- key.put(Sort, N(num % 100))
+ key.put(Key, S(s"$JournalName-P-$persistenceId-${num / PartitionSize}"))
+ key.put(Sort, N(num % PartitionSize))
client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue
}
}