From 69e019ea6cead788294c44d445ece9b75a3d0266 Mon Sep 17 00:00:00 2001 From: stefan bocutiu Date: Thu, 4 Jan 2018 00:05:43 +0000 Subject: [PATCH] handle date fields in the payload. they will translate to epoch time to match influxdb supported datatypes --- .../influx/writers/InfluxBatchPointsBuilder.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxBatchPointsBuilder.scala b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxBatchPointsBuilder.scala index 75c33521a..af66e4408 100644 --- a/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxBatchPointsBuilder.scala +++ b/kafka-connect-influxdb/src/main/scala/com/datamountaineer/streamreactor/connect/influx/writers/InfluxBatchPointsBuilder.scala @@ -42,7 +42,7 @@ import scala.concurrent.duration.TimeUnit * @param settings - An instance of [[InfluxSettings]] */ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) extends StrictLogging { -// private val nanoClock = new NanoClock() + // private val nanoClock = new NanoClock() //We build a map to cache the field paths. Avoids creating it everytime private val kcqlMap = settings.kcqls.groupBy(_.getSource).map { case (topic, kcqls) => @@ -119,7 +119,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e kcqls.map { k => - var tsUnit :Option[TimeUnit] = None + var tsUnit: Option[TimeUnit] = None val timestamp = k.timestampField.map { implicit fieldPath => val tsRaw = ValuesExtractor.extract(map, fieldPath) TimestampValueCoerce(tsRaw) @@ -173,7 +173,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e kcqls.map { k => - var tsUnit :Option[TimeUnit] = None + var tsUnit: Option[TimeUnit] = None val timestamp = k.timestampField.map { implicit fieldPath => val tsRaw = ValuesExtractor.extract(json, fieldPath) TimestampValueCoerce(tsRaw) @@ -183,7 +183,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e } val measurement = getMeasurement(k) { fieldPath => ValuesExtractor.extract(json, fieldPath) } - implicit val builder = Point.measurement(measurement).time(timestamp,tsUnit.getOrElse(k.kcql.getTimestampUnit)) + implicit val builder = Point.measurement(measurement).time(timestamp, tsUnit.getOrElse(k.kcql.getTimestampUnit)) buildPointFields(k, ValuesExtractor.extract(json, _), @@ -231,7 +231,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e val kcqls = kcqlMap.getOrElse(record.topic(), throw new ConfigException(s"Topic '${record.topic()}' is missing KCQL mapping.")) kcqls.map { k => - var tsUnit :Option[TimeUnit] = None + var tsUnit: Option[TimeUnit] = None val timestamp = k.timestampField.map { implicit fieldPath => val tsRaw = ValuesExtractor.extract(struct, fieldPath) TimestampValueCoerce(tsRaw) @@ -291,6 +291,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e case value: Boolean => builder.addField(field, value) case value: java.math.BigDecimal => builder.addField(field, value) case value: String => builder.addField(field, value) + case value: java.util.Date => builder.addField(field, value.getTime) //we should never reach this since the extractor should not allow it case value => sys.error(s"Can't select field:'$field' because it leads to value:'$value' (${Option(value).map(_.getClass.getName).getOrElse("")})is not a valid type for InfluxDb.") }