diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
index 60a68acca..7f5c2e2a2 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
@@ -61,7 +61,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
         formatSelection = FormatSelection(Avro),
-        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Avro)),
+        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Avro), NoOpPaddingStrategy),
         localStagingArea = LocalStagingArea(localRoot),
       ),
     ),
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
index b10eea394..ba0094ef3 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
@@ -59,7 +59,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(1)),
         formatSelection = FormatSelection(Json),
-        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json)),
+        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy),
         localStagingArea = LocalStagingArea(localRoot),
       ), // JsonS3Format
     ),
@@ -96,10 +96,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
       SinkBucketOptions(
         TopicName,
         bucketAndPrefix,
-        commitPolicy = DefaultCommitPolicy(None, None, Some(3)),
-        formatSelection = FormatSelection(Json),
-        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json)), // JsonS3Format
-        localStagingArea = LocalStagingArea(localRoot),
+        commitPolicy = DefaultCommitPolicy(None, None, Some(3)),
+        formatSelection = FormatSelection(Json),
+        fileNamingStrategy =
+          new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy), // JsonS3Format
+        localStagingArea = LocalStagingArea(localRoot),
       ),
     ),
     offsetSeekerOptions = OffsetSeekerOptions(5, true),
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala
index bc5865f30..e1b0694f8 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala
@@ -61,7 +61,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
         TopicName,
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
-        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Parquet)),
+        fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Parquet), NoOpPaddingStrategy),
         formatSelection = FormatSelection(Parquet),
         localStagingArea = LocalStagingArea(localRoot),
       ),
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala
index ac02b25c7..85c86d474 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala
@@ -2119,6 +2119,37 @@ class S3SinkTaskTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest
     )
   }
 
+  "S3SinkTask" should "write files with topic partition padded when requested" in {
+
+    val task = new S3SinkTask()
+
+    val props = DefaultProps
+      .combine(
+        Map(
+          "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName WITH_FLUSH_COUNT = 3",
+          "connect.s3.padding.strategy" -> "LeftPad",
+          "connect.s3.padding.length" -> "10",
+        ),
+      ).asJava
+
+    task.start(props)
+    task.open(Seq(new TopicPartition(TopicName, 1)).asJava)
+    task.put(records.asJava)
+    task.close(Seq(new TopicPartition(TopicName, 1)).asJava)
+    task.stop()
+
+    listBucketPath(BucketName, "streamReactorBackups/myTopic/0000000001/").size should be(1)
+
+    remoteFileAsString(BucketName, "streamReactorBackups/myTopic/0000000001/0000000002.json") should be(
+      """
+        |{"name":"sam","title":"mr","salary":100.43}
+        |{"name":"laura","title":"ms","salary":429.06}
+        |{"name":"tom","title":null,"salary":395.44}
+        |""".stripMargin.filter(_ >= ' '),
+    )
+
+  }
+
   private def createSinkRecord(partition: Int, valueStruct: Struct, offset: Int, headers: lang.Iterable[Header]) =
     new SinkRecord(TopicName, partition, null, null, null, valueStruct, offset.toLong, null, null, headers)
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManagerTest.scala
index 2fa6f5f71..dd2fa0963 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManagerTest.scala
@@ -26,7 +26,7 @@ class S3WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContaine
     "myLovelySink",
     _ => DefaultCommitPolicy(Some(5L), Some(FiniteDuration(5, SECONDS)), Some(5L)).asRight,
     _ => RemoteS3RootLocation("bucketAndPath:location").asRight,
-    _ => new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("csv")).asRight,
+    _ => new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("csv"), NoOpPaddingStrategy).asRight,
     (_, _) => new File("blah.csv").asRight,
     (_, _, _) => RemoteS3PathLocation("bucket", "path").asRight,
     (_, _) => mock[S3FormatWriter].asRight,
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/PaddingStrategySettings.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/PaddingStrategySettings.scala
new file mode 100644
index 000000000..169223801
--- /dev/null
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/PaddingStrategySettings.scala
@@ -0,0 +1,37 @@
+package io.lenses.streamreactor.connect.aws.s3.config
+
+import com.datamountaineer.streamreactor.common.config.base.traits.BaseSettings
+import enumeratum.Enum
+import enumeratum.EnumEntry
+import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.PADDING_LENGTH
+import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.PADDING_STRATEGY
+import io.lenses.streamreactor.connect.aws.s3.sink.LeftPadPaddingStrategy
+import io.lenses.streamreactor.connect.aws.s3.sink.NoOpPaddingStrategy
+import io.lenses.streamreactor.connect.aws.s3.sink.PaddingStrategy
+import io.lenses.streamreactor.connect.aws.s3.sink.RightPadPaddingStrategy
+
+sealed trait PaddingStrategyOptions extends EnumEntry
+
+object PaddingStrategyOptions extends Enum[PaddingStrategyOptions] {
+
+  val values = findValues
+
+  case object LeftPad extends PaddingStrategyOptions
+  case object RightPad extends PaddingStrategyOptions
+  case object NoOp extends PaddingStrategyOptions
+
+}
+
+trait PaddingStrategySettings extends BaseSettings {
+
+  private val paddingChar: Char = '0'
+
+  def getPaddingStrategy(): PaddingStrategy = {
+    val paddingLength = getInt(PADDING_LENGTH)
+    PaddingStrategyOptions.withNameInsensitive(getString(PADDING_STRATEGY)) match {
+      case PaddingStrategyOptions.LeftPad => LeftPadPaddingStrategy(paddingLength, paddingChar)
+      case PaddingStrategyOptions.RightPad => RightPadPaddingStrategy(paddingLength, paddingChar)
+      case _ => NoOpPaddingStrategy
+    }
+  }
+}
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDef.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDef.scala
index 98fe0709c..92fffd986 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDef.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDef.scala
@@ -227,6 +227,20 @@ object S3ConfigDef {
       Importance.LOW,
       COMPRESSION_LEVEL_DOC,
     )
+    .define(
+      PADDING_STRATEGY,
+      Type.STRING,
+      PADDING_STRATEGY_DEFAULT,
+      Importance.LOW,
+      PADDING_STRATEGY_DOC,
+    )
+    .define(
+      PADDING_LENGTH,
+      Type.INT,
+      PADDING_LENGTH_DEFAULT,
+      Importance.LOW,
+      PADDING_LENGTH_DOC,
+    )
 }
 
 class S3ConfigDef() extends ConfigDef with LazyLogging {
@@ -283,7 +297,8 @@ case class S3ConfigDefBuilder(sinkName: Option[String], props: util.Map[String,
     with UserSettings
     with ConnectionSettings
     with S3FlushSettings
-    with CompressionCodecSettings {
+    with CompressionCodecSettings
+    with PaddingStrategySettings {
 
   def getParsedValues: Map[String, _] = values().asScala.toMap
 
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala
index ab37a0715..34bf46b39 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettings.scala
@@ -120,4 +120,13 @@ object S3ConfigSettings {
   val COMPRESSION_LEVEL = s"$CONNECTOR_PREFIX.compression.level"
   val COMPRESSION_LEVEL_DOC = "Certain compression codecs require a level specified."
   val COMPRESSION_LEVEL_DEFAULT = -1
+
+  val PADDING_STRATEGY = s"$CONNECTOR_PREFIX.padding.strategy"
+  val PADDING_STRATEGY_DOC =
+    "Configure in order to pad the partition and offset on the sink output files. Options are `LeftPad`, `RightPad` or `NoOp`. Defaults to `NoOp` (does not add padding)."
+  val PADDING_STRATEGY_DEFAULT = "NoOp"
+
+  val PADDING_LENGTH = s"$CONNECTOR_PREFIX.padding.length"
+  val PADDING_LENGTH_DOC = s"Length to pad the string up to if $PADDING_STRATEGY is set."
+  val PADDING_LENGTH_DEFAULT = 8
 }
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/FileNamingStrategy.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/FileNamingStrategy.scala
index e1b287488..bc1964362 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/FileNamingStrategy.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/FileNamingStrategy.scala
@@ -68,7 +68,10 @@ trait S3FileNamingStrategy {
 
 }
 
-class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends S3FileNamingStrategy {
+class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection, paddingStrategy: PaddingStrategy)
+    extends S3FileNamingStrategy {
+
+  import paddingStrategy._
 
   val format: Format = formatSelection.format
 
@@ -83,8 +86,8 @@ class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends
       val file = stagingDirectory
         .toPath
         .resolve(prefix(bucketAndPrefix))
-        .resolve(topicPartition.topic.value)
-        .resolve(s"${topicPartition.partition}.${format.entryName.toLowerCase}")
+        .resolve(padString(topicPartition.topic.value))
+        .resolve(s"${padString(topicPartition.partition.toString)}.${format.entryName.toLowerCase}")
         .resolve(uuid)
         .toFile
       createFileAndParents(file)
@@ -98,7 +101,9 @@ class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends
   ): Either[FatalS3SinkError, RemoteS3PathLocation] =
     Try(
       bucketAndPrefix.withPath(
-        s"${prefix(bucketAndPrefix)}/${topicPartitionOffset.topic.value}/${topicPartitionOffset.partition}/${topicPartitionOffset.offset.value}.${format.entryName.toLowerCase}",
+        s"${prefix(bucketAndPrefix)}/${topicPartitionOffset.topic.value}/${padString(
+          topicPartitionOffset.partition.toString,
+        )}/${padString(topicPartitionOffset.offset.value.toString)}.${format.entryName.toLowerCase}",
       ),
     ).toEither.left.map(ex => FatalS3SinkError(ex.getMessage, topicPartitionOffset.toTopicPartition))
 
@@ -118,12 +123,19 @@ class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends
     bucketAndPrefix: RemoteS3RootLocation,
     topicPartition: TopicPartition,
   ): RemoteS3PathLocation =
-    bucketAndPrefix.withPath(s"${prefix(bucketAndPrefix)}/${topicPartition.topic.value}/${topicPartition.partition}/")
+    bucketAndPrefix.withPath(
+      s"${prefix(bucketAndPrefix)}/${topicPartition.topic.value}/${padString(topicPartition.partition.toString)}/",
+    )
 
 }
 
-class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitionSelection: PartitionSelection)
-    extends S3FileNamingStrategy {
+class PartitionedS3FileNamingStrategy(
+  formatSelection: FormatSelection,
+  paddingStrategy: PaddingStrategy,
+  partitionSelection: PartitionSelection,
+) extends S3FileNamingStrategy {
+
+  import paddingStrategy._
 
   val format: Format = formatSelection.format
 
@@ -142,7 +154,7 @@ class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitio
         .resolve(prefix(bucketAndPrefix))
         .resolve(buildPartitionPrefix(partitionValues))
         .resolve(topicPartition.topic.value)
-        .resolve(topicPartition.partition.toString)
+        .resolve(padString(topicPartition.partition.toString))
         .resolve(format.entryName.toLowerCase)
         .resolve(uuid)
         .toFile
@@ -167,7 +179,9 @@ class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitio
   ): Either[FatalS3SinkError, RemoteS3PathLocation] =
     Try(
       bucketAndPrefix.withPath(
-        s"${prefix(bucketAndPrefix)}/${buildPartitionPrefix(partitionValues)}/${topicPartitionOffset.topic.value}(${topicPartitionOffset.partition}_${topicPartitionOffset.offset.value}).${format.entryName.toLowerCase}",
+        s"${prefix(bucketAndPrefix)}/${buildPartitionPrefix(partitionValues)}/${topicPartitionOffset.topic.value}(${padString(
+          topicPartitionOffset.partition.toString,
+        )}_${padString(topicPartitionOffset.offset.value.toString)}).${format.entryName.toLowerCase}",
      ),
    ).toEither.left.map(ex => FatalS3SinkError(ex.getMessage, topicPartitionOffset.toTopicPartition))
 
@@ -196,7 +210,7 @@ class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitio
       case partition @ WholeKeyPartitionField() => partition -> getPartitionByWholeKeyValue(messageDetail.keySinkData)
       case partition @ TopicPartitionField() => partition -> topicPartition.topic.value
-      case partition @ PartitionPartitionField() => partition -> topicPartition.partition.toString
+      case partition @ PartitionPartitionField() => partition -> padString(topicPartition.partition.toString)
       case partition @ DatePartitionField(_) => partition -> messageDetail.time.fold(
           throw new IllegalArgumentException(
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/PaddingStrategy.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/PaddingStrategy.scala
new file mode 100644
index 000000000..5538072c2
--- /dev/null
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/PaddingStrategy.scala
@@ -0,0 +1,17 @@
+package io.lenses.streamreactor.connect.aws.s3.sink
+
+trait PaddingStrategy {
+  def padString(padMe: String): String
+}
+
+object NoOpPaddingStrategy extends PaddingStrategy {
+  override def padString(dontPadMe: String): String = dontPadMe
+}
+
+case class LeftPadPaddingStrategy(maxDigits: Int, padCharacter: Char) extends PaddingStrategy {
+  override def padString(padMe: String): String = padMe.reverse.padTo(maxDigits, padCharacter).reverse
+}
+
+case class RightPadPaddingStrategy(maxDigits: Int, padCharacter: Char) extends PaddingStrategy {
+  override def padString(padMe: String): String = padMe.padTo(maxDigits, padCharacter)
+}
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala
index 99602edb3..3ddf5ab81 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala
@@ -70,8 +70,8 @@ object SinkBucketOptions extends LazyLogging {
       val partitionSelection = PartitionSelection(kcql)
 
       val namingStrategy = partitionSelection match {
-        case Some(partSel) => new PartitionedS3FileNamingStrategy(formatSelection, partSel)
-        case None => new HierarchicalS3FileNamingStrategy(formatSelection)
+        case Some(partSel) => new PartitionedS3FileNamingStrategy(formatSelection, config.getPaddingStrategy(), partSel)
+        case None => new HierarchicalS3FileNamingStrategy(formatSelection, config.getPaddingStrategy())
       }
 
       val stagingArea = LocalStagingArea(config)
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDefTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDefTest.scala
index 80c1ce601..3b3ad817c 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDefTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigDefTest.scala
@@ -47,20 +47,20 @@ class S3ConfigDefTest extends AnyFlatSpec with Matchers {
 
   "S3ConfigDef" should "parse original properties" in {
     val resultMap = S3ConfigDef.config.parse(DefaultProps.asJava).asScala
-    resultMap should have size 24
+    resultMap should have size 26
     DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
       case (k, _) => resultMap.get(k) should be(None)
     }
-    DefaultProps.foreach { case (k, _) => resultMap.keySet.contains(k) should be(true) }
+    resultMap.keys should contain allElementsOf DefaultProps.keys
   }
 
   "S3ConfigDef" should "parse deprecated properties" in {
     val resultMap = S3ConfigDef.config.parse(DeprecatedProps.asJava).asScala
-    resultMap should have size 24
+    resultMap should have size 26
     DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
       case (k, _) => resultMap.get(k) should be(None)
     }
-    DefaultProps.foreach { case (k, _) => resultMap.keySet.contains(k) should be(true) }
+    resultMap.keys should contain allElementsOf DefaultProps.keys
   }
 
   "S3ConfigDef" should "parse merged properties" in {
@@ -69,6 +69,6 @@ class S3ConfigDefTest extends AnyFlatSpec with Matchers {
     DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
       case (k, _) => resultMap.get(k) should be(None)
     }
-    DefaultProps.foreach { case (k, _) => resultMap.keySet.contains(k) should be(true) }
+    resultMap.keys should contain allElementsOf DefaultProps.keys
   }
 }
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala
index d66bea15e..9daaebdb7 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/config/S3ConfigSettingsTest.scala
@@ -44,7 +44,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
       case (k, _) => ignorePropertiesWithSuffix.exists(k.contains(_))
     }
 
-    docs should have size 33
+    docs should have size 35
     docs.foreach {
       case (k, v) => {
         logger.info("method: {}, value: {}", k, v)
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/S3StoredFileTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/S3StoredFileTest.scala
index dbdeb49bc..8eb81792e 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/S3StoredFileTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/S3StoredFileTest.scala
@@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.aws.s3.model
 
 import io.lenses.streamreactor.connect.aws.s3.config.FormatSelection
 import io.lenses.streamreactor.connect.aws.s3.sink.HierarchicalS3FileNamingStrategy
+import io.lenses.streamreactor.connect.aws.s3.sink.NoOpPaddingStrategy
 import io.lenses.streamreactor.connect.aws.s3.sink.PartitionedS3FileNamingStrategy
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
@@ -30,7 +31,7 @@ class S3StoredFileTest extends AnyFlatSpec with Matchers {
 
   "apply" should "parse hierarchical scheme" in {
     implicit val hierarchical: HierarchicalS3FileNamingStrategy =
-      new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("`JSON`"))
+      new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("`JSON`"), NoOpPaddingStrategy)
 
     S3StoredFile("dragon-test/myTopicName/1/1.json") should be(Some(S3StoredFile(
       "dragon-test/myTopicName/1/1.json",
@@ -42,6 +43,7 @@ class S3StoredFileTest extends AnyFlatSpec with Matchers {
     implicit val partitioned: PartitionedS3FileNamingStrategy = new PartitionedS3FileNamingStrategy(
       FormatSelection.fromString("`JSON`"),
+      NoOpPaddingStrategy,
       PartitionSelection(Seq.empty[PartitionField]),
     )
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/CommittedFileNameTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/CommittedFileNameTest.scala
index 32ed910e2..93d869923 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/CommittedFileNameTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/CommittedFileNameTest.scala
@@ -34,10 +34,11 @@ class CommittedFileNameTest extends AnyFlatSpecLike with Matchers {
     ValuePartitionField(PartitionNamePath("partition2")),
   ))
 
-  class HierarchicalJsonTestContext extends TestContext(new HierarchicalS3FileNamingStrategy(FormatSelection(Json)))
+  class HierarchicalJsonTestContext
+      extends TestContext(new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy))
 
   class PartitionedAvroTestContext
-      extends TestContext(new PartitionedS3FileNamingStrategy(FormatSelection(Avro), partitions))
+      extends TestContext(new PartitionedS3FileNamingStrategy(FormatSelection(Avro), NoOpPaddingStrategy, partitions))
 
   "unapply" should "recognise hierarchical filenames in prefix/topic/927/77.json format" in new HierarchicalJsonTestContext {
     CommittedFileName.unapply("prefix/topic/927/77.json") should be(Some((Topic("topic"), 927, Offset(77), Json)))
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/PaddingStrategyTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/PaddingStrategyTest.scala
new file mode 100644
index 000000000..3ae02d80b
--- /dev/null
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/PaddingStrategyTest.scala
@@ -0,0 +1,19 @@
+package io.lenses.streamreactor.connect.aws.s3.sink
+
+import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.matchers.should.Matchers
+
+class PaddingStrategyTest extends AnyFlatSpecLike with Matchers {
+
+  "NoOpPaddingStrategy" should "return string as is" in {
+    NoOpPaddingStrategy.padString("1") should be("1")
+  }
+
+  "LeftPaddingStrategy" should "pad string left" in {
+    LeftPadPaddingStrategy(5, '0').padString("2") should be("00002")
+  }
+
+  "RightPaddingStrategy" should "pad string right" in {
+    RightPadPaddingStrategy(10, '0').padString("3") should be("3000000000")
+  }
+}
diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/seek/LegacyOffsetSeekerTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/seek/LegacyOffsetSeekerTest.scala
index 0b671bf99..fd91a3a23 100644
--- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/seek/LegacyOffsetSeekerTest.scala
+++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/seek/LegacyOffsetSeekerTest.scala
@@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.aws.s3.model.Offset
 import io.lenses.streamreactor.connect.aws.s3.model.Topic
 import io.lenses.streamreactor.connect.aws.s3.model.TopicPartitionOffset
 import io.lenses.streamreactor.connect.aws.s3.sink.HierarchicalS3FileNamingStrategy
+import io.lenses.streamreactor.connect.aws.s3.sink.NoOpPaddingStrategy
 import io.lenses.streamreactor.connect.aws.s3.storage.StorageInterface
 import org.mockito.MockitoSugar
 import org.scalatest.flatspec.AnyFlatSpec
@@ -41,7 +42,7 @@ class LegacyOffsetSeekerTest
     with EitherValues
     with OptionValues {
 
-  private val fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json))
+  private val fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy)
 
   private implicit val storageInterface: StorageInterface = mock[StorageInterface]
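
Illustrative sketch (not part of the diff): the new connect.s3.padding.strategy and connect.s3.padding.length properties pad the partition and offset segments of committed file names, as exercised by the S3SinkTask test above. A minimal standalone example of the padding strategies introduced in PaddingStrategy.scala, assuming the connector jar is on the classpath:

    import io.lenses.streamreactor.connect.aws.s3.sink.{LeftPadPaddingStrategy, NoOpPaddingStrategy, RightPadPaddingStrategy}

    object PaddingStrategyExample extends App {
      // LeftPad with length 10 turns offset "2" into "0000000002", so a committed
      // file such as myTopic/1/2.json becomes myTopic/0000000001/0000000002.json.
      val leftPad = LeftPadPaddingStrategy(maxDigits = 10, padCharacter = '0')
      println(leftPad.padString("2"))                          // 0000000002
      println(RightPadPaddingStrategy(5, '0').padString("2"))  // 20000
      println(NoOpPaddingStrategy.padString("2"))              // 2 (the default: no padding)
    }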