diff --git a/kafka-connect-aws-s3/src/main/resources/aws-s3-sink-ascii.txt b/kafka-connect-aws-s3/src/main/resources/aws-s3-sink-ascii.txt
new file mode 100644
index 000000000..ce5cf9572
--- /dev/null
+++ b/kafka-connect-aws-s3/src/main/resources/aws-s3-sink-ascii.txt
@@ -0,0 +1,28 @@
+
+ ████████▀▀▀▀▀███████████████████████████████████████████████████████████████████
+ █████▀ ▀████████████████████████████████████████████████████████████████
+ ███▀ ▄█████▄ ▀██████████████████████████████████████████████████████████████
+ ███ ▄███████▄ ██████ █████▌ █▌ ████ ███ ▄▄ ██ ███ ▄▄ ███
+ ███ █████████ ██████ █████▌ ██████▌ ▀██ ██ ██████ ██████ ███████
+ ███ ▀███████▀ ██████ █████▌ ██▌ █▄ █ ███▄▄ ██ ███▄▄ ███
+ ████▄ ▄███████ █████▌ ██████▌ ███ ███████ █ ███████████ ██
+ █████████ ████████████ ▌ █▌ ████▄ ██▄ ▄██ █▄ ▄███
+ █████████ ████████████████████████████████████████████████████████████████████
+ █████████ ▄████████████████████████████████████████████████████████████████████
+ ████████████████████████████████████████████████████████████████████████████████
+ __ _______ _____ ____
+ /\ \ / / ____| / ____|___ \
+ / \ \ /\ / / (___ | (___ __) |
+ / /\ \ \/ \/ / \___ \ \___ \ |__ <
+ / ____ \ /\ / ____) | ____) |___) |
+ /_/____\_\/ \/ _|_____/ |_____/|____/
+ / ____(_) | |
+ | (___ _ _ __ | | __
+ \___ \| | '_ \| |/ /
+ ____) | | | | | <
+ |_____/|_|_| |_|_|\_\ _
+ / ____| | |
+ | | ___ _ __ _ __ ___ ___| |_ ___ _ __
+ | | / _ \| '_ \| '_ \ / _ \/ __| __/ _ \| '__|
+ | |___| (_) | | | | | | | __/ (__| || (_) | |
+ \_____\___/|_| |_|_| |_|\___|\___|\__\___/|_|
\ No newline at end of file
diff --git a/kafka-connect-aws-s3/src/main/resources/aws-s3-source-ascii.txt b/kafka-connect-aws-s3/src/main/resources/aws-s3-source-ascii.txt
new file mode 100644
index 000000000..09e796976
--- /dev/null
+++ b/kafka-connect-aws-s3/src/main/resources/aws-s3-source-ascii.txt
@@ -0,0 +1,28 @@
+
+ ████████▀▀▀▀▀███████████████████████████████████████████████████████████████████
+ █████▀ ▀████████████████████████████████████████████████████████████████
+ ███▀ ▄█████▄ ▀██████████████████████████████████████████████████████████████
+ ███ ▄███████▄ ██████ █████▌ █▌ ████ ███ ▄▄ ██ ███ ▄▄ ███
+ ███ █████████ ██████ █████▌ ██████▌ ▀██ ██ ██████ ██████ ███████
+ ███ ▀███████▀ ██████ █████▌ ██▌ █▄ █ ███▄▄ ██ ███▄▄ ███
+ ████▄ ▄███████ █████▌ ██████▌ ███ ███████ █ ███████████ ██
+ █████████ ████████████ ▌ █▌ ████▄ ██▄ ▄██ █▄ ▄███
+ █████████ ████████████████████████████████████████████████████████████████████
+ █████████ ▄████████████████████████████████████████████████████████████████████
+ ████████████████████████████████████████████████████████████████████████████████
+ __ _______ _____ ____
+ /\ \ / / ____| / ____|___ \
+ / \ \ /\ / / (___ | (___ __) |
+ / /\ \ \/ \/ / \___ \ \___ \ |__ <
+ / ____ \ /\ / ____) | ____) |___) |
+ /_/____\_\/ \/ |_____/ |_____/|____/
+ / ____|
+ | (___ ___ _ _ _ __ ___ ___
+ \___ \ / _ \| | | | '__/ __/ _ \
+ ____) | (_) | |_| | | | (_| __/
+ |_____/ \___/ \__,_|_| \___\___| _
+ / ____| | |
+ | | ___ _ __ _ __ ___ ___| |_ ___ _ __
+ | | / _ \| '_ \| '_ \ / _ \/ __| __/ _ \| '__|
+ | |___| (_) | | | | | | | __/ (__| || (_) | |
+ \_____\___/|_| |_|_| |_|\___|\___|\__\___/|_|
\ No newline at end of file
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala
index 8ffcb0284..8a3055412 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala
@@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.aws.s3.sink
 import cats.implicits._
 import com.datamountaineer.streamreactor.common.errors.ErrorHandler
 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import io.lenses.streamreactor.connect.aws.s3.auth.AuthResources
 import io.lenses.streamreactor.connect.aws.s3.config.S3Config
@@ -50,6 +51,9 @@
   override def version(): String = manifest.version()

   override def start(props: util.Map[String, String]): Unit = {
+
+    printAsciiHeader(manifest, "/aws-s3-sink-ascii.txt")
+
     sinkName = getSinkName(props).getOrElse("MissingSinkName")

     logger.debug(s"[{}] S3SinkTask.start", sinkName)
diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala
index 4173c0278..0a3cafca4 100644
--- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala
+++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala
@@ -15,6 +15,7 @@
  */
 package io.lenses.streamreactor.connect.aws.s3.source

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.typesafe.scalalogging.LazyLogging
 import io.lenses.streamreactor.connect.aws.s3.auth.AuthResources
@@ -55,6 +56,9 @@
    * Start sets up readers for every configured connection in the properties
    */
   override def start(props: util.Map[String, String]): Unit = {
+
+    printAsciiHeader(manifest, "/aws-s3-source-ascii.txt")
+
     sourceName = getSourceName(props).getOrElse("MissingSourceName")

     logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")
diff --git a/kafka-connect-azure-documentdb/src/main/scala/com/datamountaineer/streamreactor/connect/azure/documentdb/sink/DocumentDbSinkTask.scala b/kafka-connect-azure-documentdb/src/main/scala/com/datamountaineer/streamreactor/connect/azure/documentdb/sink/DocumentDbSinkTask.scala
index bd3668b93..8df4b8e66 100644
--- a/kafka-connect-azure-documentdb/src/main/scala/com/datamountaineer/streamreactor/connect/azure/documentdb/sink/DocumentDbSinkTask.scala
+++ b/kafka-connect-azure-documentdb/src/main/scala/com/datamountaineer/streamreactor/connect/azure/documentdb/sink/DocumentDbSinkTask.scala
@@ -15,6 +15,7 @@
  */
 package com.datamountaineer.streamreactor.connect.azure.documentdb.sink

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.azure.documentdb.config.DocumentDbConfig
@@ -57,10 +58,7 @@ class DocumentDbSinkTask extends SinkTask with StrictLogging {
       case Success(s) => s
     }

-    logger.info(scala.io.Source.fromInputStream(
-      this.getClass.getResourceAsStream("/documentdb-sink-ascii.txt"),
-    ).mkString + s" $version")
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/documentdb-sink-ascii.txt")

     writer = Some(DocumentDbWriter(taskConfig, context))
     enableProgress = taskConfig.getBoolean(DocumentDbConfigConstants.PROGRESS_COUNTER_ENABLED)
diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala
index 233ee9aa7..ffa32eb4e 100644
--- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala
+++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/sink/CassandraSinkTask.scala
@@ -15,6 +15,7 @@
  */
 package com.datamountaineer.streamreactor.connect.cassandra.sink

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter

@@ -50,10 +51,7 @@ class CassandraSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/cass-sink-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/cass-sink-ascii.txt")

     val config = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraSourceTask.scala b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraSourceTask.scala
index 2e0ba3eaf..bcfd4842d 100644
--- a/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraSourceTask.scala
+++ b/kafka-connect-cassandra/src/main/scala/com/datamountaineer/streamreactor/connect/cassandra/source/CassandraSourceTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.cassandra.source

 import com.datamountaineer.streamreactor.common.queues.QueueHelpers
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest

 import java.util
@@ -62,11 +63,7 @@
    * @param props A map of supplied properties.
    */
   override def start(props: util.Map[String, String]): Unit = {
-
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/cass-source-ascii.txt")).mkString + version,
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/cass-source-ascii.txt")

     val config = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-common/src/main/scala/com/datamountaineer/streamreactor/common/utils/AsciiArtPrinter.scala b/kafka-connect-common/src/main/scala/com/datamountaineer/streamreactor/common/utils/AsciiArtPrinter.scala
new file mode 100644
index 000000000..d627d6882
--- /dev/null
+++ b/kafka-connect-common/src/main/scala/com/datamountaineer/streamreactor/common/utils/AsciiArtPrinter.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017-2023 Lenses.io Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datamountaineer.streamreactor.common.utils
+
+import com.typesafe.scalalogging.LazyLogging
+
+import java.nio.charset.CodingErrorAction
+import scala.io.Codec
+import scala.io.Source
+
+object AsciiArtPrinter extends LazyLogging {
+
+  def printAsciiHeader(manifest: JarManifest, asciiArtResource: String): Unit = {
+    implicit val codec: Codec = Codec("UTF-8")
+    codec.onMalformedInput(CodingErrorAction.REPLACE)
+    codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
+    logger.info(
+      Source.fromInputStream(
+        getClass.getResourceAsStream(asciiArtResource),
+      ).mkString + s" ${manifest.version()}",
+    )
+    logger.info(manifest.printManifest())
+  }
+}
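
Aside (illustration only, not part of the patch): the snippet below is a minimal, standalone Scala sketch of the same approach AsciiArtPrinter.printAsciiHeader uses above, resolving a banner from the classpath and decoding it with a lenient UTF-8 codec. The object name BannerPreview is invented for the example, and the Option guard around getResourceAsStream is the sketch's own defensive addition; the helper itself assumes the resource is present.

// Hypothetical standalone preview of a banner resource; not part of the change above.
import java.nio.charset.CodingErrorAction
import scala.io.{Codec, Source}

object BannerPreview extends App {
  // Same lenient decoding as AsciiArtPrinter: malformed or unmappable bytes are replaced.
  implicit val codec: Codec = Codec("UTF-8")
  codec.onMalformedInput(CodingErrorAction.REPLACE)
  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

  // One of the banner files added by this patch; any of the *-ascii.txt resources would do.
  val banner = Option(getClass.getResourceAsStream("/aws-s3-sink-ascii.txt"))
    .map(stream => Source.fromInputStream(stream).mkString)
    .getOrElse("banner resource not found on classpath")

  println(banner)
}
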
diff --git a/kafka-connect-elastic6/src/main/scala/com/datamountaineer/streamreactor/connect/elastic6/ElasticSinkTask.scala b/kafka-connect-elastic6/src/main/scala/com/datamountaineer/streamreactor/connect/elastic6/ElasticSinkTask.scala
index b97d38b51..a473cc605 100644
--- a/kafka-connect-elastic6/src/main/scala/com/datamountaineer/streamreactor/connect/elastic6/ElasticSinkTask.scala
+++ b/kafka-connect-elastic6/src/main/scala/com/datamountaineer/streamreactor/connect/elastic6/ElasticSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.elastic6

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.elastic6.config.ElasticConfig
@@ -40,10 +41,7 @@ class ElasticSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/elastic-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/elastic-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-elastic7/src/main/scala/com/datamountaineer/streamreactor/connect/elastic7/ElasticSinkTask.scala b/kafka-connect-elastic7/src/main/scala/com/datamountaineer/streamreactor/connect/elastic7/ElasticSinkTask.scala
index 22cdd4142..da8be15ef 100644
--- a/kafka-connect-elastic7/src/main/scala/com/datamountaineer/streamreactor/connect/elastic7/ElasticSinkTask.scala
+++ b/kafka-connect-elastic7/src/main/scala/com/datamountaineer/streamreactor/connect/elastic7/ElasticSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.elastic7

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.elastic7.config.ElasticConfig
@@ -40,10 +41,7 @@ class ElasticSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/elastic-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/elastic-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpSourceConnector.scala b/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpSourceConnector.scala
index 9e9830510..b3b2b8be4 100644
--- a/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpSourceConnector.scala
+++ b/kafka-connect-ftp/src/main/scala/com/datamountaineer/streamreactor/connect/ftp/source/FtpSourceConnector.scala
@@ -15,6 +15,7 @@
  */
 package com.datamountaineer.streamreactor.connect.ftp.source

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.typesafe.scalalogging.StrictLogging
 import org.apache.kafka.connect.connector.Task
@@ -44,9 +45,9 @@ class FtpSourceConnector extends SourceConnector with StrictLogging {
     logger.info("stop")

   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/ftp-source-ascii.txt")).mkString + s" $version",
-    )
+
+    printAsciiHeader(manifest, "/ftp-source-ascii.txt")
+
     logger.info(s"start FtpSourceConnector")
     configProps = Some(props)
diff --git a/kafka-connect-hazelcast/src/main/scala/com/datamountaineer/streamreactor/connect/hazelcast/sink/HazelCastSinkTask.scala b/kafka-connect-hazelcast/src/main/scala/com/datamountaineer/streamreactor/connect/hazelcast/sink/HazelCastSinkTask.scala
index 02b32a381..de5252dbf 100644
--- a/kafka-connect-hazelcast/src/main/scala/com/datamountaineer/streamreactor/connect/hazelcast/sink/HazelCastSinkTask.scala
+++ b/kafka-connect-hazelcast/src/main/scala/com/datamountaineer/streamreactor/connect/hazelcast/sink/HazelCastSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.hazelcast.sink

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.hazelcast.config.HazelCastSinkConfig
@@ -45,10 +46,7 @@ class HazelCastSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/hazelcast-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/hazelcast-ascii.txt")

     if (Option(System.getProperty("hazelcast.logging.type")).isEmpty) {
       System.setProperty("hazelcast.logging.type", "slf4j")
diff --git a/kafka-connect-hbase/src/main/scala/com/datamountaineer/streamreactor/connect/hbase/HbaseSinkTask.scala b/kafka-connect-hbase/src/main/scala/com/datamountaineer/streamreactor/connect/hbase/HbaseSinkTask.scala
index 7095738bb..a894213d6 100644
--- a/kafka-connect-hbase/src/main/scala/com/datamountaineer/streamreactor/connect/hbase/HbaseSinkTask.scala
+++ b/kafka-connect-hbase/src/main/scala/com/datamountaineer/streamreactor/connect/hbase/HbaseSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.hbase

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.hbase.config.ConfigurationBuilder
@@ -53,8 +54,9 @@ class HbaseSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(scala.io.Source.fromInputStream(getClass.getResourceAsStream("/hbase-ascii.txt")).mkString)
-    logger.info(manifest.printManifest())
+
+    printAsciiHeader(manifest, "/hbase-ascii.txt")
+
     val conf = if (context.configs().isEmpty) props else context.configs()

     HBaseConfig.config.parse(conf)
diff --git a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/InfluxSinkTask.scala b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/InfluxSinkTask.scala
index 3441caaba..0b126e552 100644
--- a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/InfluxSinkTask.scala
+++ b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/InfluxSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.influx

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.influx.config.InfluxConfig
@@ -48,10 +49,7 @@ class InfluxSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/influx-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/influx-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/sink/JMSSinkTask.scala b/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/sink/JMSSinkTask.scala
index 354c964d7..61d52166d 100644
--- a/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/sink/JMSSinkTask.scala
+++ b/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/sink/JMSSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.jms.sink

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.jms.config.JMSConfig
@@ -47,10 +48,7 @@ class JMSSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/jms-sink-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/jms-sink-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
     JMSConfig.config.parse(conf)
diff --git a/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/source/JMSSourceTask.scala b/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/source/JMSSourceTask.scala
index 9395155d8..9cf2daad7 100644
--- a/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/source/JMSSourceTask.scala
+++ b/kafka-connect-jms/src/main/scala/com/datamountaineer/streamreactor/connect/jms/source/JMSSourceTask.scala
@@ -15,6 +15,7 @@
  */
 package com.datamountaineer.streamreactor.connect.jms.source

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.jms.config.JMSConfig
@@ -55,10 +56,7 @@
   private var evictThreshold: Int = 0

   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/jms-source-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/jms-source-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-kudu/src/main/scala/com/datamountaineer/streamreactor/connect/kudu/sink/KuduSinkTask.scala b/kafka-connect-kudu/src/main/scala/com/datamountaineer/streamreactor/connect/kudu/sink/KuduSinkTask.scala
index daf6e1259..80b77ae51 100644
--- a/kafka-connect-kudu/src/main/scala/com/datamountaineer/streamreactor/connect/kudu/sink/KuduSinkTask.scala
+++ b/kafka-connect-kudu/src/main/scala/com/datamountaineer/streamreactor/connect/kudu/sink/KuduSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.kudu.sink

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.kudu.config.KuduConfig
@@ -44,10 +45,7 @@ class KuduSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/kudu-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/kudu-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
     KuduConfig.config.parse(conf)
diff --git a/kafka-connect-mongodb/src/main/scala/com/datamountaineer/streamreactor/connect/mongodb/sink/MongoSinkTask.scala b/kafka-connect-mongodb/src/main/scala/com/datamountaineer/streamreactor/connect/mongodb/sink/MongoSinkTask.scala
index 187b6bcb5..ca9049ff2 100644
--- a/kafka-connect-mongodb/src/main/scala/com/datamountaineer/streamreactor/connect/mongodb/sink/MongoSinkTask.scala
+++ b/kafka-connect-mongodb/src/main/scala/com/datamountaineer/streamreactor/connect/mongodb/sink/MongoSinkTask.scala
@@ -15,6 +15,7 @@
  */
 package com.datamountaineer.streamreactor.connect.mongodb.sink

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.mongodb.config.MongoConfig
@@ -54,10 +55,7 @@ class MongoSinkTask extends SinkTask with StrictLogging {

     val conf = if (context.configs().isEmpty) props else context.configs()

-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/mongo-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/mongo-ascii.txt")

     val taskConfig = Try(MongoConfig(conf)) match {
       case Failure(f) => throw new ConnectException("Couldn't start Mongo Sink due to configuration error.", f)
diff --git a/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/sink/MqttSinkTask.scala b/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/sink/MqttSinkTask.scala
index 5b06a7770..74ffdafde 100644
--- a/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/sink/MqttSinkTask.scala
+++ b/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/sink/MqttSinkTask.scala
@@ -17,6 +17,7 @@ package com.datamountaineer.streamreactor.connect.mqtt.sink

 import com.datamountaineer.streamreactor.common.converters.sink.Converter
 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.mqtt.config.MqttConfigConstants
@@ -47,10 +48,7 @@ class MqttSinkTask extends SinkTask with StrictLogging {
   private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/mqtt-sink-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/mqtt-sink-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/source/MqttSourceTask.scala b/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/source/MqttSourceTask.scala
index e54719f04..46f13fb4f 100644
--- a/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/source/MqttSourceTask.scala
+++ b/kafka-connect-mqtt/src/main/scala/com/datamountaineer/streamreactor/connect/mqtt/source/MqttSourceTask.scala
@@ -15,21 +15,21 @@
  */
 package com.datamountaineer.streamreactor.connect.mqtt.source

-import com.datamountaineer.streamreactor.connect.converters.source.Converter
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
-
-import java.io.File
-import java.util
+import com.datamountaineer.streamreactor.connect.converters.source.Converter
 import com.datamountaineer.streamreactor.connect.mqtt.config.MqttConfigConstants
 import com.datamountaineer.streamreactor.connect.mqtt.config.MqttSourceConfig
 import com.datamountaineer.streamreactor.connect.mqtt.config.MqttSourceSettings
 import com.datamountaineer.streamreactor.connect.mqtt.connection.MqttClientConnectionFn
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.typesafe.scalalogging.StrictLogging
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.source.SourceRecord
 import org.apache.kafka.connect.source.SourceTask
+import java.io.File
+import java.util
 import scala.jdk.CollectionConverters.ListHasAsScala
 import scala.jdk.CollectionConverters.MapHasAsScala
 import scala.util.Failure
@@ -43,11 +43,7 @@ class MqttSourceTask extends SourceTask with StrictLogging {
   private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

   override def start(props: util.Map[String, String]): Unit = {
-
-    logger.info(scala.io.Source.fromInputStream(
-      this.getClass.getResourceAsStream("/mqtt-source-ascii.txt"),
-    ).mkString + s" $version")
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/mqtt-source-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/sink/PulsarSinkTask.scala b/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/sink/PulsarSinkTask.scala
index 80bc259fd..c86fc21a8 100644
--- a/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/sink/PulsarSinkTask.scala
+++ b/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/sink/PulsarSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.pulsar.sink

 import com.datamountaineer.streamreactor.common.errors.ErrorPolicyEnum
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.pulsar.config.PulsarConfigConstants
@@ -44,10 +45,7 @@ class PulsarSinkTask extends SinkTask with StrictLogging {
   private var settings: Option[PulsarSinkSettings] = None

   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/pulsar-sink-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/pulsar-sink-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/source/PulsarSourceTask.scala b/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/source/PulsarSourceTask.scala
index 8599e05ad..b3bfce98b 100644
--- a/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/source/PulsarSourceTask.scala
+++ b/kafka-connect-pulsar/src/main/scala/com/datamountaineer/streamreactor/connect/pulsar/source/PulsarSourceTask.scala
@@ -15,6 +15,7 @@
  */
 package com.datamountaineer.streamreactor.connect.pulsar.source

+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.converters.source.Converter
@@ -46,11 +47,7 @@ class PulsarSourceTask extends SourceTask with StrictLogging {

   @nowarn
   override def start(props: util.Map[String, String]): Unit = {
-
-    logger.info(scala.io.Source.fromInputStream(
-      this.getClass.getResourceAsStream("/pulsar-source-ascii.txt"),
-    ).mkString + s" $version")
-    logger.info(manifest.printManifest())
+    printAsciiHeader(manifest, "/pulsar-source-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()
diff --git a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala
index b73ba7a7f..2337c77df 100644
--- a/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala
+++ b/kafka-connect-redis/src/main/scala/com/datamountaineer/streamreactor/connect/redis/sink/RedisSinkTask.scala
@@ -16,6 +16,7 @@
 package com.datamountaineer.streamreactor.connect.redis.sink

 import com.datamountaineer.streamreactor.common.errors.RetryErrorPolicy
+import com.datamountaineer.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
 import com.datamountaineer.streamreactor.common.utils.JarManifest
 import com.datamountaineer.streamreactor.common.utils.ProgressCounter
 import com.datamountaineer.streamreactor.connect.redis.sink.config.RedisConfig
@@ -48,10 +49,8 @@ class RedisSinkTask extends SinkTask with StrictLogging {
    * Parse the configurations and setup the writer
    */
   override def start(props: util.Map[String, String]): Unit = {
-    logger.info(
-      scala.io.Source.fromInputStream(getClass.getResourceAsStream("/redis-ascii.txt")).mkString + s" $version",
-    )
-    logger.info(manifest.printManifest())
+
+    printAsciiHeader(manifest, "/redis-ascii.txt")

     val conf = if (context.configs().isEmpty) props else context.configs()