From d1bf60e784478a54d14061d655fd2e379c4fa3d5 Mon Sep 17 00:00:00 2001 From: Julien Richard-Foy Date: Tue, 16 Apr 2024 18:51:57 +0200 Subject: [PATCH] Replace our fork of spark-kinesis with our adapted copy of the relevant classes. We adapted the `KinesisReceiver` and its related classes to work with DynamoDB Streams, and we renamed it into `KinesisDynamoDBReceiver`. These classes are based on the code from the original `spark-kinesis-asl` module with some slight modifications based on the following resources: - https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html - https://medium.com/@ravi72munde/using-spark-streaming-with-dynamodb-d325b9a73c79 - and our previous fork implementation As a result, instead of maintaining a complete fork of `spark-kinesis-asl`, we only maintain a copy of the relevant classes, which should result in much faster build times (especially in the CI). It is still not possible to test the streaming feature locally (thus not in the CI either), see #113. These changes were tested with my actual AWS account. Fixes #119 --- .gitmodules | 3 - build.sbt | 50 +++++++----- build.sh | 4 - .../writers/DynamoStreamReplication.scala | 81 +++++++++---------- spark-kinesis | 1 - ...cala => KinesisDynamoDBCheckpointer.scala} | 4 +- .../kinesis/KinesisDynamoDBInputDStream.scala | 52 ++++++++++++ ...er.scala => KinesisDynamoDBReceiver.scala} | 65 ++++++++++++--- ...a => KinesisDynamoDBRecordProcessor.scala} | 2 +- .../kinesis/V1ToV2RecordProcessor.scala | 20 +++++ 10 files changed, 202 insertions(+), 80 deletions(-) delete mode 160000 spark-kinesis rename spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/{KinesisCheckpointer.scala => KinesisDynamoDBCheckpointer.scala} (98%) create mode 100644 spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala rename spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/{KinesisReceiver.scala => KinesisDynamoDBReceiver.scala} (87%) rename spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/{KinesisRecordProcessor.scala => KinesisDynamoDBRecordProcessor.scala} (98%) create mode 100644 spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala diff --git a/.gitmodules b/.gitmodules index 8d0f11a6..78f4fa73 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,6 +2,3 @@ path = spark-cassandra-connector url = https://github.com/scylladb/spark-cassandra-connector branch = feature/track-token-ranges -[submodule "spark-kinesis"] - path = spark-kinesis - url = https://github.com/scylladb/spark-kinesis diff --git a/build.sbt b/build.sbt index 0d8f065a..5eead8be 100644 --- a/build.sbt +++ b/build.sbt @@ -2,6 +2,7 @@ import sbt.librarymanagement.InclExclRule val awsSdkVersion = "1.11.728" val sparkVersion = "2.4.4" +val dynamodbStreamsKinesisAdapterVersion = "1.5.2" inThisBuild( List( @@ -11,9 +12,12 @@ inThisBuild( ) ) -// Adaptation of spark-streaming-kinesis-asl to work with DynamoDB Streams +// Augmentation of spark-streaming-kinesis-asl to also work with DynamoDB Streams lazy val `spark-kinesis-dynamodb` = project.in(file("spark-kinesis-dynamodb")).settings( - libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion, + libraryDependencies ++= Seq( + "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion, + "com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion + ) ) lazy val migrator = (project in file("migrator")).settings( @@ -28,29 +32,39 @@ lazy val migrator = (project in file("migrator")).settings( "-XX:+CMSClassUnloadingEnabled"), scalacOptions ++= Seq("-deprecation", "-unchecked", "-Ypartial-unification"), Test / parallelExecution := false, - fork := true, - scalafmtOnCompile := true, + fork := true, + scalafmtOnCompile := true, libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", "com.amazonaws" % "aws-java-sdk-sts" % awsSdkVersion, "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion, - ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.2") + ("com.amazonaws" % "dynamodb-streams-kinesis-adapter" % dynamodbStreamsKinesisAdapterVersion) .excludeAll(InclExclRule("com.fasterxml.jackson.core")), "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.16.0", - "org.yaml" % "snakeyaml" % "1.23", - "io.circe" %% "circe-yaml" % "0.9.0", - "io.circe" %% "circe-generic" % "0.9.0", + "org.yaml" % "snakeyaml" % "1.23", + "io.circe" %% "circe-yaml" % "0.9.0", + "io.circe" %% "circe-generic" % "0.9.0", ), assembly / assemblyShadeRules := Seq( ShadeRule.rename("org.yaml.snakeyaml.**" -> "com.scylladb.shaded.@1").inAll ), assembly / assemblyMergeStrategy := { - case PathList("org", "joda", "time", _ @_*) => MergeStrategy.first + // Handle conflicts between our own library dependencies and those that are bundled into + // the spark-cassandra-connector fat-jar + case PathList("com", "codahale", "metrics", _ @_*) => MergeStrategy.first + case PathList("digesterRules.xml") => MergeStrategy.first + case PathList("org", "aopalliance", _ @_*) => MergeStrategy.first + case PathList("org", "apache", "commons", "collections", _ @_*) => MergeStrategy.first + case PathList("org", "apache", "commons", "configuration", _ @_*) => MergeStrategy.first case PathList("org", "apache", "commons", "logging", _ @_*) => MergeStrategy.first - case PathList("com", "fasterxml", "jackson", "annotation", _ @_*) => MergeStrategy.first - case PathList("com", "fasterxml", "jackson", "core", _ @_*) => MergeStrategy.first - case PathList("com", "fasterxml", "jackson", "databind", _ @_*) => MergeStrategy.first + case PathList("org", "apache", "spark", _ @_*) => MergeStrategy.first + case PathList("org", "slf4j", _ @_*) => MergeStrategy.first + case PathList("properties.dtd") => MergeStrategy.first + case PathList("PropertyList-1.0.dtd") => MergeStrategy.first + // Other conflicts + case PathList("javax", "inject", _ @_*) => MergeStrategy.first + case PathList("org", "apache", "hadoop", _ @_*) => MergeStrategy.first case x => val oldStrategy = (assembly / assemblyMergeStrategy).value oldStrategy(x) @@ -78,12 +92,12 @@ lazy val migrator = (project in file("migrator")).settings( lazy val tests = project.in(file("tests")).settings( libraryDependencies ++= Seq( - "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion, - "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0", - "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4", - "org.apache.hadoop" % "hadoop-client" % "2.9.2", - "org.scalameta" %% "munit" % "0.7.29", - "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" + "com.amazonaws" % "aws-java-sdk-dynamodb" % awsSdkVersion, + "org.apache.cassandra" % "java-driver-query-builder" % "4.18.0", + "com.github.mjakubowski84" %% "parquet4s-core" % "1.9.4", + "org.apache.hadoop" % "hadoop-client" % "2.9.2", + "org.scalameta" %% "munit" % "0.7.29", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" ), Test / parallelExecution := false ) diff --git a/build.sh b/build.sh index a1b503f2..01837eba 100755 --- a/build.sh +++ b/build.sh @@ -14,15 +14,11 @@ trap "rm -rf $TMPDIR" EXIT pushd spark-cassandra-connector sbt -Djava.io.tmpdir="$TMPDIR" ++2.11.12 assembly popd -pushd spark-kinesis -sbt assembly -popd if [ ! -d "./migrator/lib" ]; then mkdir migrator/lib fi cp ./spark-cassandra-connector/connector/target/scala-2.11/spark-cassandra-connector-assembly-*.jar ./migrator/lib -cp ./spark-kinesis/target/scala-2.11/spark-streaming-kinesis-asl-assembly-*.jar ./migrator/lib sbt -Djava.io.tmpdir="$TMPDIR" migrator/assembly diff --git a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala index ca17bf50..70de3fd1 100644 --- a/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala +++ b/migrator/src/main/scala/com/scylladb/migrator/writers/DynamoStreamReplication.scala @@ -9,8 +9,8 @@ import org.apache.log4j.LogManager import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.kinesis.{ + KinesisDynamoDBInputDStream, KinesisInitialPositions, - KinesisInputDStream, SparkAWSCredentials } @@ -29,22 +29,13 @@ object DynamoStreamReplication { target: TargetSettings.DynamoDB, targetTableDesc: TableDescription, renames: List[Rename]): Unit = - KinesisInputDStream.builder - .streamingContext(streamingContext) - .streamName(src.table) - .dynamoStream(true) - .kinesisCredentials( - src.credentials.map { - case AWSCredentials(accessKey, secretKey) => - SparkAWSCredentials.builder - .basicCredentials(accessKey, secretKey) - .build - }.orNull - ) - .regionName(src.region.orNull) - .checkpointAppName(s"migrator_${src.table}_${System.currentTimeMillis()}") - .initialPosition(new KinesisInitialPositions.TrimHorizon) - .buildWithMessageHandler { + new KinesisDynamoDBInputDStream( + streamingContext, + streamName = src.table, + regionName = src.region.orNull, + initialPosition = new KinesisInitialPositions.TrimHorizon, + checkpointAppName = s"migrator_${src.table}_${System.currentTimeMillis()}", + messageHandler = { case recAdapter: RecordAdapter => val rec = recAdapter.getInternalObject val newMap = new util.HashMap[String, AttributeValue]() @@ -63,35 +54,41 @@ object DynamoStreamReplication { Some(newMap) case _ => None - } - .foreachRDD { msgs => - val rdd = msgs - .collect { case Some(item) => new DynamoDBItemWritable(item) } - .repartition(Runtime.getRuntime.availableProcessors() * 2) - .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues + }, + kinesisCreds = src.credentials.map { + case AWSCredentials(accessKey, secretKey) => + SparkAWSCredentials.builder + .basicCredentials(accessKey, secretKey) + .build() + }.orNull + ).foreachRDD { msgs => + val rdd = msgs + .collect { case Some(item) => new DynamoDBItemWritable(item) } + .repartition(Runtime.getRuntime.availableProcessors() * 2) + .map(item => (new Text, item)) // Create the key after repartitioning to avoid Serialization issues - val changes = - rdd - .map(_._2) // Remove keys because they are not serializable - .groupBy { itemWritable => - itemWritable.getItem.get(operationTypeColumn) match { - case `putOperation` => "UPSERT" - case `deleteOperation` => "DELETE" - case _ => "UNKNOWN" - } + val changes = + rdd + .map(_._2) // Remove keys because they are not serializable + .groupBy { itemWritable => + itemWritable.getItem.get(operationTypeColumn) match { + case `putOperation` => "UPSERT" + case `deleteOperation` => "DELETE" + case _ => "UNKNOWN" } - .mapValues(_.size) - .collect() - if (changes.nonEmpty) { - log.info("Changes to be applied:") - for ((operation, count) <- changes) { - log.info(s"${operation}: ${count}") } - } else { - log.info("No changes to apply") + .mapValues(_.size) + .collect() + if (changes.nonEmpty) { + log.info("Changes to be applied:") + for ((operation, count) <- changes) { + log.info(s"${operation}: ${count}") } - - DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark) + } else { + log.info("No changes to apply") } + DynamoDB.writeRDD(target, renames, rdd, Some(targetTableDesc))(spark) + } + } diff --git a/spark-kinesis b/spark-kinesis deleted file mode 160000 index d7ba51cc..00000000 --- a/spark-kinesis +++ /dev/null @@ -1 +0,0 @@ -Subproject commit d7ba51cc670b0e323017694804fab5f9e528b4c9 diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBCheckpointer.scala similarity index 98% rename from spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala rename to spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBCheckpointer.scala index 11e94953..946ba82e 100644 --- a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala +++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBCheckpointer.scala @@ -36,8 +36,8 @@ import org.apache.spark.util.{Clock, SystemClock} * @param workerId Worker Id of KCL worker for logging purposes * @param clock In order to use ManualClocks for the purpose of testing */ -private[kinesis] class KinesisCheckpointer( - receiver: KinesisReceiver[_], +private[kinesis] class KinesisDynamoDBCheckpointer( + receiver: KinesisDynamoDBReceiver[_], checkpointInterval: Duration, workerId: String, clock: Clock = new SystemClock) extends Logging { diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala new file mode 100644 index 00000000..e30500ff --- /dev/null +++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBInputDStream.scala @@ -0,0 +1,52 @@ +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.model.Record +import org.apache.spark.streaming.kinesis.KinesisInputDStream.{DEFAULT_KINESIS_ENDPOINT_URL, DEFAULT_STORAGE_LEVEL} +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.StreamingContext + +import scala.reflect.ClassTag + +/** + * Override the default behavior of [[KinesisInputDStream]] to create a [[KinesisDynamoDBReceiver]]. + */ +class KinesisDynamoDBInputDStream[T: ClassTag]( + ssc: StreamingContext, + streamName: String, + regionName: String, + initialPosition: KinesisInitialPosition, + checkpointAppName: String, + messageHandler: Record => T, + kinesisCreds: SparkAWSCredentials +) extends KinesisInputDStream[T]( + ssc, + streamName, + DEFAULT_KINESIS_ENDPOINT_URL, + regionName, + initialPosition, + checkpointAppName, + ssc.graph.batchDuration, + DEFAULT_STORAGE_LEVEL, + messageHandler, + kinesisCreds, + None, + None + ) { + + override def getReceiver(): Receiver[T] = { + new KinesisDynamoDBReceiver( + streamName, + endpointUrl, + regionName, + initialPosition, + checkpointAppName, + checkpointInterval, + DEFAULT_STORAGE_LEVEL, + messageHandler, + kinesisCreds, + dynamoDBCreds, + cloudWatchCreds + ) + } + +} diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBReceiver.scala similarity index 87% rename from spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala rename to spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBReceiver.scala index 69c52365..ca37d7db 100644 --- a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBReceiver.scala @@ -23,7 +23,12 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal -import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory} +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder +import com.amazonaws.services.dynamodbv2.{AmazonDynamoDBClientBuilder, AmazonDynamoDBStreamsClient} +import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest +import com.amazonaws.services.dynamodbv2.streamsadapter.{AmazonDynamoDBStreamsAdapterClient, StreamsWorkerFactory} +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker} import com.amazonaws.services.kinesis.model.Record @@ -81,7 +86,7 @@ import org.apache.spark.util.Utils * AWSCredentialsProvider passed to the KCL to authorize DynamoDB API calls. * Will use kinesisCreds if value is None. */ -private[kinesis] class KinesisReceiver[T]( +private[kinesis] class KinesisDynamoDBReceiver[T]( val streamName: String, endpointUrl: String, regionName: String, @@ -132,7 +137,7 @@ private[kinesis] class KinesisReceiver[T]( /** * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval. */ - @volatile private var kinesisCheckpointer: KinesisCheckpointer = null + @volatile private var kinesisCheckpointer: KinesisDynamoDBCheckpointer = null /** * Latest sequence number ranges that have been stored successfully. @@ -148,13 +153,27 @@ private[kinesis] class KinesisReceiver[T]( workerId = Utils.localHostName() + ":" + UUID.randomUUID() - kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId) + kinesisCheckpointer = + new KinesisDynamoDBCheckpointer(receiver, checkpointInterval, workerId) val kinesisProvider = kinesisCreds.provider + val dynamoDBClient = + AmazonDynamoDBClientBuilder + .standard() + .withCredentials(dynamoDBCreds.fold(kinesisProvider)(_.provider)) + .withRegion(regionName) + .build() + + val actualStreamName = + dynamoDBClient + .describeTable(new DescribeTableRequest(streamName)) + .getTable + .getLatestStreamArn + val kinesisClientLibConfiguration = { val baseClientLibConfiguration = new KinesisClientLibConfiguration( checkpointAppName, - streamName, + actualStreamName, kinesisProvider, dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider), cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider), @@ -162,6 +181,10 @@ private[kinesis] class KinesisReceiver[T]( .withKinesisEndpoint(endpointUrl) .withTaskBackoffTimeMillis(500) .withRegionName(regionName) + .withIdleTimeBetweenReadsInMillis(500) + .withMaxRecords(1000) + .withFailoverTimeMillis(60000) + .withParentShardPollIntervalMillis(10000) // Update the Kinesis client lib config with timestamp // if InitialPositionInStream.AT_TIMESTAMP is passed @@ -179,12 +202,36 @@ private[kinesis] class KinesisReceiver[T]( * IRecordProcessor.processRecords() method. * We're using our custom KinesisRecordProcessor in this case. */ - val recordProcessorFactory = new IRecordProcessorFactory { - override def createProcessor: IRecordProcessor = - new KinesisRecordProcessor(receiver, workerId) + val recordProcessorFactory = new v2.IRecordProcessorFactory { + override def createProcessor(): v2.IRecordProcessor = + new V1ToV2RecordProcessor(new KinesisDynamoDBRecordProcessor(receiver, workerId)) + } + + worker = { + val streamsAdapter = new AmazonDynamoDBStreamsAdapterClient( + AmazonDynamoDBStreamsClient.builder() + .withCredentials(kinesisProvider) + .withRegion(regionName) + .build() + ) + + val cloudWatchClient = AmazonCloudWatchClientBuilder.standard + .withCredentials(cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider)) + .withRegion(regionName) + .withClientConfiguration( + kinesisClientLibConfiguration.getCloudWatchClientConfiguration + ) + .build + + StreamsWorkerFactory.createDynamoDbStreamsWorker( + recordProcessorFactory, + kinesisClientLibConfiguration, + streamsAdapter, + dynamoDBClient, + cloudWatchClient + ) } - worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) workerThread = new Thread() { override def run(): Unit = { try { diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBRecordProcessor.scala similarity index 98% rename from spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala rename to spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBRecordProcessor.scala index 8c6a399d..f3fba11e 100644 --- a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/KinesisDynamoDBRecordProcessor.scala @@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging * @param receiver Kinesis receiver * @param workerId for logging purposes */ -private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String) +private[kinesis] class KinesisDynamoDBRecordProcessor[T](receiver: KinesisDynamoDBReceiver[T], workerId: String) extends IRecordProcessor with Logging { // shardId populated during initialize() diff --git a/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala new file mode 100644 index 00000000..3832905d --- /dev/null +++ b/spark-kinesis-dynamodb/src/main/scala/org/apache/spark/streaming/kinesis/V1ToV2RecordProcessor.scala @@ -0,0 +1,20 @@ +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 +import com.amazonaws.services.kinesis.clientlibrary.types.{InitializationInput, ProcessRecordsInput, ShutdownInput} + +class V1ToV2RecordProcessor(v1RecordProcessor: IRecordProcessor) extends v2.IRecordProcessor { + + def initialize(initializationInput: InitializationInput): Unit = + v1RecordProcessor.initialize(initializationInput.getShardId) + + def processRecords(processRecordsInput: ProcessRecordsInput): Unit = { + println(s"PROCESSING RECORDS ${processRecordsInput}") + v1RecordProcessor.processRecords(processRecordsInput.getRecords, processRecordsInput.getCheckpointer) + } + + def shutdown(shutdownInput: ShutdownInput): Unit = + v1RecordProcessor.shutdown(shutdownInput.getCheckpointer, shutdownInput.getShutdownReason) + +}