From e8f68970514aebdc0c612b9a4a8f05fc6724dd97 Mon Sep 17 00:00:00 2001 From: OmalCooray Date: Mon, 18 Sep 2023 13:40:53 +0530 Subject: [PATCH] fix(influxdb): Modify influxDbWriter to address the memory issue (#986) During JVM profiling, I noticed the memory usage of the influxdb sink connector consistently increasing. Investigation revealed that the influxDbWriter wasn't closing properly, leading to the memory issue. Co-authored-by: Omal --- .../connect/influx/writers/InfluxDbWriter.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala index 54c79a5bb..0ff40898e 100644 --- a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala +++ b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxDbWriter.scala @@ -52,11 +52,14 @@ class InfluxDbWriter(settings: InfluxSettings) extends DbWriter with StrictLoggi .build(records) .flatMap { batchPoints => logger.debug(s"Writing ${batchPoints.length} points to the database...") - for { - writer <- Try(influxDB.makeWriteApi()) - writeRes <- Try(writer.writePoints(batchPoints.asJava)) - } yield writeRes - }.map(_ => logger.debug("Writing complete")), + val writer = influxDB.makeWriteApi() + val writeAttempt = Try { + writer.writePoints(batchPoints.asJava) + } + writer.close() + writeAttempt + } + .map(_ => logger.debug("Writing complete")), ) }