Skip to content

Commit

Permalink
Add option to add a padding so that files can be restored in order by…
Browse files Browse the repository at this point in the history
… the source (#919)

Co-authored-by: David Sloan <[email protected]>
  • Loading branch information
davidsloan and davidsloan authored Feb 10, 2023
1 parent d103515 commit c4ea086
Show file tree
Hide file tree
Showing 17 changed files with 178 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
formatSelection = FormatSelection(Avro),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Avro)),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Avro), NoOpPaddingStrategy),
localStagingArea = LocalStagingArea(localRoot),
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(1)),
formatSelection = FormatSelection(Json),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json)),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy),
localStagingArea = LocalStagingArea(localRoot),
), // JsonS3Format
),
Expand Down Expand Up @@ -96,10 +96,11 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
SinkBucketOptions(
TopicName,
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(3)),
formatSelection = FormatSelection(Json),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Json)), // JsonS3Format
localStagingArea = LocalStagingArea(localRoot),
commitPolicy = DefaultCommitPolicy(None, None, Some(3)),
formatSelection = FormatSelection(Json),
fileNamingStrategy =
new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy), // JsonS3Format
localStagingArea = LocalStagingArea(localRoot),
),
),
offsetSeekerOptions = OffsetSeekerOptions(5, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
TopicName,
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Parquet)),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Parquet), NoOpPaddingStrategy),
formatSelection = FormatSelection(Parquet),
localStagingArea = LocalStagingArea(localRoot),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2119,6 +2119,37 @@ class S3SinkTaskTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest
)
}

"S3SinkTask" should "write files with topic partition padded when requested" in {

val task = new S3SinkTask()

val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName WITH_FLUSH_COUNT = 3",
"connect.s3.padding.strategy" -> "LeftPad",
"connect.s3.padding.length" -> "10",
),
).asJava

task.start(props)
task.open(Seq(new TopicPartition(TopicName, 1)).asJava)
task.put(records.asJava)
task.close(Seq(new TopicPartition(TopicName, 1)).asJava)
task.stop()

listBucketPath(BucketName, "streamReactorBackups/myTopic/0000000001/").size should be(1)

remoteFileAsString(BucketName, "streamReactorBackups/myTopic/0000000001/0000000002.json") should be(
"""
|{"name":"sam","title":"mr","salary":100.43}
|{"name":"laura","title":"ms","salary":429.06}
|{"name":"tom","title":null,"salary":395.44}
|""".stripMargin.filter(_ >= ' '),
)

}

// Builds a SinkRecord for TopicName carrying only a value, an offset and headers;
// key, schemas and timestamp information are all left as null.
private def createSinkRecord(
  partition:   Int,
  valueStruct: Struct,
  offset:      Int,
  headers:     lang.Iterable[Header],
): SinkRecord =
  new SinkRecord(
    TopicName,
    partition,
    null, // key schema
    null, // key
    null, // value schema
    valueStruct,
    offset.toLong,
    null, // timestamp
    null, // timestamp type
    headers,
  )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class S3WriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContaine
"myLovelySink",
_ => DefaultCommitPolicy(Some(5L), Some(FiniteDuration(5, SECONDS)), Some(5L)).asRight,
_ => RemoteS3RootLocation("bucketAndPath:location").asRight,
_ => new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("csv")).asRight,
_ => new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("csv"), NoOpPaddingStrategy).asRight,
(_, _) => new File("blah.csv").asRight,
(_, _, _) => RemoteS3PathLocation("bucket", "path").asRight,
(_, _) => mock[S3FormatWriter].asRight,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.lenses.streamreactor.connect.aws.s3.config

import com.datamountaineer.streamreactor.common.config.base.traits.BaseSettings
import enumeratum.Enum
import enumeratum.EnumEntry
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.PADDING_LENGTH
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.PADDING_STRATEGY
import io.lenses.streamreactor.connect.aws.s3.sink.LeftPadPaddingStrategy
import io.lenses.streamreactor.connect.aws.s3.sink.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.aws.s3.sink.PaddingStrategy
import io.lenses.streamreactor.connect.aws.s3.sink.RightPadPaddingStrategy

/**
 * Padding modes that can be selected through the connector configuration.
 *
 * The configured string is resolved to one of these entries by
 * `PaddingStrategySettings.getPaddingStrategy` using a case-insensitive name lookup.
 */
sealed trait PaddingStrategyOptions extends EnumEntry

/** Companion enum listing every available [[PaddingStrategyOptions]] entry. */
object PaddingStrategyOptions extends Enum[PaddingStrategyOptions] {

// Populated by enumeratum's `findValues` macro from the case objects declared below.
val values = findValues

// Selects LeftPadPaddingStrategy.
case object LeftPad extends PaddingStrategyOptions
// Selects RightPadPaddingStrategy.
case object RightPad extends PaddingStrategyOptions
// Selects NoOpPaddingStrategy (values are left untouched).
case object NoOp extends PaddingStrategyOptions

}

/**
 * Configuration mixin that turns the padding-related connector properties
 * (`PADDING_STRATEGY` and `PADDING_LENGTH`) into a [[PaddingStrategy]].
 */
trait PaddingStrategySettings extends BaseSettings {

  // Fill character handed to the left/right padding strategies.
  private val paddingChar: Char = '0'

  /**
   * Builds the padding strategy described by the current configuration.
   *
   * The strategy name is matched case-insensitively against [[PaddingStrategyOptions]].
   * The padding length is read regardless of the chosen strategy, but is only used by
   * the left and right padding strategies.
   *
   * @return the strategy selected by `PADDING_STRATEGY`, sized by `PADDING_LENGTH`
   * @throws NoSuchElementException if the configured name is not a known option
   *                                (raised by enumeratum's `withNameInsensitive`)
   */
  def getPaddingStrategy(): PaddingStrategy = {
    val paddingLength = getInt(PADDING_LENGTH)
    // Every entry is listed explicitly (no catch-all) so that adding a new option to the
    // sealed enum triggers a non-exhaustive-match warning here instead of silently
    // falling back to "no padding".
    PaddingStrategyOptions.withNameInsensitive(getString(PADDING_STRATEGY)) match {
      case PaddingStrategyOptions.LeftPad  => LeftPadPaddingStrategy(paddingLength, paddingChar)
      case PaddingStrategyOptions.RightPad => RightPadPaddingStrategy(paddingLength, paddingChar)
      case PaddingStrategyOptions.NoOp     => NoOpPaddingStrategy
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ object S3ConfigDef {
Importance.LOW,
COMPRESSION_LEVEL_DOC,
)
.define(
PADDING_STRATEGY,
Type.STRING,
PADDING_STRATEGY_DEFAULT,
Importance.LOW,
PADDING_STRATEGY_DOC,
)
.define(
PADDING_LENGTH,
Type.INT,
PADDING_LENGTH_DEFAULT,
Importance.LOW,
PADDING_LENGTH_DOC,
)
}

class S3ConfigDef() extends ConfigDef with LazyLogging {
Expand Down Expand Up @@ -283,7 +297,8 @@ case class S3ConfigDefBuilder(sinkName: Option[String], props: util.Map[String,
with UserSettings
with ConnectionSettings
with S3FlushSettings
with CompressionCodecSettings {
with CompressionCodecSettings
with PaddingStrategySettings {

def getParsedValues: Map[String, _] = values().asScala.toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,13 @@ object S3ConfigSettings {
val COMPRESSION_LEVEL = s"$CONNECTOR_PREFIX.compression.level"
val COMPRESSION_LEVEL_DOC = "Certain compression codecs require a level specified."
val COMPRESSION_LEVEL_DEFAULT = -1

val PADDING_STRATEGY = s"$CONNECTOR_PREFIX.padding.strategy"
val PADDING_STRATEGY_DOC =
"Configure in order to pad the partition and offset on the sink output files. Options are `LeftPad`, `RightPad` or `NoOp`. Defaults to `NoOp` (does not add padding)."
val PADDING_STRATEGY_DEFAULT = "NoOp"

val PADDING_LENGTH = s"$CONNECTOR_PREFIX.padding.length"
val PADDING_LENGTH_DOC = s"Length to pad the string up to if $PADDING_STRATEGY is set."
val PADDING_LENGTH_DEFAULT = 8
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ trait S3FileNamingStrategy {

}

class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends S3FileNamingStrategy {
class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection, paddingStrategy: PaddingStrategy)
extends S3FileNamingStrategy {

import paddingStrategy._

val format: Format = formatSelection.format

Expand All @@ -83,8 +86,8 @@ class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends
val file = stagingDirectory
.toPath
.resolve(prefix(bucketAndPrefix))
.resolve(topicPartition.topic.value)
.resolve(s"${topicPartition.partition}.${format.entryName.toLowerCase}")
.resolve(padString(topicPartition.topic.value))
.resolve(s"${padString(topicPartition.partition.toString)}.${format.entryName.toLowerCase}")
.resolve(uuid)
.toFile
createFileAndParents(file)
Expand All @@ -98,7 +101,9 @@ class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends
): Either[FatalS3SinkError, RemoteS3PathLocation] =
Try(
bucketAndPrefix.withPath(
s"${prefix(bucketAndPrefix)}/${topicPartitionOffset.topic.value}/${topicPartitionOffset.partition}/${topicPartitionOffset.offset.value}.${format.entryName.toLowerCase}",
s"${prefix(bucketAndPrefix)}/${topicPartitionOffset.topic.value}/${padString(
topicPartitionOffset.partition.toString,
)}/${padString(topicPartitionOffset.offset.value.toString)}.${format.entryName.toLowerCase}",
),
).toEither.left.map(ex => FatalS3SinkError(ex.getMessage, topicPartitionOffset.toTopicPartition))

Expand All @@ -118,12 +123,19 @@ class HierarchicalS3FileNamingStrategy(formatSelection: FormatSelection) extends
bucketAndPrefix: RemoteS3RootLocation,
topicPartition: TopicPartition,
): RemoteS3PathLocation =
bucketAndPrefix.withPath(s"${prefix(bucketAndPrefix)}/${topicPartition.topic.value}/${topicPartition.partition}/")
bucketAndPrefix.withPath(
s"${prefix(bucketAndPrefix)}/${topicPartition.topic.value}/${padString(topicPartition.partition.toString)}/",
)

}

class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitionSelection: PartitionSelection)
extends S3FileNamingStrategy {
class PartitionedS3FileNamingStrategy(
formatSelection: FormatSelection,
paddingStrategy: PaddingStrategy,
partitionSelection: PartitionSelection,
) extends S3FileNamingStrategy {

import paddingStrategy._

val format: Format = formatSelection.format

Expand All @@ -142,7 +154,7 @@ class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitio
.resolve(prefix(bucketAndPrefix))
.resolve(buildPartitionPrefix(partitionValues))
.resolve(topicPartition.topic.value)
.resolve(topicPartition.partition.toString)
.resolve(padString(topicPartition.partition.toString))
.resolve(format.entryName.toLowerCase)
.resolve(uuid)
.toFile
Expand All @@ -167,7 +179,9 @@ class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitio
): Either[FatalS3SinkError, RemoteS3PathLocation] =
Try(
bucketAndPrefix.withPath(
s"${prefix(bucketAndPrefix)}/${buildPartitionPrefix(partitionValues)}/${topicPartitionOffset.topic.value}(${topicPartitionOffset.partition}_${topicPartitionOffset.offset.value}).${format.entryName.toLowerCase}",
s"${prefix(bucketAndPrefix)}/${buildPartitionPrefix(partitionValues)}/${topicPartitionOffset.topic.value}(${padString(
topicPartitionOffset.partition.toString,
)}_${padString(topicPartitionOffset.offset.value.toString)}).${format.entryName.toLowerCase}",
),
).toEither.left.map(ex => FatalS3SinkError(ex.getMessage, topicPartitionOffset.toTopicPartition))

Expand Down Expand Up @@ -196,7 +210,7 @@ class PartitionedS3FileNamingStrategy(formatSelection: FormatSelection, partitio
case partition @ WholeKeyPartitionField() =>
partition -> getPartitionByWholeKeyValue(messageDetail.keySinkData)
case partition @ TopicPartitionField() => partition -> topicPartition.topic.value
case partition @ PartitionPartitionField() => partition -> topicPartition.partition.toString
case partition @ PartitionPartitionField() => partition -> padString(topicPartition.partition.toString)
case partition @ DatePartitionField(_) => partition ->
messageDetail.time.fold(
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.lenses.streamreactor.connect.aws.s3.sink

/**
 * Strategy for bringing a string up to a fixed width.
 *
 * Implementations never shorten their input: a string that is already at least as long
 * as the target width is returned unchanged.
 */
trait PaddingStrategy {

  /**
   * @param padMe the string to pad
   * @return the padded string
   */
  def padString(padMe: String): String
}

/** Strategy that performs no padding at all. */
object NoOpPaddingStrategy extends PaddingStrategy {

  /** Returns the input exactly as given. */
  override def padString(dontPadMe: String): String = identity(dontPadMe)
}

/**
 * Prepends `padCharacter` until the string is `maxDigits` characters long.
 *
 * @param maxDigits    target width of the padded string
 * @param padCharacter character inserted in front of the input
 */
case class LeftPadPaddingStrategy(maxDigits: Int, padCharacter: Char) extends PaddingStrategy {

  override def padString(padMe: String): String =
    if (padMe.length >= maxDigits) padMe
    else {
      // Guard above guarantees a strictly positive number of fill characters.
      val fill = padCharacter.toString * (maxDigits - padMe.length)
      fill + padMe
    }
}

/**
 * Appends `padCharacter` until the string is `maxDigits` characters long.
 *
 * Note that distinct inputs can produce the same output when the fill character may
 * also occur in the input (e.g. "3" and "30" both become "300" for width 3 and '0').
 *
 * @param maxDigits    target width of the padded string
 * @param padCharacter character added after the input
 */
case class RightPadPaddingStrategy(maxDigits: Int, padCharacter: Char) extends PaddingStrategy {

  override def padString(padMe: String): String =
    if (padMe.length >= maxDigits) padMe
    else {
      // Guard above guarantees a strictly positive number of fill characters.
      val fill = padCharacter.toString * (maxDigits - padMe.length)
      padMe + fill
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ object SinkBucketOptions extends LazyLogging {

val partitionSelection = PartitionSelection(kcql)
val namingStrategy = partitionSelection match {
case Some(partSel) => new PartitionedS3FileNamingStrategy(formatSelection, partSel)
case None => new HierarchicalS3FileNamingStrategy(formatSelection)
case Some(partSel) => new PartitionedS3FileNamingStrategy(formatSelection, config.getPaddingStrategy(), partSel)
case None => new HierarchicalS3FileNamingStrategy(formatSelection, config.getPaddingStrategy())
}

val stagingArea = LocalStagingArea(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ class S3ConfigDefTest extends AnyFlatSpec with Matchers {

"S3ConfigDef" should "parse original properties" in {
val resultMap = S3ConfigDef.config.parse(DefaultProps.asJava).asScala
resultMap should have size 24
resultMap should have size 26
DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
case (k, _) => resultMap.get(k) should be(None)
}
DefaultProps.foreach { case (k, _) => resultMap.keySet.contains(k) should be(true) }
resultMap.keys should contain allElementsOf DefaultProps.keys
}

"S3ConfigDef" should "parse deprecated properties" in {
val resultMap = S3ConfigDef.config.parse(DeprecatedProps.asJava).asScala
resultMap should have size 24
resultMap should have size 26
DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
case (k, _) => resultMap.get(k) should be(None)
}
DefaultProps.foreach { case (k, _) => resultMap.keySet.contains(k) should be(true) }
resultMap.keys should contain allElementsOf DefaultProps.keys
}

"S3ConfigDef" should "parse merged properties" in {
Expand All @@ -69,6 +69,6 @@ class S3ConfigDefTest extends AnyFlatSpec with Matchers {
DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
case (k, _) => resultMap.get(k) should be(None)
}
DefaultProps.foreach { case (k, _) => resultMap.keySet.contains(k) should be(true) }
resultMap.keys should contain allElementsOf DefaultProps.keys
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
case (k, _) => ignorePropertiesWithSuffix.exists(k.contains(_))
}

docs should have size 33
docs should have size 35
docs.foreach {
case (k, v) => {
logger.info("method: {}, value: {}", k, v)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.aws.s3.model

import io.lenses.streamreactor.connect.aws.s3.config.FormatSelection
import io.lenses.streamreactor.connect.aws.s3.sink.HierarchicalS3FileNamingStrategy
import io.lenses.streamreactor.connect.aws.s3.sink.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.aws.s3.sink.PartitionedS3FileNamingStrategy
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -30,7 +31,7 @@ class S3StoredFileTest extends AnyFlatSpec with Matchers {
"apply" should "parse hierarchical scheme" in {

implicit val hierarchical: HierarchicalS3FileNamingStrategy =
new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("`JSON`"))
new HierarchicalS3FileNamingStrategy(FormatSelection.fromString("`JSON`"), NoOpPaddingStrategy)

S3StoredFile("dragon-test/myTopicName/1/1.json") should be(Some(S3StoredFile(
"dragon-test/myTopicName/1/1.json",
Expand All @@ -42,6 +43,7 @@ class S3StoredFileTest extends AnyFlatSpec with Matchers {

implicit val partitioned: PartitionedS3FileNamingStrategy = new PartitionedS3FileNamingStrategy(
FormatSelection.fromString("`JSON`"),
NoOpPaddingStrategy,
PartitionSelection(Seq.empty[PartitionField]),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ class CommittedFileNameTest extends AnyFlatSpecLike with Matchers {
ValuePartitionField(PartitionNamePath("partition2")),
))

class HierarchicalJsonTestContext extends TestContext(new HierarchicalS3FileNamingStrategy(FormatSelection(Json)))
class HierarchicalJsonTestContext
extends TestContext(new HierarchicalS3FileNamingStrategy(FormatSelection(Json), NoOpPaddingStrategy))

class PartitionedAvroTestContext
extends TestContext(new PartitionedS3FileNamingStrategy(FormatSelection(Avro), partitions))
extends TestContext(new PartitionedS3FileNamingStrategy(FormatSelection(Avro), NoOpPaddingStrategy, partitions))

"unapply" should "recognise hierarchical filenames in prefix/topic/927/77.json format" in new HierarchicalJsonTestContext {
CommittedFileName.unapply("prefix/topic/927/77.json") should be(Some((Topic("topic"), 927, Offset(77), Json)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.lenses.streamreactor.connect.aws.s3.sink

import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers

/**
 * Unit tests for the [[PaddingStrategy]] implementations, covering both the padding
 * itself and the boundary where the input is already at or beyond the target length.
 */
class PaddingStrategyTest extends AnyFlatSpecLike with Matchers {

  "NoOpPaddingStrategy" should "return string as is" in {
    NoOpPaddingStrategy.padString("1") should be("1")
  }

  "LeftPaddingStrategy" should "pad string left" in {
    LeftPadPaddingStrategy(5, '0').padString("2") should be("00002")
  }

  // Padding must never truncate: longer inputs pass through unchanged.
  it should "leave a string at or beyond the target length untouched" in {
    LeftPadPaddingStrategy(3, '0').padString("123") should be("123")
    LeftPadPaddingStrategy(3, '0').padString("12345") should be("12345")
  }

  "RightPaddingStrategy" should "pad string right" in {
    RightPadPaddingStrategy(10, '0').padString("3") should be("3000000000")
  }

  // Padding must never truncate: longer inputs pass through unchanged.
  it should "leave a string at or beyond the target length untouched" in {
    RightPadPaddingStrategy(3, '0').padString("123") should be("123")
    RightPadPaddingStrategy(3, '0').padString("12345") should be("12345")
  }
}
Loading

0 comments on commit c4ea086

Please sign in to comment.