Skip to content

Commit

Permalink
Dynamodb streaming (#143)
Browse files Browse the repository at this point in the history
* stream dynamodb events

* provide examples
parse dynamodb records

* bump version

* formatting

* stream dynamodb events

* provide examples
parse dynamodb records

* bump version

* formatting

* build sbt

* fix build sbt

* formatting
update versions

* fix version

* fix version

Co-authored-by: Barry O'Neill <[email protected]>
  • Loading branch information
semenodm and barryoneill authored Feb 14, 2020
1 parent 106dbc3 commit b4edce5
Show file tree
Hide file tree
Showing 16 changed files with 915 additions and 7 deletions.
39 changes: 38 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,30 @@ name := "fs2-aws"
scalaVersion := "2.12.10"

lazy val root = (project in file("."))
.aggregate(`fs2-aws`, `fs2-aws-testkit`)
.aggregate(`fs2-aws`, `fs2-aws-testkit`, `fs2-aws-dynamodb`, `fs2-aws-core`, `fs2-aws-examples`)
.settings(
skip in publish := true
)

lazy val `fs2-aws-core` = (project in file("fs2-aws-core"))
.settings(commonSettings)
.settings(publishSettings)

lazy val `fs2-aws-dynamodb` = (project in file("fs2-aws-dynamodb"))
.dependsOn(`fs2-aws-core`)
.settings(commonSettings)
.settings(publishSettings)

lazy val `fs2-aws-examples` = (project in file("fs2-aws-examples"))
.dependsOn(`fs2-aws-dynamodb`)
.settings(commonSettings)
.settings(publishSettings)
.settings(
skip in publish := true
)

lazy val `fs2-aws` = (project in file("fs2-aws"))
.dependsOn(`fs2-aws-core`)
.settings(commonSettings)
.settings(publishSettings)

Expand Down Expand Up @@ -37,6 +55,25 @@ lazy val commonSettings = Seq(
)
)

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.10")

// publish
publishTo in ThisBuild := Some(
"Sonatype Nexus" at "https://oss.sonatype.org/service/local/staging/deploy/maven2"
)

licenses in ThisBuild := Seq(
"MIT" -> url("https://github.com/dmateusp/fs2-aws/blob/master/LICENSE")
)
developers in ThisBuild := List(
Developer(
id = "dmateusp",
name = "Daniel Mateus Pires",
email = "[email protected]",
url = url("https://github.com/dmateusp")
)
)

lazy val publishSettings = Seq(
publishMavenStyle := true,
Test / publishArtifact := true,
Expand Down
18 changes: 18 additions & 0 deletions fs2-aws-core/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name := "fs2-aws-core"

// coverage
coverageMinimum := 40
coverageFailOnMinimum := true

scalaVersion := "2.12.10"

val fs2Version = "2.2.2"

libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.typelevel" %% "alleycats-core" % "2.1.0",
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
"org.mockito" % "mockito-core" % "3.2.4" % Test,
"org.mockito" %% "mockito-scala-scalatest" % "1.11.2" % Test
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.implicits._
import fs2.concurrent.Queue
import fs2.{ Pipe, Stream }

package object internal {
package object core {

/** Helper flow to group elements of a stream into K substreams.
* Grows with the number of distinct 'K' selectors
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package fs2.aws
package fs2.aws.core

import fs2.Stream
import cats.effect.{ ContextShift, IO }
import fs2.aws.internal._
import fs2.Stream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

Expand Down
22 changes: 22 additions & 0 deletions fs2-aws-dynamodb/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name := "fs2-aws-dynamodb"

// coverage
coverageMinimum := 40
coverageFailOnMinimum := true

scalaVersion := "2.12.10"

val fs2Version = "2.2.2"
val AwsSdkVersion = "1.11.716"
val cirisVersion = "0.12.1"

libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.typelevel" %% "alleycats-core" % "2.1.0",
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
"org.mockito" % "mockito-core" % "3.2.4" % Test,
"org.mockito" %% "mockito-scala-scalatest" % "1.11.2" % Test,
"com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.0",
"io.laserdisc" %% "scanamo-circe" % "1.0.5"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package fs2.aws.dynamodb

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

/** A message type from Kinesis which has not yet been commited or checkpointed.
*
* @constructor create a new commitable record with a name and age.
* @param shardId the unique identifier for the shard from which this record originated
* @param millisBehindLatest ms behind the latest record, used to detect if the consumer is lagging the producer
* @param record the original record document from Kinesis
* @param recordProcessor reference to the record processor that is responsible for processing this message
* @param checkpointer reference to the checkpointer used to commit this record
*/
case class CommittableRecord(
shardId: String,
recordProcessorStartingSequenceNumber: ExtendedSequenceNumber,
millisBehindLatest: Long,
record: RecordAdapter,
recordProcessor: RecordProcessor,
checkpointer: IRecordProcessorCheckpointer
) {
val sequenceNumber: String = record.getSequenceNumber

def canCheckpoint(): Boolean = !recordProcessor.isShutdown
def checkpoint(): Unit = checkpointer.checkpoint(record)
}

object CommittableRecord {

implicit val orderBySequenceNumber: Ordering[CommittableRecord] =
Ordering[String].on(cr cr.sequenceNumber)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package fs2.aws.dynamodb

import scala.concurrent.duration._

/** Settings for configuring the Kinesis consumer stream
*
* @param bufferSize size of the internal buffer used when reading messages from Kinesis
*/
class KinesisStreamSettings private (
val bufferSize: Int,
val terminateGracePeriod: FiniteDuration
)

/** Settings for configuring the Kinesis checkpointer pipe
*
* @param maxBatchSize the maximum number of records to aggregate before checkpointing the cluster of records. Passing 1 means checkpoint on every record
* @param maxBatchWait the maximum amount of time to wait before checkpointing the cluster of records
*/
class KinesisCheckpointSettings private (
val maxBatchSize: Int,
val maxBatchWait: FiniteDuration
)

object KinesisStreamSettings {
val defaultInstance: KinesisStreamSettings = new KinesisStreamSettings(10, 10.seconds)

def apply(
bufferSize: Int,
terminateGracePeriod: FiniteDuration
): Either[Throwable, KinesisStreamSettings] =
(bufferSize, terminateGracePeriod) match {
case (bs, _) if bs < 1 => Left(new RuntimeException("Must be greater than 0"))
case (bs, period) => Right(new KinesisStreamSettings(bufferSize, period))
}
}

object KinesisCheckpointSettings {
val defaultInstance = new KinesisCheckpointSettings(1000, 10.seconds)

def apply(
maxBatchSize: Int,
maxBatchWait: FiniteDuration
): Either[Throwable, KinesisCheckpointSettings] =
(maxBatchSize, maxBatchWait) match {
case (s, _) if s <= 0 =>
Left(new RuntimeException("Must be greater than 0"))
case (_, w) if w <= 0.milliseconds =>
Left(
new RuntimeException(
"Must be greater than 0 milliseconds. To checkpoint immediately, pass 1 to the max batch size."
)
)
case (s, w) =>
Right(new KinesisCheckpointSettings(s, w))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package fs2.aws.dynamodb

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter
import com.amazonaws.services.kinesis.clientlibrary.interfaces._
import com.amazonaws.services.kinesis.clientlibrary.types._
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason

import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration

/** Concrete implementation of the AWS RecordProcessor interface.
* Wraps incoming records into CommitableRecord types to allow for downstream
* checkpointing
*
* @constructor create a new instance with a callback function to perform on record receive
* @param cb callback function to run on record receive, passing the new CommittableRecord
*/
class RecordProcessor(
cb: CommittableRecord => Unit,
terminateGracePeriod: FiniteDuration
) extends v2.IRecordProcessor {
private var shardId: String = _
private var extendedSequenceNumber: ExtendedSequenceNumber = _
var latestCheckpointer: Option[IRecordProcessorCheckpointer] = None
var shutdown: Option[ShutdownReason] = None

def isShutdown = shutdown.isDefined

override def initialize(initializationInput: InitializationInput): Unit = {
shardId = initializationInput.getShardId
extendedSequenceNumber = initializationInput.getExtendedSequenceNumber
}

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
latestCheckpointer = Some(processRecordsInput.getCheckpointer)
processRecordsInput.getRecords.asScala.foreach { record =>
cb(
CommittableRecord(
shardId,
extendedSequenceNumber,
processRecordsInput.getMillisBehindLatest,
record.asInstanceOf[RecordAdapter],
recordProcessor = this,
processRecordsInput.getCheckpointer
)
)
}
}

override def shutdown(shutdownInput: ShutdownInput): Unit = {
shutdown = Some(shutdownInput.getShutdownReason)
latestCheckpointer = Some(shutdownInput.getCheckpointer)
shutdownInput.getShutdownReason match {
case ShutdownReason.TERMINATE =>
Thread.sleep(terminateGracePeriod.toMillis)
latestCheckpointer.foreach(_.checkpoint())
case ShutdownReason.ZOMBIE => ()
case ShutdownReason.REQUESTED => ()
}
}
}
Loading

0 comments on commit b4edce5

Please sign in to comment.