Skip to content

Commit

Permalink
handle date fields in the payload. they will translate to epoch time …
Browse files Browse the repository at this point in the history
…to match influxdb supported datatypes
  • Loading branch information
stheppi committed Jan 4, 2018
1 parent 0a2e57d commit 69e019e
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import scala.concurrent.duration.TimeUnit
* @param settings - An instance of [[InfluxSettings]]
*/
class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) extends StrictLogging {
// private val nanoClock = new NanoClock()
// private val nanoClock = new NanoClock()

//We build a map to cache the field paths. Avoids creating it everytime
private val kcqlMap = settings.kcqls.groupBy(_.getSource).map { case (topic, kcqls) =>
Expand Down Expand Up @@ -119,7 +119,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e

kcqls.map { k =>

var tsUnit :Option[TimeUnit] = None
var tsUnit: Option[TimeUnit] = None
val timestamp = k.timestampField.map { implicit fieldPath =>
val tsRaw = ValuesExtractor.extract(map, fieldPath)
TimestampValueCoerce(tsRaw)
Expand Down Expand Up @@ -173,7 +173,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e

kcqls.map { k =>

var tsUnit :Option[TimeUnit] = None
var tsUnit: Option[TimeUnit] = None
val timestamp = k.timestampField.map { implicit fieldPath =>
val tsRaw = ValuesExtractor.extract(json, fieldPath)
TimestampValueCoerce(tsRaw)
Expand All @@ -183,7 +183,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e
}

val measurement = getMeasurement(k) { fieldPath => ValuesExtractor.extract(json, fieldPath) }
implicit val builder = Point.measurement(measurement).time(timestamp,tsUnit.getOrElse(k.kcql.getTimestampUnit))
implicit val builder = Point.measurement(measurement).time(timestamp, tsUnit.getOrElse(k.kcql.getTimestampUnit))

buildPointFields(k,
ValuesExtractor.extract(json, _),
Expand Down Expand Up @@ -231,7 +231,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e
val kcqls = kcqlMap.getOrElse(record.topic(), throw new ConfigException(s"Topic '${record.topic()}' is missing KCQL mapping."))

kcqls.map { k =>
var tsUnit :Option[TimeUnit] = None
var tsUnit: Option[TimeUnit] = None
val timestamp = k.timestampField.map { implicit fieldPath =>
val tsRaw = ValuesExtractor.extract(struct, fieldPath)
TimestampValueCoerce(tsRaw)
Expand Down Expand Up @@ -291,6 +291,7 @@ class InfluxBatchPointsBuilder(settings: InfluxSettings, nanoClock: NanoClock) e
case value: Boolean => builder.addField(field, value)
case value: java.math.BigDecimal => builder.addField(field, value)
case value: String => builder.addField(field, value)
case value: java.util.Date => builder.addField(field, value.getTime)
//we should never reach this since the extractor should not allow it
case value => sys.error(s"Can't select field:'$field' because it leads to value:'$value' (${Option(value).map(_.getClass.getName).getOrElse("")})is not a valid type for InfluxDb.")
}
Expand Down

0 comments on commit 69e019e

Please sign in to comment.