diff --git a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala index d64891a9f..2afdc07d9 100644 --- a/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala +++ b/kafka-connect-aws-s3/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala @@ -160,24 +160,6 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[ } - unitUnderTest should "throw error if prefix contains a slash" in { - - val task = createSinkTask() - - val prefixWithSlashes = "my/prefix/that/is/a/path" - val props = (defaultProps - + ( - s"$prefix.kcql" -> s"insert into $BucketName:$prefixWithSlashes select * from $TopicName WITH_FLUSH_INTERVAL = 1", - )).asJava - - val intercepted = intercept[IllegalArgumentException] { - task.start(props) - } - - intercepted.getMessage should be("Nested prefix not currently supported") - - } - unitUnderTest should "flush for every record when configured flush count size of 1" in { val props = diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/model/location/S3LocationValidator.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/model/location/S3LocationValidator.scala index b1734b1ab..ce977d2ab 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/model/location/S3LocationValidator.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/model/location/S3LocationValidator.scala @@ -24,11 +24,10 @@ import scala.util.Try object S3LocationValidator extends CloudLocationValidator { - def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] = + def validate(location: CloudLocation): Validated[Throwable, CloudLocation] = Validated.fromEither( for { _ <- validateBucketName(location.bucket).toEither - _ <- validatePrefix(allowSlash, location.prefix).toEither } yield location, ) @@ -39,12 +38,4 @@ object S3LocationValidator extends CloudLocationValidator { }, ) - private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] = - Validated.fromEither( - Either.cond( - allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))), - (), - new IllegalArgumentException("Nested prefix not currently supported"), - ), - ) } diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala index dc680e325..4acc01296 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala @@ -119,7 +119,7 @@ object SourceBucketOptions { config.getKCQL.map { kcql: Kcql => for { - source <- CloudLocation.splitAndValidate(kcql.getSource, allowSlash = true) + source <- CloudLocation.splitAndValidate(kcql.getSource) format <- FormatSelection.fromKcql(kcql, CloudSourcePropsSchema.schema) sourceProps = CloudSourceProps.fromKcql(kcql) diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/CloudLocationTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/CloudLocationTest.scala index 50b9884a4..400ca5738 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/CloudLocationTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/model/CloudLocationTest.scala @@ -26,29 +26,26 @@ import org.scalatest.matchers.should.Matchers class CloudLocationTest extends AnyFlatSpec with Matchers with EitherValues { implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator - "bucketAndPrefix" should "reject prefixes with slashes" in { - expectException( - CloudLocation.splitAndValidate("bucket:/slash", allowSlash = false), - "Nested prefix not currently supported", - ) + "bucketAndPrefix" should "accept prefixes with slashes" in { + + CloudLocation.splitAndValidate("bucket:/slash").value should be(CloudLocation("bucket", "/slash".some)) + } "bucketAndPrefix" should "split the bucket and prefix" in { - CloudLocation.splitAndValidate("bucket:prefix", allowSlash = false).value should be(CloudLocation("bucket", - "prefix".some, - )) + CloudLocation.splitAndValidate("bucket:prefix").value should be(CloudLocation("bucket", "prefix".some)) } "bucketAndPrefix" should "fail if given too many components to split" in { expectException( - CloudLocation.splitAndValidate("bucket:path:whatIsThis", false), + CloudLocation.splitAndValidate("bucket:path:whatIsThis"), "Invalid number of arguments provided to create BucketAndPrefix", ) } "bucketAndPrefix" should "fail if not a valid bucket name" in { expectException( - CloudLocation.splitAndValidate("bucket-police-refu$e-this-name:path", allowSlash = true), + CloudLocation.splitAndValidate("bucket-police-refu$e-this-name:path"), "Bucket name should not contain '$'", ) } diff --git a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidator.scala b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidator.scala index 017d5aaba..dc6355a9e 100644 --- a/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidator.scala +++ b/kafka-connect-azure-datalake/src/main/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidator.scala @@ -22,11 +22,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation object DatalakeLocationValidator extends CloudLocationValidator { private val ContainerNamePattern = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r - def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] = + def validate(location: CloudLocation): Validated[Throwable, CloudLocation] = Validated.fromEither( for { _ <- validateBucketName(location.bucket).toEither - _ <- validatePrefix(allowSlash, location.prefix).toEither } yield location, ) @@ -37,12 +36,4 @@ object DatalakeLocationValidator extends CloudLocationValidator { Validated.Invalid(new IllegalArgumentException("Nested prefix not currently supported")) } - private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] = - Validated.fromEither( - Either.cond( - allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))), - (), - new IllegalArgumentException("Nested prefix not currently supported"), - ), - ) } diff --git a/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidatorTest.scala b/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidatorTest.scala index 19ce0a0b0..0f8b6639e 100644 --- a/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidatorTest.scala +++ b/kafka-connect-azure-datalake/src/test/scala/io/lenses/streamreactor/connect/datalake/model/location/DatalakeLocationValidatorTest.scala @@ -28,25 +28,19 @@ class DatalakeLocationValidatorTest extends AnyFunSuite with Matchers { test("DatalakeLocationValidator should validate a valid bucket name") { val location = CloudLocation("valid-bucket-name", none, "valid-path".some) - val result = DatalakeLocationValidator.validate(location, allowSlash = false) + val result = DatalakeLocationValidator.validate(location) result shouldBe Validated.Valid(location) } test("DatalakeLocationValidator should return an error for an invalid bucket name") { val location = CloudLocation("invalid_bucket_name", none, "valid-path".some) - val result = DatalakeLocationValidator.validate(location, allowSlash = false) + val result = DatalakeLocationValidator.validate(location) result shouldBe a[Validated.Invalid[_]] } - test("DatalakeLocationValidator should return an error if prefix contains a slash when not allowed") { + test("DatalakeLocationValidator should allow prefix with a slash in") { val location = CloudLocation("valid-bucket-name", "prefix/".some, "valid-path".some) - val result = DatalakeLocationValidator.validate(location, allowSlash = false) - result shouldBe a[Validated.Invalid[_]] - } - - test("DatalakeLocationValidator should allow prefix with a slash when allowed") { - val location = CloudLocation("valid-bucket-name", "prefix/".some, "valid-path".some) - val result = DatalakeLocationValidator.validate(location, allowSlash = true) + val result = DatalakeLocationValidator.validate(location) result shouldBe Validated.Valid(location) } } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala index de789a2af..04dc51168 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocation.scala @@ -53,8 +53,8 @@ case class CloudLocation( def prefixOrDefault(): String = prefix.getOrElse("") - private def validate(allowSlash: Boolean): Validated[Throwable, CloudLocation] = - cloudLocationValidator.validate(this, allowSlash) + private def validate(): Validated[Throwable, CloudLocation] = + cloudLocationValidator.validate(this) override def toString: String = { val prefixStr = prefix.map(p => s"$p/").getOrElse("") @@ -69,16 +69,15 @@ case class CloudLocation( case object CloudLocation { def splitAndValidate( bucketAndPrefix: String, - allowSlash: Boolean, )( implicit validator: CloudLocationValidator, ): Either[Throwable, CloudLocation] = bucketAndPrefix.split(":") match { case Array(bucket) => - CloudLocation(bucket, None).validate(allowSlash).toEither + CloudLocation(bucket, None).validate().toEither case Array(bucket, path) => - CloudLocation(bucket, Some(path)).validate(allowSlash).toEither + CloudLocation(bucket, Some(path)).validate().toEither case _ => new IllegalArgumentException("Invalid number of arguments provided to create BucketAndPrefix").asLeft } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocationValidator.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocationValidator.scala index 4e1e09d4e..faa296825 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocationValidator.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/model/location/CloudLocationValidator.scala @@ -18,6 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.model.location import cats.data.Validated trait CloudLocationValidator { - def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] + def validate(location: CloudLocation): Validated[Throwable, CloudLocation] } diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala index 58505fd27..04173ff12 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/CloudSinkBucketOptions.scala @@ -64,7 +64,7 @@ object CloudSinkBucketOptions extends LazyLogging { } keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService) stagingArea <- config.getLocalStagingArea() - target <- CloudLocation.splitAndValidate(kcql.getTarget, allowSlash = false) + target <- CloudLocation.splitAndValidate(kcql.getTarget) storageSettings <- DataStorageSettings.from(sinkProps) _ <- validateEnvelopeAndFormat(formatSelection, storageSettings) commitPolicy = config.commitPolicy(kcql) diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/utils/SampleData.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/utils/SampleData.scala index 3431fd39f..c38ea6931 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/utils/SampleData.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/utils/SampleData.scala @@ -37,7 +37,7 @@ import scala.jdk.CollectionConverters.MapHasAsScala object SampleData extends Matchers { - implicit val cloudLocationValidator: CloudLocationValidator = (s3Location: CloudLocation, allowSlash: Boolean) => + implicit val cloudLocationValidator: CloudLocationValidator = (s3Location: CloudLocation) => Validated.fromEither(Right(s3Location)) val topic: Topic = Topic("niceTopic") diff --git a/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala b/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala index d2276d788..390a5a6cf 100644 --- a/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala +++ b/kafka-connect-gcp-storage/src/it/scala/io/lenses/streamreactor/connect/cloud/common/sink/CoreSinkTaskTestCases.scala @@ -160,24 +160,6 @@ abstract class CoreSinkTaskTestCases[SM <: FileMetadata, SI <: StorageInterface[ } - unitUnderTest should "throw error if prefix contains a slash" in { - - val task = createSinkTask() - - val prefixWithSlashes = "my/prefix/that/is/a/path" - val props = (defaultProps - + ( - s"$prefix.kcql" -> s"insert into $BucketName:$prefixWithSlashes select * from $TopicName WITH_FLUSH_INTERVAL = 1", - )).asJava - - val intercepted = intercept[IllegalArgumentException] { - task.start(props) - } - - intercepted.getMessage should be("Nested prefix not currently supported") - - } - unitUnderTest should "flush for every record when configured flush count size of 1" in { val props = diff --git a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidator.scala b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidator.scala index 3cb052aef..eb03af708 100644 --- a/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidator.scala +++ b/kafka-connect-gcp-storage/src/main/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidator.scala @@ -22,11 +22,10 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation object GCPStorageLocationValidator extends CloudLocationValidator { private val ContainerNamePattern = "^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$".r - def validate(location: CloudLocation, allowSlash: Boolean): Validated[Throwable, CloudLocation] = + def validate(location: CloudLocation): Validated[Throwable, CloudLocation] = Validated.fromEither( for { _ <- validateBucketName(location.bucket).toEither - _ <- validatePrefix(allowSlash, location.prefix).toEither } yield location, ) @@ -37,12 +36,4 @@ object GCPStorageLocationValidator extends CloudLocationValidator { Validated.Invalid(new IllegalArgumentException("Nested prefix not currently supported")) } - private def validatePrefix(allowSlash: Boolean, prefix: Option[String]): Validated[Throwable, Unit] = - Validated.fromEither( - Either.cond( - allowSlash || (!allowSlash && !prefix.exists(_.contains("/"))), - (), - new IllegalArgumentException("Nested prefix not currently supported"), - ), - ) } diff --git a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidatorTest.scala b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidatorTest.scala index 96f434cab..8995c9293 100644 --- a/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidatorTest.scala +++ b/kafka-connect-gcp-storage/src/test/scala/io/lenses/streamreactor/connect/gcp/storage/model/location/GCPStorageLocationValidatorTest.scala @@ -25,35 +25,35 @@ class GCPStorageLocationValidatorTest extends AnyFunSuite with Matchers with Val test("validate should succeed for a valid CloudLocation") { val validLocation = CloudLocation("valid-bucket", Some("valid-prefix")) - val result = GCPStorageLocationValidator.validate(validLocation, allowSlash = true) + val result = GCPStorageLocationValidator.validate(validLocation) result.value should be(validLocation) } test("validate should fail for an invalid bucket name") { val invalidLocation = CloudLocation("invalid@bucket", Some("valid-prefix")) val result: Validated[Throwable, CloudLocation] = - GCPStorageLocationValidator.validate(invalidLocation, allowSlash = true) + GCPStorageLocationValidator.validate(invalidLocation) result.leftValue.getMessage should be("Nested prefix not currently supported") } - test("validate should fail for an invalid prefix with slashes not allowed") { - val invalidLocation = CloudLocation("valid-bucket", Some("invalid/prefix")) + test("validate should fail for a prefix with slashes") { + val invalidLocation = CloudLocation("valid-bucket", Some("slash/prefix")) val result: Validated[Throwable, CloudLocation] = - GCPStorageLocationValidator.validate(invalidLocation, allowSlash = false) - result.leftValue.getMessage should be("Nested prefix not currently supported") + GCPStorageLocationValidator.validate(invalidLocation) + result.value should be(CloudLocation("valid-bucket", Some("slash/prefix"))) } test("validate should succeed for a valid prefix with slashes not allowed") { val validLocation = CloudLocation("valid-bucket", Some("valid-prefix")) val result: Validated[Throwable, CloudLocation] = - GCPStorageLocationValidator.validate(validLocation, allowSlash = false) + GCPStorageLocationValidator.validate(validLocation) result.value should be(validLocation) } test("validate should succeed for a valid prefix with slashes allowed") { val validLocation = CloudLocation("valid-bucket", Some("valid-prefix")) val result: Validated[Throwable, CloudLocation] = - GCPStorageLocationValidator.validate(validLocation, allowSlash = true) + GCPStorageLocationValidator.validate(validLocation) result.value should be(validLocation) } }