diff --git a/build.sbt b/build.sbt
index 9e7987d5..c40ba8f6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,12 +2,30 @@ name := "fs2-aws"
 scalaVersion := "2.12.10"
 
 lazy val root = (project in file("."))
-  .aggregate(`fs2-aws`, `fs2-aws-testkit`)
+  .aggregate(`fs2-aws`, `fs2-aws-testkit`, `fs2-aws-dynamodb`, `fs2-aws-core`, `fs2-aws-examples`)
+  .settings(
+    skip in publish := true
+  )
+
+lazy val `fs2-aws-core` = (project in file("fs2-aws-core"))
+  .settings(commonSettings)
+  .settings(publishSettings)
+
+lazy val `fs2-aws-dynamodb` = (project in file("fs2-aws-dynamodb"))
+  .dependsOn(`fs2-aws-core`)
+  .settings(commonSettings)
+  .settings(publishSettings)
+
+lazy val `fs2-aws-examples` = (project in file("fs2-aws-examples"))
+  .dependsOn(`fs2-aws-dynamodb`)
+  .settings(commonSettings)
+  .settings(publishSettings)
   .settings(
     skip in publish := true
   )
 
 lazy val `fs2-aws` = (project in file("fs2-aws"))
+  .dependsOn(`fs2-aws-core`)
   .settings(commonSettings)
   .settings(publishSettings)
@@ -37,6 +55,25 @@ lazy val commonSettings = Seq(
   )
 )
 
+addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.10")
+
+// publish
+publishTo in ThisBuild := Some(
+  "Sonatype Nexus" at "https://oss.sonatype.org/service/local/staging/deploy/maven2"
+)
+
+licenses in ThisBuild := Seq(
+  "MIT" -> url("https://github.com/dmateusp/fs2-aws/blob/master/LICENSE")
+)
+developers in ThisBuild := List(
+  Developer(
+    id = "dmateusp",
+    name = "Daniel Mateus Pires",
+    email = "dmateusp@gmail.com",
+    url = url("https://github.com/dmateusp")
+  )
+)
+
 lazy val publishSettings = Seq(
   publishMavenStyle := true,
   Test / publishArtifact := true,
diff --git a/fs2-aws-core/build.sbt b/fs2-aws-core/build.sbt
new file mode 100644
index 00000000..f3805ed3
--- /dev/null
+++ b/fs2-aws-core/build.sbt
@@ -0,0 +1,18 @@
+name := "fs2-aws-core"
+
+// coverage
+coverageMinimum := 40
+coverageFailOnMinimum := true
+
+scalaVersion := "2.12.10"
+
+val fs2Version = "2.2.2"
+
+libraryDependencies ++= Seq(
+  "co.fs2" %% "fs2-core" % fs2Version,
+  "co.fs2" %% "fs2-io" % fs2Version,
+  "org.typelevel" %% "alleycats-core" % "2.1.0",
+  "org.scalatest" %% "scalatest" % "3.1.0" % Test,
+  "org.mockito" % "mockito-core" % "3.2.4" % Test,
+  "org.mockito" %% "mockito-scala-scalatest" % "1.11.2" % Test
+)
diff --git a/fs2-aws/src/main/scala/fs2/aws/internal/package.scala b/fs2-aws-core/src/main/scala/fs2/aws/core/kinesis/package.scala
similarity index 98%
rename from fs2-aws/src/main/scala/fs2/aws/internal/package.scala
rename to fs2-aws-core/src/main/scala/fs2/aws/core/kinesis/package.scala
index b787e688..fa977205 100644
--- a/fs2-aws/src/main/scala/fs2/aws/internal/package.scala
+++ b/fs2-aws-core/src/main/scala/fs2/aws/core/kinesis/package.scala
@@ -6,7 +6,7 @@ import cats.implicits._
 import fs2.concurrent.Queue
 import fs2.{ Pipe, Stream }
 
-package object internal {
+package object core {
 
   /** Helper flow to group elements of a stream into K substreams.
    * Grows with the number of distinct 'K' selectors
diff --git a/fs2-aws/src/test/scala/fs2/aws/InternalSpec.scala b/fs2-aws-core/src/test/scala/fs2/aws/core/InternalSpec.scala
similarity index 96%
rename from fs2-aws/src/test/scala/fs2/aws/InternalSpec.scala
rename to fs2-aws-core/src/test/scala/fs2/aws/core/InternalSpec.scala
index e27e6cbd..31372279 100644
--- a/fs2-aws/src/test/scala/fs2/aws/InternalSpec.scala
+++ b/fs2-aws-core/src/test/scala/fs2/aws/core/InternalSpec.scala
@@ -1,8 +1,7 @@
-package fs2.aws
+package fs2.aws.core
 
-import fs2.Stream
 import cats.effect.{ ContextShift, IO }
-import fs2.aws.internal._
+import fs2.Stream
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
 
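Note: the `groupBy` helper moves verbatim from `fs2.aws.internal` to the new `fs2.aws.core` package object; only the package prefix changes for callers. A minimal sketch of addressing it at its new location, assuming the selector-in-`F` signature visible in the `consumer.scala` hunk at the bottom of this diff (the word-grouping stream is purely illustrative, and output order across substreams may vary):

```scala
import cats.effect.{ ContextShift, IO }
import fs2.Stream
import fs2.aws.core // previously fs2.aws.internal

import scala.concurrent.ExecutionContext

object GroupByDemo extends App {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Fan a stream out into one substream per first letter, then flatten back.
  // Substreams are consumed with parJoin, mirroring how consumer.scala uses groupBy.
  val grouped: List[(Char, String)] = Stream
    .emits(List("apple", "avocado", "banana", "cherry"))
    .covary[IO]
    .through(core.groupBy(word => IO.pure(word.head)))
    .map { case (initial, words) => words.map(initial -> _) }
    .parJoinUnbounded
    .compile
    .toList
    .unsafeRunSync()

  println(grouped) // e.g. List((a,apple), (a,avocado), (b,banana), (c,cherry)), order may vary
}
```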
diff --git a/fs2-aws-dynamodb/build.sbt b/fs2-aws-dynamodb/build.sbt
new file mode 100644
index 00000000..d6d39424
--- /dev/null
+++ b/fs2-aws-dynamodb/build.sbt
@@ -0,0 +1,22 @@
+name := "fs2-aws-dynamodb"
+
+// coverage
+coverageMinimum := 40
+coverageFailOnMinimum := true
+
+scalaVersion := "2.12.10"
+
+val fs2Version = "2.2.2"
+val AwsSdkVersion = "1.11.716"
+val cirisVersion = "0.12.1"
+
+libraryDependencies ++= Seq(
+  "co.fs2" %% "fs2-core" % fs2Version,
+  "co.fs2" %% "fs2-io" % fs2Version,
+  "org.typelevel" %% "alleycats-core" % "2.1.0",
+  "org.scalatest" %% "scalatest" % "3.1.0" % Test,
+  "org.mockito" % "mockito-core" % "3.2.4" % Test,
+  "org.mockito" %% "mockito-scala-scalatest" % "1.11.2" % Test,
+  "com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.0",
+  "io.laserdisc" %% "scanamo-circe" % "1.0.5"
+)
diff --git a/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/CommittableRecord.scala b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/CommittableRecord.scala
new file mode 100644
index 00000000..c2cc0f70
--- /dev/null
+++ b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/CommittableRecord.scala
@@ -0,0 +1,34 @@
+package fs2.aws.dynamodb
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+
+/** A message type from Kinesis which has not yet been committed or checkpointed.
+ *
+ * @constructor create a new committable record
+ * @param shardId the unique identifier for the shard from which this record originated
+ * @param recordProcessorStartingSequenceNumber the sequence number at which the record processor started reading the shard
+ * @param millisBehindLatest ms behind the latest record, used to detect if the consumer is lagging the producer
+ * @param record the original record document from Kinesis
+ * @param recordProcessor reference to the record processor that is responsible for processing this message
+ * @param checkpointer reference to the checkpointer used to commit this record
+ */
+case class CommittableRecord(
+  shardId: String,
+  recordProcessorStartingSequenceNumber: ExtendedSequenceNumber,
+  millisBehindLatest: Long,
+  record: RecordAdapter,
+  recordProcessor: RecordProcessor,
+  checkpointer: IRecordProcessorCheckpointer
+) {
+  val sequenceNumber: String = record.getSequenceNumber
+
+  def canCheckpoint(): Boolean = !recordProcessor.isShutdown
+  def checkpoint(): Unit       = checkpointer.checkpoint(record)
+}
+
+object CommittableRecord {
+
+  implicit val orderBySequenceNumber: Ordering[CommittableRecord] =
+    Ordering[String].on(cr => cr.sequenceNumber)
+}
diff --git a/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/KinesisStreamSettings.scala b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/KinesisStreamSettings.scala
new file mode 100644
index 00000000..758b579c
--- /dev/null
+++ b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/KinesisStreamSettings.scala
@@ -0,0 +1,56 @@
+package fs2.aws.dynamodb
+
+import scala.concurrent.duration._
+
+/** Settings for configuring the Kinesis consumer stream
+ *
+ * @param bufferSize size of the internal buffer used when reading messages from Kinesis
+ * @param terminateGracePeriod grace period given to in-flight records before the final checkpoint when a shard is terminated
+ */
+class KinesisStreamSettings private (
+  val bufferSize: Int,
+  val terminateGracePeriod: FiniteDuration
+)
+
+/** Settings for configuring the Kinesis checkpointer pipe
+ *
+ * @param maxBatchSize the maximum number of records to aggregate before checkpointing the cluster of records. Passing 1 means checkpoint on every record
+ * @param maxBatchWait the maximum amount of time to wait before checkpointing the cluster of records
+ */
+class KinesisCheckpointSettings private (
+  val maxBatchSize: Int,
+  val maxBatchWait: FiniteDuration
+)
+
+object KinesisStreamSettings {
+  val defaultInstance: KinesisStreamSettings = new KinesisStreamSettings(10, 10.seconds)
+
+  def apply(
+    bufferSize: Int,
+    terminateGracePeriod: FiniteDuration
+  ): Either[Throwable, KinesisStreamSettings] =
+    (bufferSize, terminateGracePeriod) match {
+      case (bs, _) if bs < 1 => Left(new RuntimeException("bufferSize must be greater than 0"))
+      case (bs, period)      => Right(new KinesisStreamSettings(bs, period))
+    }
+}
+
+object KinesisCheckpointSettings {
+  val defaultInstance: KinesisCheckpointSettings = new KinesisCheckpointSettings(1000, 10.seconds)
+
+  def apply(
+    maxBatchSize: Int,
+    maxBatchWait: FiniteDuration
+  ): Either[Throwable, KinesisCheckpointSettings] =
+    (maxBatchSize, maxBatchWait) match {
+      case (s, _) if s <= 0 =>
+        Left(new RuntimeException("maxBatchSize must be greater than 0"))
+      case (_, w) if w <= 0.milliseconds =>
+        Left(
+          new RuntimeException(
+            "maxBatchWait must be greater than 0 milliseconds. To checkpoint immediately, pass 1 as the max batch size."
+          )
+        )
+      case (s, w) =>
+        Right(new KinesisCheckpointSettings(s, w))
+    }
+}
diff --git a/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/RecordProcessor.scala b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/RecordProcessor.scala
new file mode 100644
index 00000000..63c86997
--- /dev/null
+++ b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/RecordProcessor.scala
@@ -0,0 +1,61 @@
+package fs2.aws.dynamodb
+
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
+import com.amazonaws.services.kinesis.clientlibrary.interfaces._
+import com.amazonaws.services.kinesis.clientlibrary.types._
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
+
+/** Concrete implementation of the AWS RecordProcessor interface.
+ * Wraps incoming records into CommittableRecord types to allow for downstream
+ * checkpointing
+ *
+ * @constructor create a new instance with a callback function to perform on record receive
+ * @param cb callback function to run on record receive, passing the new CommittableRecord
+ * @param terminateGracePeriod how long to wait before the final checkpoint when the shard is terminated
+ */
+class RecordProcessor(
+  cb: CommittableRecord => Unit,
+  terminateGracePeriod: FiniteDuration
+) extends v2.IRecordProcessor {
+  private var shardId: String                                  = _
+  private var extendedSequenceNumber: ExtendedSequenceNumber   = _
+  var latestCheckpointer: Option[IRecordProcessorCheckpointer] = None
+  var shutdown: Option[ShutdownReason]                         = None
+
+  def isShutdown: Boolean = shutdown.isDefined
+
+  override def initialize(initializationInput: InitializationInput): Unit = {
+    shardId = initializationInput.getShardId
+    extendedSequenceNumber = initializationInput.getExtendedSequenceNumber
+  }
+
+  override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
+    latestCheckpointer = Some(processRecordsInput.getCheckpointer)
+    processRecordsInput.getRecords.asScala.foreach { record =>
+      cb(
+        CommittableRecord(
+          shardId,
+          extendedSequenceNumber,
+          processRecordsInput.getMillisBehindLatest,
+          record.asInstanceOf[RecordAdapter],
+          recordProcessor = this,
+          processRecordsInput.getCheckpointer
+        )
+      )
+    }
+  }
+
+  override def shutdown(shutdownInput: ShutdownInput): Unit = {
+    shutdown = Some(shutdownInput.getShutdownReason)
+    latestCheckpointer = Some(shutdownInput.getCheckpointer)
+    shutdownInput.getShutdownReason match {
+      case ShutdownReason.TERMINATE =>
+        Thread.sleep(terminateGracePeriod.toMillis)
+        latestCheckpointer.foreach(_.checkpoint())
+      case ShutdownReason.ZOMBIE    => ()
+      case ShutdownReason.REQUESTED => ()
+    }
+  }
+}
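Note: the processor's only job is to adapt the KCL's push-style callbacks into whatever `cb` does with each wrapped record (in practice, enqueueing onto the stream's internal buffer). A minimal standalone sketch with a buffer-collecting callback; in production this instance is created by the `IRecordProcessorFactory` inside `readFromDynamoDBStream` below, never by hand:

```scala
import fs2.aws.dynamodb.{ CommittableRecord, RecordProcessor }

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

object RecordProcessorDemo extends App {
  val received = ListBuffer.empty[CommittableRecord]

  // Every record the KCL Worker hands to processRecords ends up in `cb`.
  val processor = new RecordProcessor(cr => received += cr, terminateGracePeriod = 5.seconds)

  println(processor.isShutdown) // false until the Worker shuts the shard down
}
```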
diff --git a/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/package.scala b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/package.scala
new file mode 100644
index 00000000..8557e586
--- /dev/null
+++ b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/package.scala
@@ -0,0 +1,197 @@
+package fs2.aws
+
+import cats.effect.{ ConcurrentEffect, Effect, IO, Sync, Timer }
+import cats.implicits._
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.regions.Regions
+import com.amazonaws.services.cloudwatch.{ AmazonCloudWatch, AmazonCloudWatchClientBuilder }
+import com.amazonaws.services.dynamodbv2.{
+  AmazonDynamoDB,
+  AmazonDynamoDBClientBuilder,
+  AmazonDynamoDBStreams,
+  AmazonDynamoDBStreamsClientBuilder
+}
+import com.amazonaws.services.dynamodbv2.streamsadapter.{
+  AmazonDynamoDBStreamsAdapterClient,
+  StreamsWorkerFactory
+}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{
+  InitialPositionInStream,
+  KinesisClientLibConfiguration,
+  Worker
+}
+import com.amazonaws.services.kinesis.model.Record
+import fs2.concurrent.Queue
+import fs2.{ Pipe, Stream }
+
+// This code is almost the same as the Kinesis code in fs2-aws, except that it is based on
+// KCL v1, because DynamoDB Streams has not been migrated to KCL v2; hence the duplication.
+
+package object dynamodb {
+  private def defaultWorker(recordProcessorFactory: IRecordProcessorFactory)(
+    workerConfiguration: KinesisClientLibConfiguration,
+    dynamoDBStreamsClient: AmazonDynamoDBStreams,
+    dynamoDBClient: AmazonDynamoDB,
+    cloudWatchClient: AmazonCloudWatch
+  ): Worker = {
+    val adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient)
+    StreamsWorkerFactory.createDynamoDbStreamsWorker(
+      recordProcessorFactory,
+      workerConfiguration,
+      adapterClient,
+      dynamoDBClient,
+      cloudWatchClient
+    )
+  }
+
+  /** Initialize a worker and start streaming records from a Kinesis stream.
+   * When the stream finishes (due to error or otherwise), the worker is shut down.
+   *
+   * @tparam F effect type of the fs2 stream
+   * @param appName name of the Kinesis application. Used by KCL when resharding
+   * @param streamName name of the Kinesis stream to consume from
+   * @return an infinite fs2 Stream that emits Kinesis Records
+   */
+  def readFromDynamoDBStream[F[_]](
+    appName: String,
+    streamName: String
+  )(implicit F: ConcurrentEffect[F]): fs2.Stream[F, CommittableRecord] = {
+    val workerConfig = new KinesisClientLibConfiguration(
+      appName,
+      streamName,
+      DefaultAWSCredentialsProviderChain.getInstance(),
+      s"${
+        import scala.sys.process._
+        "hostname".!!.trim()
+      }:${java.util.UUID.randomUUID()}"
+    ).withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
+
+    readFromDynamoDBStream(workerConfig)
+  }
+
+  /** Initialize a worker and start streaming records from a Kinesis stream.
+   * When the stream finishes (due to error or otherwise), the worker is shut down.
+   *
+   * @tparam F effect type of the fs2 stream
+   * @param workerConfiguration KCL worker configuration
+   * @param streamConfig configuration for the internal stream
+   * @return an infinite fs2 Stream that emits Kinesis Records
+   */
+  def readFromDynamoDBStream[F[_]](
+    workerConfiguration: KinesisClientLibConfiguration,
+    dynamoDBStreamsClient: AmazonDynamoDBStreams =
+      AmazonDynamoDBStreamsClientBuilder.standard().withRegion(Regions.US_EAST_1).build(),
+    dynamoDBClient: AmazonDynamoDB =
+      AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_EAST_1).build(),
+    cloudWatchClient: AmazonCloudWatch =
+      AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_EAST_1).build(),
+    streamConfig: KinesisStreamSettings = KinesisStreamSettings.defaultInstance
+  )(implicit F: ConcurrentEffect[F]): fs2.Stream[F, CommittableRecord] =
+    readFromDynamoDBStream(
+      defaultWorker(_)(
+        workerConfiguration,
+        dynamoDBStreamsClient,
+        dynamoDBClient,
+        cloudWatchClient
+      ),
+      streamConfig
+    )
+
+  /** Initialize a worker and start streaming records from a Kinesis stream.
+   * When the stream finishes (due to error or otherwise), the worker is shut down.
+   *
+   * @tparam F effect type of the fs2 stream
+   * @param workerFactory function to create a Worker from an IRecordProcessorFactory
+   * @param streamConfig configuration for the internal stream
+   * @return an infinite fs2 Stream that emits Kinesis Records
+   */
+  private[aws] def readFromDynamoDBStream[F[_]: ConcurrentEffect](
+    workerFactory: => IRecordProcessorFactory => Worker,
+    streamConfig: KinesisStreamSettings
+  ): fs2.Stream[F, CommittableRecord] = {
+
+    // Initialize a KCL worker which appends to the internal stream queue on message receipt
+    def instantiateWorker(queue: Queue[F, CommittableRecord]): F[Worker] = Sync[F].delay {
+      workerFactory(() =>
+        new RecordProcessor(
+          record => Effect[F].runAsync(queue.enqueue1(record))(_ => IO.unit).unsafeRunSync,
+          streamConfig.terminateGracePeriod
+        )
+      )
+    }
+
+    // Instantiate a new bounded queue and concurrently run the queue populator
+    // Expose the elements by dequeuing the internal buffer
+    for {
+      buffer <- Stream.eval(Queue.bounded[F, CommittableRecord](streamConfig.bufferSize))
+      worker = instantiateWorker(buffer)
+      stream <- buffer.dequeue concurrently Stream.eval(worker.map(_.run)) onFinalize worker.map(
+                 _.shutdown
+               )
+    } yield stream
+  }
+
+  /** Pipe to checkpoint records in Kinesis, marking them as processed.
+   * Groups records by shard id, so that each shard is subject to its own clustering of records.
+   * After accumulating maxBatchSize records or waiting maxBatchWait for a given shard, the latest record is checkpointed.
+   * By design, all records prior to the checkpointed record are also checkpointed in Kinesis.
+   *
+   * @tparam F effect type of the fs2 stream
+   * @param checkpointSettings configure maxBatchSize and maxBatchWait time before triggering a checkpoint
+   * @return a stream of Record types representing checkpointed messages
+   */
+  def checkpointRecords[F[_]](
+    checkpointSettings: KinesisCheckpointSettings = KinesisCheckpointSettings.defaultInstance,
+    parallelism: Int = 10
+  )(
+    implicit F: ConcurrentEffect[F],
+    timer: Timer[F]
+  ): Pipe[F, CommittableRecord, Record] = {
+    def checkpoint(checkpointSettings: KinesisCheckpointSettings, parallelism: Int)(
+      implicit F: ConcurrentEffect[F],
+      timer: Timer[F]
+    ): Pipe[F, CommittableRecord, Record] =
+      _.groupWithin(checkpointSettings.maxBatchSize, checkpointSettings.maxBatchWait)
+        .collect { case chunk if chunk.size > 0 => chunk.toList.max }
+        .flatMap { cr =>
+          fs2.Stream.eval_(
+            F.async[Record] { cb =>
+              if (cr.canCheckpoint()) {
+                cr.checkpoint()
+                cb(Right(cr.record))
+              } else {
+                cb(
+                  Left(
+                    new RuntimeException(
+                      "Record processor has been shutdown and therefore cannot checkpoint records"
+                    )
+                  )
+                )
+              }
+            }
+          )
+        }
+
+    def bypass: Pipe[F, CommittableRecord, Record] = _.map(r => r.record)
+
+    _.through(core.groupBy(r => F.delay(r.shardId))).map {
+      case (_, st) =>
+        st.broadcastThrough(checkpoint(checkpointSettings, parallelism), bypass)
+    }.parJoinUnbounded
+  }
+
+  /** Sink to checkpoint records in Kinesis, marking them as processed.
+   * Groups records by shard id, so that each shard is subject to its own clustering of records.
+   * After accumulating maxBatchSize records or waiting maxBatchWait for a given shard, the latest record is checkpointed.
+   * By design, all records prior to the checkpointed record are also checkpointed in Kinesis.
+   *
+   * @tparam F effect type of the fs2 stream
+   * @param checkpointSettings configure maxBatchSize and maxBatchWait time before triggering a checkpoint
+   * @return a Sink that accepts a stream of CommittableRecords
+   */
+  def checkpointRecords_[F[_]](
+    checkpointSettings: KinesisCheckpointSettings = KinesisCheckpointSettings.defaultInstance
+  )(implicit F: ConcurrentEffect[F], timer: Timer[F]): Pipe[F, CommittableRecord, Unit] =
+    _.through(checkpointRecords(checkpointSettings))
+      .map(_ => ())
+}
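Note: end to end, the package object gives a consume-then-commit pipeline in a few lines. A hedged sketch (the application name and stream ARN are placeholders, and `IOApp` supplies the `ContextShift`/`Timer` instances that `ConcurrentEffect[IO]` and the checkpoint pipe need):

```scala
import cats.effect.{ ExitCode, IO, IOApp }
import fs2.aws.dynamodb._

object ConsumeAndCheckpoint extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    readFromDynamoDBStream[IO]("checkpoint-example-app", "<dynamodb-stream-arn>")
      .evalTap(cr => IO(println(s"processing ${cr.sequenceNumber}")))
      .through(checkpointRecords[IO]()) // emits each record once it has been checkpointed
      .compile
      .drain
      .map(_ => ExitCode.Success)
}
```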
diff --git a/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/parsers/BeforeAfter.scala b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/parsers/BeforeAfter.scala
new file mode 100644
index 00000000..ba082536
--- /dev/null
+++ b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/parsers/BeforeAfter.scala
@@ -0,0 +1,7 @@
+package fs2.aws.dynamodb.parsers
+
+sealed trait DynamoDBBeforeAfter[T]
+
+case class Insert[T](after: T) extends DynamoDBBeforeAfter[T]
+case class Update[T](before: T, after: T) extends DynamoDBBeforeAfter[T]
+case class Delete[T](before: T) extends DynamoDBBeforeAfter[T]
diff --git a/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/parsers/package.scala b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/parsers/package.scala
new file mode 100644
index 00000000..ff0a6cef
--- /dev/null
+++ b/fs2-aws-dynamodb/src/main/scala/fs2/aws/dynamodb/parsers/package.scala
@@ -0,0 +1,38 @@
+package fs2.aws.dynamodb
+
+import java.util
+
+import cats.effect.Sync
+import cats.implicits._
+import com.amazonaws.services.dynamodbv2.model.AttributeValue
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
+import org.scanamo.{ DynamoFormat, DynamoObject, DynamoReadError }
+
+package object parsers {
+  def parseDynamoEvent[F[_]: Sync, T](
+    record: RecordAdapter
+  )(implicit df: DynamoFormat[T]): F[DynamoDBBeforeAfter[T]] =
+    record.getInternalObject.getEventName match {
+      case "INSERT" =>
+        parseDynamoRecord(record.getInternalObject.getDynamodb.getNewImage)
+          .map(after => Insert(after))
+      case "MODIFY" =>
+        for {
+          before <- parseDynamoRecord(record.getInternalObject.getDynamodb.getOldImage)
+          after  <- parseDynamoRecord(record.getInternalObject.getDynamodb.getNewImage)
+        } yield Update(before, after)
+      case "REMOVE" =>
+        parseDynamoRecord(record.getInternalObject.getDynamodb.getOldImage)
+          .map(before => Delete(before))
+    }
+
+  def parseDynamoRecord[F[_]: Sync, T](
+    image: util.Map[String, AttributeValue]
+  )(implicit df: DynamoFormat[T]): F[T] =
+    df.read(
+        DynamoObject(image).toAttributeValue
+      )
+      .leftMap(e => new RuntimeException(DynamoReadError.describe(e)))
+      .liftTo[F]
+
+}
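Note: the original `REMOVE` branch mapped the old image to `Insert`; it is corrected above to `Delete`, matching the semantics of the sealed trait. `parseDynamoEvent` lifts a raw `RecordAdapter` into the typed before/after ADT; a sketch of consuming the result, decoding into circe `Json` via scanamo-circe exactly as the example module below does:

```scala
import cats.effect.IO
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import fs2.aws.dynamodb.parsers._
import io.circe.Json
import io.github.howardjohn.scanamo.CirceDynamoFormat._

object ParseDemo {
  // Turn a stream record into a human-readable description of the change.
  def describeChange(record: RecordAdapter): IO[String] =
    parseDynamoEvent[IO, Json](record).map {
      case Insert(after)         => s"inserted: ${after.noSpaces}"
      case Update(before, after) => s"updated: ${before.noSpaces} -> ${after.noSpaces}"
      case Delete(before)        => s"deleted: ${before.noSpaces}"
    }
}
```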
diff --git a/fs2-aws-dynamodb/src/test/scala/fs2/aws/dynamodb/DynamoDBConsumerSpec.scala b/fs2-aws-dynamodb/src/test/scala/fs2/aws/dynamodb/DynamoDBConsumerSpec.scala
new file mode 100644
index 00000000..d3fd1fd9
--- /dev/null
+++ b/fs2-aws-dynamodb/src/test/scala/fs2/aws/dynamodb/DynamoDBConsumerSpec.scala
@@ -0,0 +1,366 @@
+package fs2
+package aws
+
+import java.util.Date
+import java.util.concurrent.Semaphore
+
+import cats.effect.{ ContextShift, IO, Timer }
+import com.amazonaws.services.dynamodbv2.model
+import com.amazonaws.services.dynamodbv2.model.{ AttributeValue, StreamRecord }
+import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.{
+  IRecordProcessor,
+  IRecordProcessorFactory
+}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ ShutdownReason, Worker }
+import com.amazonaws.services.kinesis.clientlibrary.types._
+import com.amazonaws.services.kinesis.model.Record
+import fs2.aws.dynamodb._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time._
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+class DynamoDBConsumerSpec
+    extends AnyFlatSpec
+    with Matchers
+    with BeforeAndAfterEach
+    with Eventually {
+
+  implicit val ec: ExecutionContext             = ExecutionContext.global
+  implicit val timer: Timer[IO]                 = IO.timer(ec)
+  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
+
+  implicit override val patienceConfig: PatienceConfig =
+    PatienceConfig(timeout = scaled(Span(2, Seconds)), interval = scaled(Span(5, Millis)))
+
+  "KinesisWorker source" should "successfully read data from the Kinesis stream" in new WorkerContext
+    with TestData {
+    semaphore.acquire()
+    recordProcessor.initialize(initializationInput)
+    recordProcessor.processRecords(recordsInput)
+
+    eventually(verify(mockWorker, times(1)).run())
+
+    eventually(timeout(1.second)) {
+      val committableRecord = output.head
+      committableRecord.record.getData should be(record.getData)
+      committableRecord.recordProcessorStartingSequenceNumber shouldBe initializationInput.getExtendedSequenceNumber
+      committableRecord.shardId shouldBe initializationInput.getShardId
+      committableRecord.millisBehindLatest shouldBe recordsInput.getMillisBehindLatest
+    }
+    semaphore.release()
+  }
+
+  it should "not shutdown the worker if the stream is drained but has not failed" in new WorkerContext
+    with TestData {
+    semaphore.acquire()
+    recordProcessor.initialize(initializationInput)
+    recordProcessor.processRecords(recordsInput)
+
+    eventually(verify(mockWorker, times(0)).shutdown())
+    semaphore.release()
+  }
+
+  it should "shutdown the worker if the stream terminates" in new WorkerContext(errorStream = true)
+    with TestData {
+    semaphore.acquire()
+    recordProcessor.initialize(initializationInput)
+    recordProcessor.processRecords(recordsInput)
+
+    eventually(verify(mockWorker, times(1)).shutdown())
+  }
+
+  it should "not drop messages in case of back-pressure" in new WorkerContext with TestData {
+    semaphore.acquire()
+    // Create and send 10 records (to match the buffer size)
+    for (i <- 1 to 10) {
+      val record: Record = mock(classOf[RecordAdapter])
+      when(record.getSequenceNumber).thenReturn(i.toString)
+      recordProcessor.processRecords(recordsInput.withRecords(List(record).asJava))
+    }
+
+    // Should process all 10 messages
+    eventually(output.size shouldBe 10)
+
+    // Send a batch that exceeds the internal buffer size
+    for (i <- 1 to 50) {
+      val record: Record = mock(classOf[RecordAdapter])
+      when(record.getSequenceNumber).thenReturn(i.toString)
+      recordProcessor.processRecords(recordsInput.withRecords(List(record).asJava))
+    }
+
+    // Should have processed all 60 messages
+    eventually(output.size shouldBe 60)
+
+    eventually(verify(mockWorker, times(0)).shutdown())
+    semaphore.release()
+  }
+
+  it should "not drop messages in case of back-pressure with multiple shard workers" in new WorkerContext
+    with TestData {
+    semaphore.acquire()
+    recordProcessor.initialize(initializationInput)
+    recordProcessor2.initialize(initializationInput.withShardId("shard2"))
+
+    // Create and send 5 records to each of the two processors (10 total, matching the buffer size)
+    for (i <- 1 to 5) {
+      val record: Record = mock(classOf[RecordAdapter])
+      when(record.getSequenceNumber).thenReturn(i.toString)
+      recordProcessor.processRecords(recordsInput.withRecords(List(record).asJava))
+      recordProcessor2.processRecords(recordsInput.withRecords(List(record).asJava))
+    }
+
+    // Should process all 10 messages
+    eventually(output.size shouldBe 10)
+
+    // Each shard is assigned its own worker thread, so we get messages
+    // from each thread simultaneously.
+    def simulateWorkerThread(rp: IRecordProcessor): Future[Unit] =
+      Future {
+        for (i <- 1 to 25) { // well beyond the buffer size of 10
+          val record: Record = mock(classOf[RecordAdapter])
+          when(record.getSequenceNumber).thenReturn(i.toString)
+          rp.processRecords(recordsInput.withRecords(List(record).asJava))
+        }
+      }
+
+    simulateWorkerThread(recordProcessor)
+    simulateWorkerThread(recordProcessor2)
+
+    // Should have processed all 60 messages
+    eventually(output.size shouldBe 60)
+    semaphore.release()
+  }
+
+  "KinesisWorker checkpoint pipe" should "checkpoint batch of records with same sequence number" in new KinesisWorkerCheckpointContext {
+    val input: immutable.IndexedSeq[CommittableRecord] = (1 to 3) map { i =>
+      val record = mock(classOf[RecordAdapter])
+      when(record.getSequenceNumber).thenReturn(i.toString)
+      new CommittableRecord(
+        "shard-1",
+        mock(classOf[ExtendedSequenceNumber]),
+        1L,
+        record,
+        recordProcessor,
+        checkpointerShard1
+      )
+    }
+
+    startStream(input)
+
+    eventually(timeout(1.second)) {
+      verify(checkpointerShard1).checkpoint(input.last.record)
+    }
+  }
+
+  it should "checkpoint batch of records of different shards" in new KinesisWorkerCheckpointContext {
+    val checkpointerShard2: IRecordProcessorCheckpointer =
+      mock(classOf[IRecordProcessorCheckpointer])
+
+    val input: immutable.IndexedSeq[CommittableRecord] = (1 to 6) map { i =>
+      if (i <= 3) {
+        val record = mock(classOf[RecordAdapter])
+        when(record.getSequenceNumber).thenReturn(i.toString)
+        new CommittableRecord(
+          "shard-1",
+          mock(classOf[ExtendedSequenceNumber]),
+          i,
+          record,
+          recordProcessor,
+          checkpointerShard1
+        )
+      } else {
+        val record = mock(classOf[RecordAdapter])
+        when(record.getSequenceNumber).thenReturn(i.toString)
+        new CommittableRecord(
+          "shard-2",
+          mock(classOf[ExtendedSequenceNumber]),
+          i,
+          record,
+          recordProcessor,
+          checkpointerShard2
+        )
+      }
+    }
+
+    startStream(input)
+
+    eventually(timeout(3.seconds)) {
+      verify(checkpointerShard1).checkpoint(input(2).record)
+      verify(checkpointerShard2).checkpoint(input.last.record)
+    }
+  }
+
+  it should "not checkpoint the batch if the IRecordProcessor has been shutdown with ZOMBIE reason" in new KinesisWorkerCheckpointContext {
+    recordProcessor.shutdown(
+      new ShutdownInput()
+        .withShutdownReason(ShutdownReason.ZOMBIE)
+        .withCheckpointer(checkpointerShard1)
+    )
+
+    val input: immutable.IndexedSeq[CommittableRecord] = (1 to 3) map { _ =>
+      val record = mock(classOf[RecordAdapter])
+      when(record.getSequenceNumber).thenReturn("1")
+      new CommittableRecord(
+        "shard-1",
+        mock(classOf[ExtendedSequenceNumber]),
+        1L,
+        record,
+        recordProcessor,
+        checkpointerShard1
+      )
+    }
+
+    startStream(input)
+
+    verify(checkpointerShard1, times(0)).checkpoint()
+  }
+
+  it should "checkpoint one last time if the IRecordProcessor has been shutdown with TERMINATE reason" in new KinesisWorkerCheckpointContext {
+    recordProcessor.shutdown(
+      new ShutdownInput()
+        .withShutdownReason(ShutdownReason.TERMINATE)
+        .withCheckpointer(checkpointerShard1)
+    )
+
+    val input: immutable.IndexedSeq[CommittableRecord] = (1 to 3) map { _ =>
+      val record = mock(classOf[RecordAdapter])
+      when(record.getSequenceNumber).thenReturn("1")
+      new CommittableRecord(
+        "shard-1",
+        mock(classOf[ExtendedSequenceNumber]),
+        1L,
+        record,
+        recordProcessor,
+        checkpointerShard1
+      )
+    }
+
+    startStream(input)
+
+    verify(checkpointerShard1, times(1)).checkpoint()
+  }
+
+  it should "fail with Exception if checkpoint action fails" in new KinesisWorkerCheckpointContext {
+    val checkpointer: IRecordProcessorCheckpointer = mock(classOf[IRecordProcessorCheckpointer])
+
+    val record: RecordAdapter = mock(classOf[RecordAdapter])
+    when(record.getSequenceNumber).thenReturn("1")
+
+    val input = new CommittableRecord(
+      "shard-1",
+      mock(classOf[ExtendedSequenceNumber]),
+      1L,
+      record,
+      recordProcessor,
+      checkpointer
+    )
+
+    val failure = new RuntimeException("you have no power here")
+    when(checkpointer.checkpoint(record)).thenThrow(failure)
+
+    the[RuntimeException] thrownBy fs2.Stream
+      .emits(Seq(input))
+      .through(checkpointRecords[IO](settings))
+      .compile
+      .toVector
+      .unsafeRunSync should have message "you have no power here"
+
+    eventually(verify(checkpointer).checkpoint(input.record))
+  }
+
+  abstract private class WorkerContext(
+    backpressureTimeout: FiniteDuration = 1.minute,
+    errorStream: Boolean = false
+  ) {
+
+    val semaphore = new Semaphore(1)
+    semaphore.acquire()
+    var output: List[CommittableRecord] = List()
+
+    protected val mockWorker: Worker = mock(classOf[Worker])
+
+    when(mockWorker.run()).thenAnswer(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = ()
+    })
+
+    when(mockWorker.shutdown()).thenAnswer(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = ()
+    })
+
+    var recordProcessorFactory: IRecordProcessorFactory = _
+    var recordProcessor: IRecordProcessor               = _
+    var recordProcessor2: IRecordProcessor              = _
+
+    val builder: IRecordProcessorFactory => Worker = { x: IRecordProcessorFactory =>
+      recordProcessorFactory = x
+      recordProcessor = x.createProcessor()
+      recordProcessor2 = x.createProcessor()
+      semaphore.release()
+      mockWorker
+    }
+
+    val config: KinesisStreamSettings = KinesisStreamSettings(bufferSize = 10, 10.seconds).right.get
+
+    val stream: Unit =
+      readFromDynamoDBStream[IO](builder, config)
+        .through(_.evalMap(i => IO(output = output :+ i)))
+        .map(i => if (errorStream) throw new Exception("boom") else i)
+        .compile
+        .toVector
+        .unsafeRunAsync(_ => ())
+  }
+
+  private trait TestData {
+    protected val checkpointer: IRecordProcessorCheckpointer = mock(
+      classOf[IRecordProcessorCheckpointer]
+    )
+
+    val initializationInput: InitializationInput =
+      new InitializationInput()
+        .withShardId("shardId")
+        .withExtendedSequenceNumber(ExtendedSequenceNumber.AT_TIMESTAMP)
+
+    val record: Record =
+      new RecordAdapter(
+        new model.Record()
+          .withDynamodb(new StreamRecord().addNewImageEntry("name", new AttributeValue("Barry")))
+      ).withApproximateArrivalTimestamp(new Date())
+        .withEncryptionType("encryption")
+
+    val recordsInput: ProcessRecordsInput =
+      new ProcessRecordsInput()
+        .withCheckpointer(checkpointer)
+        .withMillisBehindLatest(1L)
+        .withRecords(List(record).asJava)
+  }
+
+  private trait KinesisWorkerCheckpointContext {
+    val recordProcessor = new RecordProcessor(_ => (), 1.second)
+    val checkpointerShard1: IRecordProcessorCheckpointer = mock(
+      classOf[IRecordProcessorCheckpointer]
+    )
+    val settings: KinesisCheckpointSettings =
+      KinesisCheckpointSettings(maxBatchSize = 100, maxBatchWait = 500.millis).right.get
+
+    def startStream(input: Seq[CommittableRecord]): Unit =
+      fs2.Stream
+        .emits(input)
+        .through(checkpointRecords[IO](settings))
+        .compile
+        .toVector
+        .unsafeRunAsync(_ => ())
+  }
+}
diff --git a/fs2-aws-examples/build.sbt b/fs2-aws-examples/build.sbt
new file mode 100644
index 00000000..2d10aa04
--- /dev/null
+++ b/fs2-aws-examples/build.sbt
@@ -0,0 +1,25 @@
+name := "fs2-aws-examples"
+
+// coverage
+coverageMinimum := 40
+coverageFailOnMinimum := true
+
+scalaVersion := "2.12.10"
+
+val fs2Version = "2.2.2"
+val AwsSdkVersion = "1.11.716"
+val cirisVersion = "0.12.1"
+
+libraryDependencies ++= Seq(
+  "org.mockito" % "mockito-core" % "3.2.4" % Test,
+  "org.mockito" %% "mockito-scala-scalatest" % "1.11.2" % Test,
+  "ch.qos.logback" % "logback-classic" % "1.2.3",
+  "ch.qos.logback" % "logback-core" % "1.2.3",
+  "org.slf4j" % "jcl-over-slf4j" % "1.7.30",
+  "org.slf4j" % "jul-to-slf4j" % "1.7.30",
+  "io.chrisdavenport" %% "log4cats-slf4j" % "1.0.1",
+  "io.laserdisc" %% "scanamo-circe" % "1.0.5"
+)
+
+addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
+addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")
diff --git a/fs2-aws-examples/src/main/resources/logback.xml b/fs2-aws-examples/src/main/resources/logback.xml
new file mode 100644
index 00000000..aa377883
--- /dev/null
+++ b/fs2-aws-examples/src/main/resources/logback.xml
@@ -0,0 +1,25 @@
+
+
+
+
+      %date |-%level %logger [%thread] %message%n%xException
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/fs2-aws-examples/src/main/scala/DynamoDBStreamer.scala b/fs2-aws-examples/src/main/scala/DynamoDBStreamer.scala
new file mode 100644
index 00000000..19a27f6c
--- /dev/null
+++ b/fs2-aws-examples/src/main/scala/DynamoDBStreamer.scala
@@ -0,0 +1,23 @@
+import cats.effect.{ ExitCode, IO, IOApp }
+import fs2.aws.dynamodb
+import fs2.aws.dynamodb.parsers
+import io.chrisdavenport.log4cats.SelfAwareStructuredLogger
+import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+import io.circe.Json
+import io.github.howardjohn.scanamo.CirceDynamoFormat._
+
+object DynamoDBStreamer extends IOApp {
+  override def run(args: List[String]): IO[ExitCode] =
+    for {
+      implicit0(logger: SelfAwareStructuredLogger[IO]) <- Slf4jLogger.fromName[IO]("example")
+      _ <- dynamodb
+            .readFromDynamoDBStream[IO](
+              "dynamo_db_example",
+              "arn:aws:dynamodb:us-east-1:023006903388:table/nightly-sync-events-Production/stream/2020-01-27T22:49:13.204"
+            )
+            .evalMap(cr => parsers.parseDynamoEvent[IO, Json](cr.record))
+            .evalTap(msg => logger.info(s"received $msg"))
+            .compile
+            .drain
+    } yield ExitCode.Success
+}
diff --git a/fs2-aws/src/main/scala/fs2/aws/kinesis/consumer.scala b/fs2-aws/src/main/scala/fs2/aws/kinesis/consumer.scala
index a5fb9a91..211a5c52 100644
--- a/fs2-aws/src/main/scala/fs2/aws/kinesis/consumer.scala
+++ b/fs2-aws/src/main/scala/fs2/aws/kinesis/consumer.scala
@@ -4,8 +4,8 @@ import java.util.UUID
 
 import cats.effect.{ ConcurrentEffect, IO, Timer }
 import cats.implicits._
+import fs2.aws.core
 import fs2.{ Chunk, Pipe, RaiseThrowable, Stream }
-import fs2.aws.internal._
 import fs2.aws.internal.Exceptions.KinesisCheckpointException
 import fs2.concurrent.Queue
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
@@ -275,7 +275,7 @@ object consumer {
 
     def bypass: Pipe[F, CommittableRecord, KinesisClientRecord] = _.map(r => r.record)
 
-    _.through(groupBy(r => F.delay(r.shardId))).map {
+    _.through(core.groupBy(r => F.delay(r.shardId))).map {
       case (_, st) =>
         st.broadcastThrough(checkpoint(checkpointSettings, parallelism), bypass)
     }.parJoinUnbounded
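Note: for completeness, the same example extended with checkpointing, combining `readFromDynamoDBStream`, the parsers, and `checkpointRecords_`. This is a sketch only; the stream ARN is a placeholder, and the log-then-commit split (observe the decoded change with `evalTap`, keep the `CommittableRecord` for the checkpoint sink) is one possible arrangement, not part of this PR:

```scala
import cats.effect.{ ExitCode, IO, IOApp }
import fs2.aws.dynamodb
import fs2.aws.dynamodb.parsers
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import io.circe.Json
import io.github.howardjohn.scanamo.CirceDynamoFormat._

object DynamoDBStreamerWithCheckpoint extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    for {
      logger <- Slf4jLogger.fromName[IO]("example-checkpoint")
      _ <- dynamodb
            .readFromDynamoDBStream[IO]("dynamo_db_example", "<dynamodb-stream-arn>")
            // observe the decoded change, but keep the CommittableRecord around
            .evalTap(cr =>
              parsers.parseDynamoEvent[IO, Json](cr.record).flatMap(ev => logger.info(s"received $ev"))
            )
            // checkpoint processed records (the KCL stores checkpoints in its lease table)
            .through(dynamodb.checkpointRecords_[IO]())
            .compile
            .drain
    } yield ExitCode.Success
}
```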