diff --git a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala index a68e4c30e..54c79a5bb 100644 --- a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala +++ b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala @@ -15,14 +15,15 @@ */ package com.datamountaineer.streamreactor.connect.influx.writers +import cats.implicits._ import com.datamountaineer.streamreactor.common.errors.ErrorHandler import com.datamountaineer.streamreactor.common.sink.DbWriter -import com.datamountaineer.streamreactor.connect.influx.config.InfluxSettings import com.datamountaineer.streamreactor.connect.influx.NanoClock import com.datamountaineer.streamreactor.connect.influx.ValidateStringParameterFn +import com.datamountaineer.streamreactor.connect.influx.config.InfluxSettings +import com.influxdb.client.InfluxDBClientFactory import com.typesafe.scalalogging.StrictLogging import org.apache.kafka.connect.sink.SinkRecord -import com.influxdb.client.InfluxDBClientFactory import scala.jdk.CollectionConverters.SeqHasAsJava import scala.util.Try @@ -59,5 +60,10 @@ class InfluxDbWriter(settings: InfluxSettings) extends DbWriter with StrictLoggi ) } - override def close(): Unit = {} + override def close(): Unit = { + Try(influxDB.close()).toEither.leftMap { + exception => logger.error("Error closing influxDB", exception) + } + () + } }