Skip to content

Commit

Permalink
out_pgsql: added column tag and time to PostgreSQL table, this will a…
Browse files Browse the repository at this point in the history
…llow users (#2169)

to better partition the table and distribute a big amount of data.

Signed-off-by: Jonathan Gonzalez V <[email protected]>
  • Loading branch information
sxd authored and edsiper committed May 11, 2020
1 parent 5f3aa00 commit 00eff02
Showing 1 changed file with 54 additions and 8 deletions.
62 changes: 54 additions & 8 deletions plugins/out_pgsql/pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ static int cb_pgsql_init(struct flb_output_instance *ins,
flb_info("[out_pgsql] we check that the table %s "
"exists, if not we create it", ctx->db_table);

str_len = 62 + flb_sds_len(ctx->db_table);
str_len = 72 + flb_sds_len(ctx->db_table);

query = flb_malloc(str_len);
if(query == NULL) {
Expand All @@ -181,8 +181,13 @@ static int cb_pgsql_init(struct flb_output_instance *ins,
return -1;
}

snprintf(query, str_len, "CREATE TABLE IF NOT EXISTS %s (data jsonb);",
/* Maybe use the timestamp with the TZ specefied */
/* in the postgresql connection? */
snprintf(query, str_len,
"CREATE TABLE IF NOT EXISTS %s "
"(tag varchar, time timestamp, data jsonb);",
ctx->db_table);
flb_info("[out_pgsql] %s", query);
res = PQexec(ctx->conn, query);
flb_free(query);

Expand Down Expand Up @@ -216,6 +221,7 @@ static void cb_pgsql_flush(const void *data, size_t bytes,
char *tmp = NULL;
PGresult *res = NULL;
char *query = NULL;
flb_sds_t tag_escaped = NULL;
size_t str_len;

if(PQconsumeInput(ctx->conn) == 0 && PQisBusy(ctx->conn) == 1) {
Expand Down Expand Up @@ -247,43 +253,83 @@ static void cb_pgsql_flush(const void *data, size_t bytes,

tmp = PQescapeLiteral(ctx->conn, json, flb_sds_len(json));
flb_sds_destroy(json);

if(!tmp) {
flb_errno();
flb_error("[out_pgsql] Can't escape json string: %s", json);
PQfreemem(tmp);
flb_error("[out_pgsql] Can't escape json string");
FLB_OUTPUT_RETURN(FLB_RETRY);
}

json = flb_sds_create(tmp);
PQfreemem(tmp);
if(!json) {
flb_errno();
FLB_OUTPUT_RETURN(FLB_RETRY);
}

tmp = PQescapeLiteral(ctx->conn, tag, tag_len);
if(!tmp) {
flb_errno();
flb_sds_destroy(json);
PQfreemem(tmp);
flb_error("[out_pgsql] Can't escape tag string: %s", tag);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

tag_escaped = flb_sds_create(tmp);
PQfreemem(tmp);
if(!tag_escaped) {
flb_errno();
flb_sds_destroy(json);
FLB_OUTPUT_RETURN(FLB_RETRY);
}

str_len = 60 + flb_sds_len(json) + flb_sds_len(ctx->db_table);
str_len = 100 + flb_sds_len(json)
+ flb_sds_len(tag_escaped)
+ flb_sds_len(ctx->db_table)
+ flb_sds_len(ctx->timestamp_key);
query = flb_malloc(str_len);

if (query == NULL) {
flb_errno();
flb_sds_destroy(json);
flb_sds_destroy(tag_escaped);
FLB_OUTPUT_RETURN(FLB_RETRY);
}


/*
We should call PQgetResult() until get NULL
This fix some issues in this previous release
in a future release we will provide two modes
sync and async to send data to PostgreSQL
*/
res = PQgetResult(ctx->conn);
while(res != NULL) {
PQclear(res);
res = PQgetResult(ctx->conn);
}

snprintf(query, str_len,
"INSERT INTO %s SELECT * FROM json_array_elements(%s);",
ctx->db_table, json);
"INSERT INTO %s "
"SELECT %s, "
"to_timestamp(CAST(value->>'%s' as FLOAT)), * "
"FROM json_array_elements(%s);",
ctx->db_table, tag_escaped, ctx->timestamp_key, json);

PQsendQuery(ctx->conn, query);
flb_free(query);
flb_sds_destroy(json);
flb_sds_destroy(tag_escaped);

PQflush(ctx->conn);

if(PQisBusy(ctx->conn) == 0) {
res = PQgetResult(ctx->conn);
if(PQresultStatus(res) != PGRES_COMMAND_OK) {
flb_warn("[out_pgsql] %s", PQerrorMessage(ctx->conn));
flb_debug("[out_pgsql] %s", PQerrorMessage(ctx->conn));
PQclear(res);
FLB_OUTPUT_RETURN(FLB_RETRY);
}
PQclear(res);
}
Expand Down

0 comments on commit 00eff02

Please sign in to comment.