Skip to content

Commit

Permalink
Evaluate the "duplicates" parameter for influxdb api (like the "volks…
Browse files Browse the repository at this point in the history
…zaehler" api already does)
  • Loading branch information
Pascal Bihler committed Sep 13, 2023
1 parent 76e868f commit 46cd825
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 15 deletions.
5 changes: 5 additions & 0 deletions include/api/InfluxDB.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class InfluxDB : public ApiIF {
std::list<Reading> _values;
CurlResponse::Ptr _response;

// Volatile
int64_t _last_timestamp; /**< remember last timestamp */
// duplicate support:
Reading *_lastReadingSent;

typedef struct {
CURL *curl;
struct curl_slist *headers;
Expand Down
75 changes: 60 additions & 15 deletions src/api/InfluxDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
extern Config_Options options;

vz::api::InfluxDB::InfluxDB(const Channel::Ptr &ch, const std::list<Option> &pOptions)
: ApiIF(ch), _response(new vz::api::CurlResponse()) {
: ApiIF(ch), _response(new vz::api::CurlResponse()), _last_timestamp(0), _lastReadingSent(0) {
OptionList optlist;
print(log_debug, "InfluxDB API initialize", ch->name());

Expand Down Expand Up @@ -278,6 +278,10 @@ void vz::api::InfluxDB::send() {
print(log_debug, "cleaned buffer, now %i items", channel()->name(), buf->size());
}

int64_t timestamp = 1;
const int duplicates = channel()->duplicates();
const int duplicates_ms = duplicates * 1000;

// build request body from buffer contents
buf->lock();
for (it = buf->begin(); it != buf->end(); it++) {
Expand All @@ -286,25 +290,66 @@ void vz::api::InfluxDB::send() {
channel()->name());
break;
}

bool sendData = false;

timestamp = it->time_ms();

print(log_finest, "Reading buffer: timestamp %lld value %f", channel()->name(),
it->time_ms(), it->value());
request_body.append(_measurement_name);
if (_send_uuid) {
request_body.append(",uuid=");
request_body.append(channel()->uuid());

print(log_debug, "compare: %lld %lld", channel()->name(), _last_timestamp, timestamp);
// we can only add/consider a timestamp if the ms resolution is not before than from
// previous one:
if (_last_timestamp <= timestamp) {
if (0 == duplicates) { // send all values
sendData = true;
_last_timestamp = timestamp;
} else {
const Reading &r = *it;
// duplicates should be ignored
// but send at least each <duplicates> seconds

if (!_lastReadingSent) { // first one from the duplicate consideration -> send it
sendData = true;
_lastReadingSent = new Reading(r);
_last_timestamp = timestamp;
} else { // one reading sent already. compare
// a) timestamp
// b) duplicate value
if ((timestamp >= (_last_timestamp + duplicates_ms)) ||
(r.value() != _lastReadingSent->value())) {
// send the current one:
sendData = true;
_last_timestamp = timestamp;
*_lastReadingSent = r;
} else {
// ignore it
}
}
}
}
if (!_tags.empty()) {
request_body.append(",");
request_body.append(_tags);

if (sendData) {
request_body.append(_measurement_name);
if (_send_uuid) {
request_body.append(",uuid=");
request_body.append(channel()->uuid());
}
if (!_tags.empty()) {
request_body.append(",");
request_body.append(_tags);
}
std::stringstream value_str;
value_str << " value=" << std::fixed << std::setprecision(6) << it->value();
request_body.append(value_str.str());
request_body.append(" ");
request_body.append(std::to_string(timestamp));
request_body.append("\n"); // each measurement on new line
request_body_lines++;
}
std::stringstream value_str;
value_str << " value=" << std::fixed << std::setprecision(6) << it->value();
request_body.append(value_str.str());
request_body.append(" ");
request_body.append(std::to_string(it->time_ms()));
request_body.append("\n"); // each measurement on new line

it->mark_delete();
request_body_lines++;
}

buf->unlock();
Expand Down

0 comments on commit 46cd825

Please sign in to comment.