diff --git a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/writer/RedisWriter.scala b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/writer/RedisWriter.scala index 13c20a769..913083c99 100755 --- a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/writer/RedisWriter.scala +++ b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/writer/RedisWriter.scala @@ -37,32 +37,35 @@ abstract class RedisWriter extends DbWriter with StrictLogging with ConverterUti val connection = sinkSettings.connectionInfo if (connection.isSslConnection) { + val keyStoreFilepath = connection.keyStoreFilepath match { + case Some(path) => + if (!new File(path).exists) { + throw new FileNotFoundException(s"Keystore Certificate not found in: $path") + } - val keyStoreFilepath = connection.keyStoreFilepath.getOrElse("") + System.setProperty("javax.net.ssl.keyStorePassword", connection.keyStorePassword.getOrElse("")) + System.setProperty("javax.net.ssl.keyStore", path) + System.setProperty("javax.net.ssl.keyStoreType", connection.keyStoreType.getOrElse("jceks")) - if (!new File(keyStoreFilepath).exists) { - throw new FileNotFoundException(s"Keystore Certificate not found in: $keyStoreFilepath") + case None => } - val trustStoreFilepath = connection.trustStoreFilepath.getOrElse("") + connection.trustStoreFilepath match { + case Some(path) => + if (!new File(path).exists) { + throw new FileNotFoundException(s"Truststore Certificate not found in: $path") + } - if (!new File(trustStoreFilepath).exists) { - throw new FileNotFoundException(s"Truststore Certificate not found in: $trustStoreFilepath") - } - - System.setProperty("javax.net.ssl.keyStorePassword", connection.keyStorePassword.getOrElse("")) - System.setProperty("javax.net.ssl.keyStore", keyStoreFilepath) - System.setProperty("javax.net.ssl.keyStoreType", connection.keyStoreType.getOrElse("jceks")) + System.setProperty("javax.net.ssl.trustStorePassword", connection.trustStorePassword.getOrElse("")) + System.setProperty("javax.net.ssl.trustStore", path) + System.setProperty("javax.net.ssl.trustStoreType", connection.trustStoreType.getOrElse("jceks")) - System.setProperty("javax.net.ssl.trustStorePassword", connection.trustStorePassword.getOrElse("")) - System.setProperty("javax.net.ssl.trustStore", trustStoreFilepath) - System.setProperty("javax.net.ssl.trustStoreType", connection.trustStoreType.getOrElse("jceks")) + case None => + } } jedis = new Jedis(connection.host, connection.port, connection.isSslConnection) - connection.password.foreach(p => jedis.auth(p)) - //initialize error tracker initialize(sinkSettings.taskRetries, sinkSettings.errorPolicy) }