diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
index 7f5c2e2a2..b62b455ae 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3AvroWriterManagerTest.scala
@@ -16,6 +16,7 @@
 package io.lenses.streamreactor.connect.aws.s3.sink
 
+import cats.implicits.catsSyntaxOptionId
 import io.lenses.streamreactor.connect.aws.s3.config.Format.Avro
 import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
 import io.lenses.streamreactor.connect.aws.s3.config.AwsClient
@@ -57,7 +58,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
     ),
     bucketOptions = Set(
       SinkBucketOptions(
-        TopicName,
+        TopicName.some,
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
         formatSelection = FormatSelection(Avro),
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
index ba0094ef3..b6daae104 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3JsonWriterManagerTest.scala
@@ -16,6 +16,7 @@
 package io.lenses.streamreactor.connect.aws.s3.sink
 
+import cats.implicits.catsSyntaxOptionId
 import io.lenses.streamreactor.connect.aws.s3.config.Format.Json
 import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
 import io.lenses.streamreactor.connect.aws.s3.config.AwsClient
@@ -55,7 +56,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
     ),
     bucketOptions = Set(
       SinkBucketOptions(
-        TopicName,
+        TopicName.some,
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(1)),
         formatSelection = FormatSelection(Json),
@@ -94,7 +95,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
     ),
     bucketOptions = Set(
       SinkBucketOptions(
-        TopicName,
+        TopicName.some,
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(3)),
         formatSelection = FormatSelection(Json),
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala
index e1b0694f8..f3061d403 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3ParquetWriterManagerTest.scala
@@ -16,6 +16,7 @@
 package io.lenses.streamreactor.connect.aws.s3.sink
 
+import cats.implicits.catsSyntaxOptionId
 import io.lenses.streamreactor.connect.aws.s3.config.Format.Parquet
 import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
 import io.lenses.streamreactor.connect.aws.s3.config.AwsClient
@@ -58,7 +59,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
     ),
     bucketOptions = Set(
       SinkBucketOptions(
-        TopicName,
+        TopicName.some,
         bucketAndPrefix,
         commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
         fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Parquet), NoOpPaddingStrategy),
diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala
index 85c86d474..550710a2d 100644
--- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala
+++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTaskTest.scala
@@ -117,8 +117,8 @@ class S3SinkTaskTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest
       toSinkRecord(user, k)
   }
 
-  private def toSinkRecord(user: Struct, k: Int) =
-    new SinkRecord(TopicName, 1, null, null, schema, user, k.toLong)
+  private def toSinkRecord(user: Struct, k: Int, topicName: String = TopicName) =
+    new SinkRecord(topicName, 1, null, null, schema, user, k.toLong)
 
   private val keySchema = SchemaBuilder.struct()
     .field("phonePrefix", SchemaBuilder.string().required().build())
@@ -2081,6 +2081,47 @@ class S3SinkTaskTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest
 
   }
 
+  "S3SinkTask" should "support multiple topics with wildcard syntax" in {
+
+    val topic2Name = "myTopic2"
+
+    val topic2Records = firstUsers.zipWithIndex.map {
+      case (user, k) =>
+        toSinkRecord(user, k, topic2Name)
+    }
+
+    val task = new S3SinkTask()
+
+    val props = DefaultProps
+      .combine(
+        Map(
+          "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` WITH_FLUSH_COUNT = 3",
+        ),
+      ).asJava
+
+    task.start(props)
+    task.open(Seq(new TopicPartition(TopicName, 1)).asJava)
+    task.open(Seq(new TopicPartition(topic2Name, 1)).asJava)
+    task.put(records.asJava)
+    task.put(topic2Records.asJava)
+    task.close(Seq(new TopicPartition(TopicName, 1)).asJava)
+    task.close(Seq(new TopicPartition(topic2Name, 1)).asJava)
+    task.stop()
+
+    Seq(TopicName, topic2Name).foreach {
+      tName =>
+        listBucketPath(BucketName, s"streamReactorBackups/$tName/1/").size should be(1)
+        remoteFileAsString(BucketName, s"streamReactorBackups/$tName/1/2.json") should be(
+          """
+            |{"name":"sam","title":"mr","salary":100.43}
+            |{"name":"laura","title":"ms","salary":429.06}
+            |{"name":"tom","title":null,"salary":395.44}
+            |""".stripMargin.filter(_ >= ' '),
+        )
+    }
+
+  }
+
   "S3SinkTask" should "partition by date" in {
 
     def toTimestamp(str: String): Long =
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManager.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManager.scala
index 98b6b5205..77441b73f 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManager.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3WriterManager.scala
@@ -376,6 +376,6 @@ object S3WriterManager extends LazyLogging {
   }
 
   private def bucketOptsForTopic(config: S3SinkConfig, topic: Topic): Option[SinkBucketOptions] =
-    config.bucketOptions.find(_.sourceTopic == topic.value)
+    config.bucketOptions.find(bo => bo.sourceTopic.isEmpty || bo.sourceTopic.contains(topic.value))
 
 }
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala
index 3ddf5ab81..0edd5de09 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfig.scala
@@ -77,7 +77,7 @@ object SinkBucketOptions extends LazyLogging {
     val stagingArea = LocalStagingArea(config)
     stagingArea match {
       case Right(value) => SinkBucketOptions(
-        kcql.getSource,
+        Option(kcql.getSource).filterNot(Set("*", "`*`").contains(_)),
         RemoteS3RootLocation(kcql.getTarget),
         formatSelection = formatSelection,
         fileNamingStrategy = namingStrategy,
@@ -92,13 +92,13 @@
 }
 
 case class SinkBucketOptions(
-  sourceTopic: String,
+  sourceTopic: Option[String],
   bucketAndPrefix: RemoteS3RootLocation,
   formatSelection: FormatSelection,
   fileNamingStrategy: S3FileNamingStrategy,
   partitionSelection: Option[PartitionSelection] = None,
   commitPolicy: CommitPolicy = DefaultCommitPolicy(Some(defaultFlushSize.toLong), Some(defaultFlushInterval),
-    Some(defaultFlushCount.toLong)),
+    Some(defaultFlushCount)),
   localStagingArea: LocalStagingArea,
 )
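
A minimal, self-contained sketch of the wildcard lookup rule introduced in bucketOptsForTopic above. BucketOpts and WildcardLookupSketch are hypothetical stand-ins (not part of the PR): the real SinkBucketOptions also carries the bucket, format, commit-policy and staging settings shown in the diff, and a KCQL source of `*` is assumed to have already been normalised to None, as the updated SinkBucketOptions constructor does.

// Hypothetical, REPL-style sketch of the topic-matching behaviour.
object WildcardLookupSketch extends App {

  // Simplified stand-in for SinkBucketOptions, reduced to the field that drives topic matching.
  final case class BucketOpts(sourceTopic: Option[String])

  // Mirrors the updated bucketOptsForTopic: an entry applies to a topic when it either
  // names that topic explicitly or was configured with the `*` wildcard
  // (represented here, as in the diff, by sourceTopic = None).
  def bucketOptsForTopic(options: Set[BucketOpts], topic: String): Option[BucketOpts] =
    options.find(bo => bo.sourceTopic.isEmpty || bo.sourceTopic.contains(topic))

  val opts = Set(BucketOpts(Some("myTopic")), BucketOpts(None))

  // "myTopic" matches (via its explicit entry or the wildcard, whichever find hits first);
  // "otherTopic" has no explicit entry, so it falls through to the wildcard.
  assert(bucketOptsForTopic(opts, "myTopic").isDefined)
  assert(bucketOptsForTopic(opts, "otherTopic").contains(BucketOpts(None)))
}

Because the options live in a Set, an explicit entry and a wildcard entry that both match are resolved by whichever find returns first, which is the same behaviour the changed S3WriterManager code exhibits.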