diff --git a/lib/sink/ElasticSinkConnector.js b/lib/sink/ElasticSinkConnector.js index 5bec0ef..6adb8f0 100644 --- a/lib/sink/ElasticSinkConnector.js +++ b/lib/sink/ElasticSinkConnector.js @@ -23,6 +23,7 @@ class ElasticSinkConnector extends SinkConnector { index: this.properties.restSink.index, type: this.properties.restSink.type, mode: this.properties.restSink.mode, + indexTimestampFieldName: this.properties.restSink.indexTimestampFieldName, }; callback(null, taskConfig); diff --git a/lib/sink/ElasticSinkTask.js b/lib/sink/ElasticSinkTask.js index 853c47b..01a8e8a 100644 --- a/lib/sink/ElasticSinkTask.js +++ b/lib/sink/ElasticSinkTask.js @@ -15,6 +15,7 @@ class ElasticSinkTask extends SinkTask { index, type, batchSize, + indexTimestampFieldName, } = this.properties; this.esClient = esClient; @@ -22,6 +23,7 @@ class ElasticSinkTask extends SinkTask { this.index = index; this.type = type; this.batchSize = batchSize; + this.indexTimestampFieldName = indexTimestampFieldName; this.upserts = new Map(); this.deletes = new Set(); @@ -57,6 +59,9 @@ class ElasticSinkTask extends SinkTask { let id = record[this.idProperty]; body.push({ "update": {"_id": id, "_index": this.index}}); delete record[this.idProperty]; + if (this.indexTimestampFieldName && record.doc) { + record.doc[this.indexTimestampFieldName] = (new Date()).getTime(); + } body.push(record); } }