Skip to content

Commit

Permalink
Bringing the fixes for Redis on master (#1012)
Browse files Browse the repository at this point in the history
* Bringing the fixes for Redis on master

* Adding missing close method.

* Fix the compilation error

* Avoid setting the base class or the tests need to convert back to GenericContainer

* Removed unused import

---------

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Dec 1, 2023
1 parent efa059d commit 65d3aff
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.GenericContainer
import com.google.gson.Gson
import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
Expand Down Expand Up @@ -66,8 +67,7 @@ class RedisCacheTest
val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava
val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
writer.createClient(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

val json =
"""
Expand Down Expand Up @@ -99,8 +99,7 @@ class RedisCacheTest
val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava
val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
writer.createClient(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

val childSchema = SchemaBuilder.struct().name("com.example.Child")
.field("firstName", Schema.STRING_SCHEMA)
Expand Down Expand Up @@ -154,8 +153,7 @@ class RedisCacheTest
val props = (baseProps + (RedisConfigConstants.KCQL_CONFIG -> QUERY_ALL)).asJava
val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
writer.createClient(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Person")
.field("firstName", Schema.STRING_SCHEMA)
Expand Down Expand Up @@ -190,8 +188,7 @@ class RedisCacheTest

val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
writer.createClient(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Person")
.field("firstName", Schema.STRING_SCHEMA)
Expand Down Expand Up @@ -261,9 +258,8 @@ class RedisCacheTest
val props = base_Props.asJava
val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

writer.createClient(settings)
writer.write(Seq(nickRecord))

val key = nick.get("firstName").toString + RedisConfigConstants.REDIS_PK_DELIMITER_DEFAULT_VALUE + nickJr.get(
Expand All @@ -280,8 +276,7 @@ class RedisCacheTest
val props = (base_Props + (RedisConfigConstants.REDIS_PK_DELIMITER -> delimiter)).asJava
val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
writer.createClient(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

writer.write(Seq(nickRecord))

Expand All @@ -297,8 +292,7 @@ class RedisCacheTest
val props = base_Props.asJava
val config = RedisConfig(props)
val settings = RedisSinkSettings(config)
val writer = new RedisCache(settings)
writer.createClient(settings)
val writer = new RedisCache(settings, JedisClientBuilder.createClient(settings))

writer.write(Seq(nickRecord))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisConnectionInfo
import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.GenericContainer
import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
Expand Down Expand Up @@ -42,8 +43,7 @@ class RedisGeoAddTest extends AnyWordSpec with Matchers with MockitoSugar with F
val config = RedisConfig(props)
val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None)
val settings = RedisSinkSettings(config)
val writer = new RedisGeoAdd(settings)
writer.createClient(settings)
val writer = new RedisGeoAdd(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Cpu")
.field("longitude", Schema.STRING_SCHEMA)
Expand Down Expand Up @@ -86,8 +86,7 @@ class RedisGeoAddTest extends AnyWordSpec with Matchers with MockitoSugar with F
val config = RedisConfig(props)
val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None)
val settings = RedisSinkSettings(config)
val writer = new RedisGeoAdd(settings)
writer.createClient(settings)
val writer = new RedisGeoAdd(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Cpu")
.field("longitude", Schema.STRING_SCHEMA)
Expand Down Expand Up @@ -132,8 +131,7 @@ class RedisGeoAddTest extends AnyWordSpec with Matchers with MockitoSugar with F
val config = RedisConfig(props)
val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None)
val settings = RedisSinkSettings(config)
val writer = new RedisGeoAdd(settings)
writer.createClient(settings)
val writer = new RedisGeoAdd(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Cpu")
.field("lng", Schema.STRING_SCHEMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisConnectionInfo
import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import com.dimafeng.testcontainers
import com.dimafeng.testcontainers.ForAllTestContainer
import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
Expand Down Expand Up @@ -57,8 +58,7 @@ class RedisInsertSortedSetTest extends AnyWordSpec with Matchers with MockitoSug
val config = RedisConfig(props)
val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None)
val settings = RedisSinkSettings(config)
val writer = new RedisInsertSortedSet(settings)
writer.createClient(settings)
val writer = new RedisInsertSortedSet(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Cpu")
.field("type", Schema.STRING_SCHEMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.lenses.streamreactor.connect.redis.sink.writer

import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder
import io.lenses.streamreactor.connect.redis.sink.RedisSinkTask
import io.lenses.streamreactor.connect.redis.sink.config.RedisConfig
import io.lenses.streamreactor.connect.redis.sink.config.RedisConfigConstants
Expand Down Expand Up @@ -61,8 +62,7 @@ class RedisMultipleSortedSetsTest extends AnyWordSpec with Matchers with Mockito

val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None)
val settings = RedisSinkSettings(config)
val writer = new RedisMultipleSortedSets(settings)
writer.createClient(settings)
val writer = new RedisMultipleSortedSets(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.device")
.field("sensorID", Schema.STRING_SCHEMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import com.dimafeng.testcontainers.ForAllTestContainer
import com.dimafeng.testcontainers.GenericContainer
import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.redis.sink.JedisClientBuilder
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
Expand Down Expand Up @@ -60,8 +61,7 @@ class RedisPubSubTest extends AnyWordSpec with Matchers with MockitoSugar with L
val config = RedisConfig(props)
val connectionInfo = new RedisConnectionInfo("localhost", container.mappedPort(6379), None)
val settings = RedisSinkSettings(config)
val writer = new RedisPubSub(settings)
writer.createClient(settings)
val writer = new RedisPubSub(settings, JedisClientBuilder.createClient(settings))

val schema = SchemaBuilder.struct().name("com.example.Cpu")
.field("type", Schema.STRING_SCHEMA)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.redis.sink

import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import redis.clients.jedis.Jedis

import java.io.File
import java.io.FileNotFoundException

object JedisClientBuilder {
def createClient(sinkSettings: RedisSinkSettings): Jedis = {
val connection = sinkSettings.connectionInfo

if (connection.isSslConnection) {
connection.keyStoreFilepath match {
case Some(path) =>
if (!new File(path).exists) {
throw new FileNotFoundException(s"Keystore not found in: [$path]")
}

System.setProperty("javax.net.ssl.keyStorePassword", connection.keyStorePassword.getOrElse(""))
System.setProperty("javax.net.ssl.keyStore", path)
System.setProperty("javax.net.ssl.keyStoreType", connection.keyStoreType.getOrElse("jceks"))

case None =>
}

connection.trustStoreFilepath match {
case Some(path) =>
if (!new File(path).exists) {
throw new FileNotFoundException(s"Truststore not found in: $path")
}

System.setProperty("javax.net.ssl.trustStorePassword", connection.trustStorePassword.getOrElse(""))
System.setProperty("javax.net.ssl.trustStore", path)
System.setProperty("javax.net.ssl.trustStoreType", connection.trustStoreType.getOrElse("jceks"))

case None =>
}
}

val jedis = new Jedis(connection.host, connection.port, connection.isSslConnection)
connection.password.foreach(p => jedis.auth(p))
jedis
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import io.lenses.streamreactor.connect.redis.sink.config.RedisConfigConstants
import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import io.lenses.streamreactor.connect.redis.sink.writer._
import com.typesafe.scalalogging.StrictLogging
import io.lenses.streamreactor.common.sink.DbWriter
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.connect.sink.SinkRecord
Expand All @@ -40,7 +41,7 @@ import scala.jdk.CollectionConverters.ListHasAsScala
* target sink
*/
class RedisSinkTask extends SinkTask with StrictLogging {
var writer: List[RedisWriter] = List[RedisWriter]()
var writer: List[DbWriter] = List[DbWriter]()
private val progressCounter = new ProgressCounter
private var enableProgress: Boolean = false
private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
Expand Down Expand Up @@ -83,35 +84,30 @@ class RedisSinkTask extends SinkTask with StrictLogging {
val mode_GEOADD = filterGeoAddMode(settings)

val mode_STREAM = filterStream(settings)

val jedis = JedisClientBuilder.createClient(settings)
//-- Start as many writers as required
writer = (modeCache.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${modeCache.kcqlSettings.size}] KCQLs with Redis Cache mode")
val writer = new RedisCache(modeCache)
writer.createClient(settings)
val writer = new RedisCache(modeCache, jedis)
List(writer)
} ++ mode_INSERT_SS.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting ${mode_INSERT_SS.kcqlSettings.size}] KCQLs with Redis Insert Sorted Set mode")
val writer = new RedisInsertSortedSet(mode_INSERT_SS)
writer.createClient(settings)
val writer = new RedisInsertSortedSet(mode_INSERT_SS, jedis)
List(writer)
} ++ mode_PUBSUB.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_PUBSUB.kcqlSettings.size}] KCQLs with Redis PubSub mode")
val writer = new RedisPubSub(mode_PUBSUB)
writer.createClient(settings)
val writer = new RedisPubSub(mode_PUBSUB, jedis)
List(writer)
} ++ mode_PK_SS.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_PK_SS.kcqlSettings.size}] KCQLs with Redis Multiple Sorted Sets mode")
val writer = new RedisMultipleSortedSets(mode_PK_SS)
writer.createClient(settings)
val writer = new RedisMultipleSortedSets(mode_PK_SS, jedis)
List(writer)
} ++ mode_GEOADD.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_GEOADD.kcqlSettings.size}] KCQLs with Redis Geo Add mode")
List(new RedisGeoAdd(mode_GEOADD))
List(new RedisGeoAdd(mode_GEOADD, jedis))
} ++ mode_STREAM.kcqlSettings.headOption.map { _ =>
logger.info(s"Starting [${mode_STREAM.kcqlSettings.size}] KCQLs with Redis Stream mode")
val writer = new RedisStreams(mode_STREAM)
writer.createClient(settings)
val writer = new RedisStreams(mode_STREAM, jedis)
List(writer)
}).flatten.toList

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@
*/
package io.lenses.streamreactor.connect.redis.sink.writer

import com.typesafe.scalalogging.StrictLogging
import io.lenses.kcql.Kcql
import io.lenses.streamreactor.common.config.base.settings.Projections
import io.lenses.streamreactor.common.errors.ErrorHandler
import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension
import io.lenses.streamreactor.common.schemas.StructHelper
import io.lenses.streamreactor.common.sink.DbWriter
import io.lenses.streamreactor.connect.json.SimpleJsonConverter
import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting
import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import redis.clients.jedis.Jedis

import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.Failure
Expand All @@ -40,7 +44,8 @@ import scala.util.Try
* INSERT INTO FX- SELECT price from yahoo-fx PK symbol
* SELECT price from yahoo-fx PK symbol WITHEXTRACT
*/
class RedisCache(sinkSettings: RedisSinkSettings) extends RedisWriter {
class RedisCache(sinkSettings: RedisSinkSettings, jedis: Jedis) extends DbWriter with StrictLogging with ErrorHandler {
initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy)

private lazy val simpleJsonConverter = new SimpleJsonConverter()
val configs: Set[Kcql] = sinkSettings.kcqlSettings.map(_.kcqlConfig)
Expand Down Expand Up @@ -133,4 +138,5 @@ class RedisCache(sinkSettings: RedisSinkSettings) extends RedisWriter {
}
logger.debug(s"Wrote [${sinkRecords.size}] rows for topic [$topic]")
}
override def close(): Unit = jedis.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,32 @@
*/
package io.lenses.streamreactor.connect.redis.sink.writer

import com.typesafe.scalalogging.StrictLogging
import io.lenses.kcql.Kcql
import io.lenses.streamreactor.common.config.base.settings.Projections
import io.lenses.streamreactor.common.errors.ErrorHandler
import io.lenses.streamreactor.common.schemas.SinkRecordConverterHelper.SinkRecordExtension
import io.lenses.streamreactor.common.schemas.StructHelper
import io.lenses.streamreactor.common.sink.DbWriter
import io.lenses.streamreactor.connect.json.SimpleJsonConverter
import io.lenses.streamreactor.connect.redis.sink.config.RedisKCQLSetting
import io.lenses.streamreactor.connect.redis.sink.config.RedisSinkSettings
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import redis.clients.jedis.Jedis

import scala.jdk.CollectionConverters.ListHasAsScala
import scala.util.control.Exception.allCatch
import scala.util.Failure
import scala.util.Success
import scala.util.Try

class RedisGeoAdd(sinkSettings: RedisSinkSettings) extends RedisWriter with GeoAddSupport {
class RedisGeoAdd(sinkSettings: RedisSinkSettings, jedis: Jedis)
extends DbWriter
with StrictLogging
with ErrorHandler
with GeoAddSupport {
initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy)

private lazy val simpleJsonConverter = new SimpleJsonConverter()

Expand Down Expand Up @@ -135,4 +144,6 @@ class RedisGeoAdd(sinkSettings: RedisSinkSettings) extends RedisWriter with GeoA
}

def isDoubleNumber(s: String): Boolean = (allCatch opt s.toDouble).isDefined

override def close(): Unit = jedis.close()
}
Loading

0 comments on commit 65d3aff

Please sign in to comment.