Skip to content

Commit

Permalink
Enable wildcard syntax to support multiple topics without additional …
Browse files Browse the repository at this point in the history
…configuration. (#920)

Co-authored-by: David Sloan <[email protected]>
  • Loading branch information
davidsloan and davidsloan authored Feb 10, 2023
1 parent c4ea086 commit 53de6af
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.aws.s3.config.Format.Avro
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
import io.lenses.streamreactor.connect.aws.s3.config.AwsClient
Expand Down Expand Up @@ -57,7 +58,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
bucketOptions = Set(
SinkBucketOptions(
TopicName,
TopicName.some,
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
formatSelection = FormatSelection(Avro),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.aws.s3.config.Format.Json
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
import io.lenses.streamreactor.connect.aws.s3.config.AwsClient
Expand Down Expand Up @@ -55,7 +56,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
bucketOptions = Set(
SinkBucketOptions(
TopicName,
TopicName.some,
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(1)),
formatSelection = FormatSelection(Json),
Expand Down Expand Up @@ -94,7 +95,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
),
bucketOptions = Set(
SinkBucketOptions(
TopicName,
TopicName.some,
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(3)),
formatSelection = FormatSelection(Json),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.aws.s3.config.Format.Parquet
import io.lenses.streamreactor.connect.aws.s3.config.AuthMode
import io.lenses.streamreactor.connect.aws.s3.config.AwsClient
Expand Down Expand Up @@ -58,7 +59,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
),
bucketOptions = Set(
SinkBucketOptions(
TopicName,
TopicName.some,
bucketAndPrefix,
commitPolicy = DefaultCommitPolicy(None, None, Some(2)),
fileNamingStrategy = new HierarchicalS3FileNamingStrategy(FormatSelection(Parquet), NoOpPaddingStrategy),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ class S3SinkTaskTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest
toSinkRecord(user, k)
}

private def toSinkRecord(user: Struct, k: Int) =
new SinkRecord(TopicName, 1, null, null, schema, user, k.toLong)
private def toSinkRecord(user: Struct, k: Int, topicName: String = TopicName) =
new SinkRecord(topicName, 1, null, null, schema, user, k.toLong)

private val keySchema = SchemaBuilder.struct()
.field("phonePrefix", SchemaBuilder.string().required().build())
Expand Down Expand Up @@ -2081,6 +2081,47 @@ class S3SinkTaskTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest

}

"S3SinkTask" should "support multiple topics with wildcard syntax" in {

val topic2Name = "myTopic2"

val topic2Records = firstUsers.zipWithIndex.map {
case (user, k) =>
toSinkRecord(user, k, topic2Name)
}

val task = new S3SinkTask()

val props = DefaultProps
.combine(
Map(
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from `*` WITH_FLUSH_COUNT = 3",
),
).asJava

task.start(props)
task.open(Seq(new TopicPartition(TopicName, 1)).asJava)
task.open(Seq(new TopicPartition(topic2Name, 1)).asJava)
task.put(records.asJava)
task.put(topic2Records.asJava)
task.close(Seq(new TopicPartition(TopicName, 1)).asJava)
task.close(Seq(new TopicPartition(topic2Name, 1)).asJava)
task.stop()

Seq(TopicName, topic2Name).foreach {
tName =>
listBucketPath(BucketName, s"streamReactorBackups/$tName/1/").size should be(1)
remoteFileAsString(BucketName, s"streamReactorBackups/$tName/1/2.json") should be(
"""
|{"name":"sam","title":"mr","salary":100.43}
|{"name":"laura","title":"ms","salary":429.06}
|{"name":"tom","title":null,"salary":395.44}
|""".stripMargin.filter(_ >= ' '),
)
}

}

"S3SinkTask" should "partition by date" in {

def toTimestamp(str: String): Long =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,6 @@ object S3WriterManager extends LazyLogging {
}

private def bucketOptsForTopic(config: S3SinkConfig, topic: Topic): Option[SinkBucketOptions] =
config.bucketOptions.find(_.sourceTopic == topic.value)
config.bucketOptions.find(bo => bo.sourceTopic.isEmpty || bo.sourceTopic.contains(topic.value))

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object SinkBucketOptions extends LazyLogging {
val stagingArea = LocalStagingArea(config)
stagingArea match {
case Right(value) => SinkBucketOptions(
kcql.getSource,
Option(kcql.getSource).filterNot(Set("*", "`*`").contains(_)),
RemoteS3RootLocation(kcql.getTarget),
formatSelection = formatSelection,
fileNamingStrategy = namingStrategy,
Expand All @@ -92,13 +92,13 @@ object SinkBucketOptions extends LazyLogging {
}

case class SinkBucketOptions(
sourceTopic: String,
sourceTopic: Option[String],
bucketAndPrefix: RemoteS3RootLocation,
formatSelection: FormatSelection,
fileNamingStrategy: S3FileNamingStrategy,
partitionSelection: Option[PartitionSelection] = None,
commitPolicy: CommitPolicy = DefaultCommitPolicy(Some(defaultFlushSize.toLong), Some(defaultFlushInterval),
Some(defaultFlushCount.toLong)),
Some(defaultFlushCount)),
localStagingArea: LocalStagingArea,
)

Expand Down

0 comments on commit 53de6af

Please sign in to comment.