Skip to content

Commit

Permalink
Partition Name Keys & Naming Strategies Merge & Adding Extra Configur…
Browse files Browse the repository at this point in the history
…ation Options (#984)

* Merges naming strategies to reduce code complexity and make them consistent, ensuring that hierarchical and custom partition modes follow mostly the same code paths and have not diverged as much. Begins addressing issues around padding. Ensures the key and value can be included within the partition name for both hierarchical and custom partitioning modes. Adds unit tests.
* Padding strategy configuration.
* update partition selection

* Removing the default prefix. Ensuring the source ignores indexes

* Fix from review - treat all negatives as unset

* Adding additional unit tests around config, removing unnecessary text from property names, changing KCQL properties parsing to support Map properties, adding checks to ensure the two configuration methods cannot be used simultaneously, and addressing review comments

* Fixing tests

* Returning DeprecationConfigDefProcessor, fix from review
  • Loading branch information
davidsloan authored Sep 18, 2023
1 parent e8f6897 commit cb30d41
Show file tree
Hide file tree
Showing 68 changed files with 1,762 additions and 829 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class S3CompressionTest
}
}.asserting {
file =>
file.key() should be(s"$prefix/$topic/000000000000/000000000000.$format")
file.key() should be(s"$prefix/$topic/0/000000000000.$format")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class S3Test
assert(files.contents().size() == 1)
}

readKeyToOrder(s3Client, bucketName, "myfiles/orders/000000000000/000000000000.json")
readKeyToOrder(s3Client, bucketName, "myfiles/orders/0/000000000000.json")
}.asserting {
key: Order =>
key should be(order)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ import io.lenses.streamreactor.connect.aws.s3.model._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3Location
import io.lenses.streamreactor.connect.aws.s3.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.aws.s3.sink.commit.Count
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.aws.s3.sink.config.LocalStagingArea
import io.lenses.streamreactor.connect.aws.s3.sink.config.OffsetSeekerOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData._
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import org.apache.avro.generic.GenericRecord
Expand Down Expand Up @@ -63,11 +68,22 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
SinkBucketOptions(
TopicName.some,
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(2)),
formatSelection = AvroFormatSelection,
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(AvroFormatSelection, NoOpPaddingStrategy),
commitPolicy = CommitPolicy(Count(2)),
formatSelection = AvroFormatSelection,
keyNamer = new S3KeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
identity[String],
AvroFormatSelection.extension,
),
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
)),
),
localStagingArea = LocalStagingArea(localRoot),
partitionSelection = None,
partitionSelection = defaultPartitionSelection(Values),
dataStorage = DataStorageSettings.disabled,
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@ import io.lenses.streamreactor.connect.aws.s3.model._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3Location
import io.lenses.streamreactor.connect.aws.s3.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.aws.s3.sink.commit.Count
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.aws.s3.sink.config.LocalStagingArea
import io.lenses.streamreactor.connect.aws.s3.sink.config.OffsetSeekerOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
Expand Down Expand Up @@ -61,11 +66,22 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
SinkBucketOptions(
TopicName.some,
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(1)),
formatSelection = JsonFormatSelection,
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(JsonFormatSelection, NoOpPaddingStrategy),
commitPolicy = CommitPolicy(Count(1)),
formatSelection = JsonFormatSelection,
keyNamer = new S3KeyNamer(
JsonFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
identity[String],
JsonFormatSelection.extension,
),
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
)),
),
localStagingArea = LocalStagingArea(localRoot),
partitionSelection = None,
partitionSelection = defaultPartitionSelection(Values),
dataStorage = DataStorageSettings.disabled,
), // JsonS3Format
),
Expand Down Expand Up @@ -106,10 +122,20 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(3)),
formatSelection = JsonFormatSelection,
fileNamingStrategy =
new HierarchicalS3FileNamingStrategy(JsonFormatSelection, NoOpPaddingStrategy), // JsonS3Format
keyNamer = new S3KeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
identity[String],
JsonFormatSelection.extension,
),
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
)),
),
localStagingArea = LocalStagingArea(localRoot),
partitionSelection = None,
partitionSelection = defaultPartitionSelection(Values),
dataStorage = DataStorageSettings.disabled,
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ import io.lenses.streamreactor.connect.aws.s3.model._
import io.lenses.streamreactor.connect.aws.s3.model.location.S3Location
import io.lenses.streamreactor.connect.aws.s3.sink.commit.CommitPolicy
import io.lenses.streamreactor.connect.aws.s3.sink.commit.Count
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionSelection.defaultPartitionSelection
import io.lenses.streamreactor.connect.aws.s3.sink.config.LocalStagingArea
import io.lenses.streamreactor.connect.aws.s3.sink.config.OffsetSeekerOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.PartitionDisplay.Values
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3SinkConfig
import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
import io.lenses.streamreactor.connect.aws.s3.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.aws.s3.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.aws.s3.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData._
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import org.apache.avro.generic.GenericRecord
Expand Down Expand Up @@ -64,11 +69,22 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
SinkBucketOptions(
TopicName.some,
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(2)),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(ParquetFormatSelection, NoOpPaddingStrategy),
commitPolicy = CommitPolicy(Count(2)),
keyNamer = new S3KeyNamer(
ParquetFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
identity[String],
ParquetFormatSelection.extension,
),
new PaddingService(Map[String, PaddingStrategy](
"partition" -> NoOpPaddingStrategy,
"offset" -> LeftPadPaddingStrategy(12, 0),
)),
),
formatSelection = ParquetFormatSelection,
localStagingArea = LocalStagingArea(localRoot),
partitionSelection = None,
partitionSelection = defaultPartitionSelection(Values),
dataStorage = DataStorageSettings.disabled,
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class S3SinkTaskAvroEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS AVRO WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS AVRO WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true, 'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down Expand Up @@ -237,7 +237,7 @@ class S3SinkTaskAvroEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS AVRO WITH_FLUSH_INTERVAL = 1 WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS AVRO WITH_FLUSH_INTERVAL = 1 WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class S3SinkTaskJsonEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down Expand Up @@ -173,7 +173,7 @@ class S3SinkTaskJsonEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down Expand Up @@ -247,7 +247,7 @@ class S3SinkTaskJsonEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down Expand Up @@ -322,7 +322,7 @@ class S3SinkTaskJsonEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=false)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=false,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down Expand Up @@ -390,7 +390,7 @@ class S3SinkTaskJsonEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class S3SinkTaskParquetEnvelopeTest
val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `PARQUET` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true)",
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `PARQUET` WITH_FLUSH_COUNT = 3 PROPERTIES('store.envelope'=true,'padding.length.partition'='12', 'padding.length.offset'='12')",
),
).asJava

Expand Down
Loading

0 comments on commit cb30d41

Please sign in to comment.