Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1.1.x issue 98 #100

Merged
merged 13 commits into from
Feb 17, 2022
Merged
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ project/project/target
.settings/
.cache
.aws


.bloop
.metals
18 changes: 9 additions & 9 deletions project/Publish.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object Publish extends AutoPlugin {

val defaultPublishTo = settingKey[File]("Default publish directory")

override def trigger = allRequirements
override def trigger = allRequirements
override def requires = sbtrelease.ReleasePlugin

override lazy val projectSettings = Seq(
Expand All @@ -27,8 +27,7 @@ object Publish extends AutoPlugin {
homepage := Some(url("https://github.com/akka/akka-persistence-dynamodb")),
publishMavenStyle := true,
pomIncludeRepository := { x => false },
defaultPublishTo := crossTarget.value / "repository",
)
defaultPublishTo := crossTarget.value / "repository")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove change from PR? Or is this no longer required with the updated sbt-pgp plugin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't know. I just made an effort to make it build. I haven't actually done PGP signed publishes.

This part specifically I stumbled on in this commit:
akka/akka-persistence-dynamodb@0192e14

So it looks like it can do without.


def akkaPomExtra = {
<developers>
Expand All @@ -41,15 +40,16 @@ object Publish extends AutoPlugin {
</developers>
}

private def akkaPublishTo = Def.setting {
sonatypeRepo(version.value) orElse localRepo(defaultPublishTo.value)
}
private def akkaPublishTo =
Def.setting {
sonatypeRepo(version.value).orElse(localRepo(defaultPublishTo.value))
}

private def sonatypeRepo(version: String): Option[Resolver] =
Option(sys.props("publish.maven.central")) filter (_.toLowerCase == "true") map { _ =>
Option(sys.props("publish.maven.central")).filter(_.toLowerCase == "true").map { _ =>
val nexus = "https://oss.sonatype.org/"
if (version endsWith "-SNAPSHOT") "snapshots" at nexus + "content/repositories/snapshots"
else "releases" at nexus + "service/local/staging/deploy/maven2"
if (version.endsWith("-SNAPSHOT")) "snapshots".at(nexus + "content/repositories/snapshots")
else "releases".at(nexus + "service/local/staging/deploy/maven2")
}

private def localRepo(repository: File) =
Expand Down
18 changes: 9 additions & 9 deletions project/Whitesource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ object Whitesource extends AutoPlugin {
whitesourceProduct := "Lightbend Reactive Platform",
whitesourceAggregateProjectName := {
val projectName = (moduleName in LocalRootProject).value.replace("-root", "")
projectName + "-" + (
if (isSnapshot.value)
if (gitCurrentBranch.value == "master") "master"
else "adhoc"
else CrossVersion.partialVersion((version in LocalRootProject).value)
.map { case (major,minor) => s"$major.$minor-stable" }
.getOrElse("adhoc"))
projectName + "-" + (if (isSnapshot.value)
if (gitCurrentBranch.value == "master") "master"
else "adhoc"
else
CrossVersion
.partialVersion((version in LocalRootProject).value)
.map { case (major, minor) => s"$major.$minor-stable" }
.getOrElse("adhoc"))
},
whitesourceForceCheckAllDependencies := true,
whitesourceFailOnError := true
)
whitesourceFailOnError := true)
}
22 changes: 22 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,28 @@ dynamodb-journal {
parallelism-max = 8
}
}

fixes {
# Bug akka/akka-persistence-dynamodb#98 can cause an event source which is written successfully
# and is valid, but where the High Sequence number marker is missing. This causes incomplete
# replay of the event source. A root cause fix prevents this from occurring, but does not
# repair existing event sources. Writing new events to incompletely replayed persistent actors
# will corrupt the event source. If write aligns with an atomic write this will be hidden.
# This fix cannot recover corrupted event sources, but it attempts to avoid corruption from
# taking place where this hasn't happened yet. It does so by not fully trusting the high mark.
# If the last event in a partition is 99, it will attempt to chase the tail of the event
# source. This guards valid event sources with hidden tails from incomplete replay.
# For event sources not suffering from this problem there's 1% chance that this leads to a
# useless query.
# This is a performance/consistency tradeoff to be made. It should not be required for newly
# created event sources that have the root cause fix, hence it is off by default.
# NOTE: With current implementation of the high mark being written after the events,
# there is a slim chance that a valid event source is written, but a network error occurs
# before the high mark is written. In this case the write would be reported as failed to the
# writing party. Whether you want to "keep the event source" is up for discussion. But this fix
# would also recover from that situation.
high-distrust = false
}
}
dynamodb-snapshot-store {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
val MaxBatchWrite = c.getInt("aws-api-limits.max-batch-write")
val MaxItemSize = c.getInt("aws-api-limits.max-item-size")

object Fixes {
val HighDistrust = c.getBoolean("fixes.high-distrust")
}

val client = new DynamoDBClientConfig(c)
override def toString: String =
"DynamoDBJournalConfig(" +
Expand All @@ -40,6 +44,7 @@ class DynamoDBJournalConfig(c: Config) extends DynamoDBConfig {
",MaxBatchGet:" + MaxBatchGet +
",MaxBatchWrite:" + MaxBatchWrite +
",MaxItemSize:" + MaxItemSize +
",Fixes.HighDistrust:" + Fixes.HighDistrust +
",client.config:" + client +
")"
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ trait DynamoDBJournalRequests extends DynamoDBRequests {
item.put(AtomIndex, N(index))
item.put(AtomEnd, size)
putReq(item)
} ++ (if (low / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)
} ++ (if ((low - 1) / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)

val futures = writes.grouped(MaxBatchWrite).map { batch =>
dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(r => sendUnprocessedItems(r))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
* for which it was written, all other entries do not update the highest value. Therefore we
* must scan the partition of this Sort=0 entry and find the highest occupied number.
*/
val request = eventQuery(persistenceId, start)
dynamo.query(request).flatMap(getRemainingQueryItems(request, _)).flatMap { result =>
getAllPartitionSequenceNrs(persistenceId, start).flatMap { result =>
if (result.getItems.isEmpty) {
/*
* If this comes back empty then that means that all events have been deleted. The only
Expand All @@ -285,6 +284,42 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
ret
}
} else if (Fixes.HighDistrust) { // allows recovering from failed high mark setting
// this function will keep on chasing the event source tail
// if HighDistrust is enabled and as long as the partitionMax == PartitionSize - 1
def tailChase(partitionStart: Long, nextResults: QueryResult): Future[Long] = {
if (nextResults.getItems.isEmpty) {
// first iteraton will not pass here, as the query result is not empty
// if the new query result is empty the highest observed is partition -1
Future.successful(partitionStart - 1)
} else {
/*
* `partitionStart` is the Sort=0 entry’s sequence number, so add the maximum sort key.
*/
val partitionMax = nextResults.getItems.asScala.map(_.get(Sort).getN.toLong).max
val ret = partitionStart + partitionMax

if (partitionMax == PartitionSize - 1) {
val nextStart = ret + 1
getAllPartitionSequenceNrs(persistenceId, nextStart)
.map { logResult =>
if (!logResult.getItems().isEmpty()) // will only log if a follow-up query produced results
log.warning(
"readSequenceNr(highest=true persistenceId={}) tail found after {}",
persistenceId,
ret)
logResult
}
.flatMap(tailChase(nextStart, _))
} else
Future.successful(ret)
}
}

tailChase(start, result).map { ret =>
log.debug("readSequenceNr(highest=true persistenceId={}) = {}", persistenceId, ret)
ret
}
} else {
/*
* `start` is the Sort=0 entry’s sequence number, so add the maximum sort key.
Expand Down Expand Up @@ -438,15 +473,17 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
}
}

def getRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
private[dynamodb] def getAllRemainingQueryItems(request: QueryRequest, result: QueryResult): Future[QueryResult] = {
val last = result.getLastEvaluatedKey
if (last == null || last.isEmpty || last.get(Sort).getN.toLong == 99) Future.successful(result)
else {
dynamo.query(request.withExclusiveStartKey(last)).map { next =>
dynamo.query(request.withExclusiveStartKey(last)).flatMap { next =>
val merged = new ArrayList[Item](result.getItems.size + next.getItems.size)
merged.addAll(result.getItems)
merged.addAll(next.getItems)
next.withItems(merged)

// need to keep on reading until there's nothing more to read
getAllRemainingQueryItems(request, next.withItems(merged))
}
}
}
Expand All @@ -460,6 +497,11 @@ trait DynamoDBRecovery extends AsyncRecovery { this: DynamoDBJournal =>
.withProjectionExpression("num")
.withConsistentRead(true)

private[dynamodb] def getAllPartitionSequenceNrs(persistenceId: String, sequenceNr: Long) = {
val request = eventQuery(persistenceId, sequenceNr)
dynamo.query(request).flatMap(getAllRemainingQueryItems(request, _))
}

def batchGetReq(items: JMap[String, KeysAndAttributes]) =
new BatchGetItemRequest().withRequestItems(items).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
}
1 change: 1 addition & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ my-dynamodb-journal {
aws-access-key-id = "AWS_ACCESS_KEY_ID"
aws-secret-access-key = "AWS_SECRET_ACCESS_KEY"
tracing = off
fixes.high-distrust = true
}

my-dynamodb-snapshot-store = ${dynamodb-snapshot-store}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class DeletionSpec
* noisy logging, and I like my build output clean and green.
*/
Thread.sleep(500)
system.terminate().futureValue
client.shutdown()
system.terminate().futureValue
super.afterAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import org.scalatest.WordSpecLike
class DynamoPartitionGroupedSpec extends TestKit(ActorSystem("DynamoPartitionGroupedSpec")) with WordSpecLike {
implicit val materializer = ActorMaterializer()

assert(PartitionSize == 100, "This test is only valid with PartitionSize == 100")

"A DynamoPartitionGroup should create the correct PartitionKey outputs" when {
"events 1 thru 250 are presented" in {
val sourceUnderTest =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Copyright (C) 2021 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.dynamodb.journal

import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest._
import org.scalatest.concurrent.ScalaFutures
import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.testkit._
import akka.persistence.journal.AsyncWriteTarget.ReplaySuccess
import com.amazonaws.services.dynamodbv2.model._
import java.util.{ HashMap => JHMap }
import akka.persistence.dynamodb._

class PersistAllConsistencySpec
extends TestKit(ActorSystem("PersistAllConsistencySpec"))
with ImplicitSender
with WordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures
with ConversionCheckedTripleEquals
with DynamoDBUtils
with IntegSpec {

override def beforeAll(): Unit = {
super.beforeAll()
ensureJournalTableExists()
}

override def afterAll(): Unit = {
client.shutdown()
system.terminate().futureValue
super.afterAll()
}

override val persistenceId = "PersistAllConsistencySpec"
lazy val journal = Persistence(system).journalFor("")

import settings._

"DynamoDB Journal (persistAll)" must {

"recover correctly if the first write is a batch" in {
journal ! Purge(persistenceId, testActor)
expectMsg(Purged(persistenceId))

val start = nextSeqNr
val end = 10
println(s"start: ${start}; end: ${end}")
val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end))
}

for (t <- Seq(("last", 3), ("middle", 2), ("first", 1)))
s"correctly cross page boundaries with AtomicWrite position ${t._1}" in {
val start1 = nextSeqNr
val end1 = ((start1 / PartitionSize) + 1) * PartitionSize - t._2
println(s"start: ${start1}; end: ${end1}")
val padding = AtomicWrite((start1 to end1).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start1 to end1).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

val start2 = nextSeqNr
val end2 = start2 + 2
println(s"start: ${start2}; end: ${end2}")
val subject = AtomicWrite((start2 to end2).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(subject, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start2 to end2).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

journal ! ReplayMessages(start1, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start1 to end2).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end2))
}

s"recover correctly when the last partition event ends on ${PartitionSize - 1}" in {
val start = nextSeqNr
val end = ((start / PartitionSize) + 1) * PartitionSize - 1
println(s"start: ${start}; end: ${end}")
val padding = AtomicWrite((start to end).map(i => persistentRepr(f"h-$i"))) :: Nil

journal ! WriteMessages(padding, testActor, 1)
expectMsg(WriteMessagesSuccessful)
(start to end).foreach(i => expectMsg(WriteMessageSuccess(generatedMessages(i), 1)))

journal ! ReplayMessages(start, Long.MaxValue, Long.MaxValue, persistenceId, testActor)
(start to end).foreach(i => expectMsg(ReplayedMessage(generatedMessages(i))))
expectMsg(RecoverySuccess(end))
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class RecoveryConsistencySpec

"read correct highest sequence number even if a Sort=0 entry is lost" in {
val start = messages + 19
val end = (start / 100 + 1) * 100
val end = (start / PartitionSize + 1) * PartitionSize
val more = (start to end).map(i => AtomicWrite(persistentRepr(f"e-$i")))
journal ! WriteMessages(more, testActor, 1)
expectMsg(WriteMessagesSuccessful)
Expand All @@ -98,7 +98,8 @@ class RecoveryConsistencySpec

journal ! ListAll(persistenceId, testActor)
val ids = ((1L to (end - 1)).toSet -- Set[Long](2, 4, 12, 15).map(_ + messages)).toSeq.sorted
expectMsg(ListAllResult(persistenceId, Set.empty, (1L to (end / 100)).map(_ * 100).toSet, ids))
expectMsg(
ListAllResult(persistenceId, Set.empty, (1L to (end / PartitionSize)).map(_ * PartitionSize).toSet, ids))

journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor)
expectMsg(RecoverySuccess(end))
Expand Down Expand Up @@ -157,8 +158,8 @@ class RecoveryConsistencySpec

private def delete(num: Long) = {
val key: Item = new JHMap
key.put(Key, S(s"$JournalName-P-$persistenceId-${num / 100}"))
key.put(Sort, N(num % 100))
key.put(Key, S(s"$JournalName-P-$persistenceId-${num / PartitionSize}"))
key.put(Sort, N(num % PartitionSize))
client.deleteItem(new DeleteItemRequest().withTableName(JournalTable).withKey(key)).futureValue
}
}