Skip to content

Commit e17679c

Browse files
committed
cloudwatch_logs: remove sequence tokens from API calls
Signed-off-by: Matthew Fala <[email protected]>
1 parent cfca7ca commit e17679c

File tree

4 files changed

+48
-144
lines changed

4 files changed

+48
-144
lines changed

plugins/out_cloudwatch_logs/cloudwatch_api.c

+11-103
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,7 @@
5050
#include "cloudwatch_api.h"
5151

5252
#define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException"
53-
#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException"
5453
#define ERR_CODE_NOT_FOUND "ResourceNotFoundException"
55-
#define ERR_CODE_DATA_ALREADY_ACCEPTED "DataAlreadyAcceptedException"
5654

5755
#define AMZN_REQUEST_ID_HEADER "x-amzn-RequestId"
5856

@@ -229,23 +227,6 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf,
229227
goto error;
230228
}
231229

232-
if (stream->sequence_token) {
233-
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
234-
"\"sequenceToken\":\"", 17)) {
235-
goto error;
236-
}
237-
238-
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
239-
stream->sequence_token, 0)) {
240-
goto error;
241-
}
242-
243-
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
244-
"\",", 2)) {
245-
goto error;
246-
}
247-
}
248-
249230
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,
250231
"\"logEvents\":[", 13)) {
251232
goto error;
@@ -493,9 +474,6 @@ void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) {
493474
if (buf->current_stream != NULL) {
494475
buf->data_size += strlen(buf->current_stream->name);
495476
buf->data_size += strlen(buf->current_stream->group);
496-
if (buf->current_stream->sequence_token) {
497-
buf->data_size += strlen(buf->current_stream->sequence_token);
498-
}
499477
}
500478
}
501479

@@ -1153,7 +1131,6 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
11531131
struct flb_aws_client *cw_client;
11541132
flb_sds_t body;
11551133
flb_sds_t tmp;
1156-
flb_sds_t error;
11571134

11581135
flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days);
11591136

@@ -1196,17 +1173,9 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream
11961173

11971174
/* Check error */
11981175
if (c->resp.payload_size > 0) {
1199-
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
1200-
if (error != NULL) {
1201-
/* some other error occurred; notify user */
1202-
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
1203-
"PutRetentionPolicy", ctx->ins);
1204-
flb_sds_destroy(error);
1205-
}
1206-
else {
1207-
/* error can not be parsed, print raw response to debug */
1208-
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
1209-
}
1176+
/* some error occurred; notify user */
1177+
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
1178+
"PutRetentionPolicy", ctx->ins);
12101179
}
12111180
}
12121181

@@ -1287,8 +1256,8 @@ int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream)
12871256
flb_sds_destroy(error);
12881257
}
12891258
else {
1290-
/* error can not be parsed, print raw response to debug */
1291-
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
1259+
/* error can not be parsed, print raw response */
1260+
flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
12921261
}
12931262
}
12941263
}
@@ -1402,8 +1371,8 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
14021371
flb_sds_destroy(error);
14031372
}
14041373
else {
1405-
/* error can not be parsed, print raw response to debug */
1406-
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
1374+
/* error can not be parsed, print raw response */
1375+
flb_plg_warn(ctx->ins, "Raw response: %s", c->resp.payload);
14071376
}
14081377
}
14091378
}
@@ -1417,8 +1386,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
14171386
}
14181387

14191388
/*
1420-
* Returns -1 on failure, 0 on success, and 1 for a sequence token error,
1421-
* which means the caller can retry.
1389+
* Returns -1 on failure, 0 on success
14221390
*/
14231391
int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
14241392
struct log_stream *stream, size_t payload_size)
@@ -1427,7 +1395,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
14271395
struct flb_http_client *c = NULL;
14281396
struct flb_aws_client *cw_client;
14291397
flb_sds_t tmp;
1430-
flb_sds_t error;
14311398
int num_headers = 1;
14321399
int retry = FLB_TRUE;
14331400

@@ -1460,8 +1427,7 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
14601427
if (c->resp.data == NULL || c->resp.data_len == 0 || strstr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) {
14611428
/* code was 200, but response is invalid, treat as failure */
14621429
if (c->resp.data != NULL) {
1463-
flb_plg_debug(ctx->ins, "Could not find sequence token in "
1464-
"response: response body is empty: full data: `%.*s`", c->resp.data_len, c->resp.data);
1430+
flb_plg_debug(ctx->ins, "Invalid response: full data: `%.*s`", c->resp.data_len, c->resp.data);
14651431
}
14661432
flb_http_client_destroy(c);
14671433

@@ -1474,73 +1440,15 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
14741440
AMZN_REQUEST_ID_HEADER);
14751441
return -1;
14761442
}
1477-
1478-
1479-
/* success */
1480-
if (c->resp.payload_size > 0) {
1481-
flb_plg_debug(ctx->ins, "Sent events to %s", stream->name);
1482-
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
1483-
"nextSequenceToken");
1484-
if (tmp) {
1485-
if (stream->sequence_token != NULL) {
1486-
flb_sds_destroy(stream->sequence_token);
1487-
}
1488-
stream->sequence_token = tmp;
1489-
1490-
flb_http_client_destroy(c);
1491-
return 0;
1492-
}
1493-
else {
1494-
flb_plg_error(ctx->ins, "Could not find sequence token in "
1495-
"response: %s", c->resp.payload);
1496-
}
1497-
}
14981443

14991444
flb_http_client_destroy(c);
15001445
return 0;
15011446
}
15021447

15031448
/* Check error */
15041449
if (c->resp.payload_size > 0) {
1505-
error = flb_aws_error(c->resp.payload, c->resp.payload_size);
1506-
if (error != NULL) {
1507-
if (strcmp(error, ERR_CODE_INVALID_SEQUENCE_TOKEN) == 0) {
1508-
/*
1509-
* This case will happen when we do not know the correct
1510-
* sequence token; we can find it in the error response
1511-
* and retry.
1512-
*/
1513-
flb_plg_debug(ctx->ins, "Sequence token was invalid, "
1514-
"will retry");
1515-
tmp = flb_json_get_val(c->resp.payload, c->resp.payload_size,
1516-
"expectedSequenceToken");
1517-
if (tmp) {
1518-
if (stream->sequence_token != NULL) {
1519-
flb_sds_destroy(stream->sequence_token);
1520-
}
1521-
stream->sequence_token = tmp;
1522-
flb_sds_destroy(error);
1523-
flb_http_client_destroy(c);
1524-
/* tell the caller to retry */
1525-
return 1;
1526-
}
1527-
} else if (strcmp(error, ERR_CODE_DATA_ALREADY_ACCEPTED) == 0) {
1528-
/* not sure what causes this but it counts as success */
1529-
flb_plg_info(ctx->ins, "Got %s, a previous retry must have succeeded asychronously", ERR_CODE_DATA_ALREADY_ACCEPTED);
1530-
flb_sds_destroy(error);
1531-
flb_http_client_destroy(c);
1532-
/* success */
1533-
return 0;
1534-
}
1535-
/* some other error occurred; notify user */
1536-
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
1537-
"PutLogEvents", ctx->ins);
1538-
flb_sds_destroy(error);
1539-
}
1540-
else {
1541-
/* error could not be parsed, print raw response to debug */
1542-
flb_plg_debug(ctx->ins, "Raw response: %s", c->resp.payload);
1543-
}
1450+
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
1451+
"PutLogEvents", ctx->ins);
15441452
}
15451453
}
15461454

plugins/out_cloudwatch_logs/cloudwatch_logs.c

+32-36
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
5656
const char *tmp;
5757
char *session_name = NULL;
5858
struct flb_cloudwatch *ctx = NULL;
59-
struct cw_flush *buf = NULL;
6059
int ret;
6160
flb_sds_t tmp_sds = NULL;
6261
(void) config;
@@ -348,50 +347,53 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins,
348347
flb_output_upstream_set(upstream, ctx->ins);
349348
ctx->cw_client->host = ctx->endpoint;
350349

351-
/* alloc the payload/processing buffer */
350+
/* Export context */
351+
flb_output_set_context(ins, ctx);
352+
353+
return 0;
354+
355+
error:
356+
flb_free(session_name);
357+
flb_plg_error(ctx->ins, "Initialization failed");
358+
flb_cloudwatch_ctx_destroy(ctx);
359+
return -1;
360+
}
361+
362+
struct cw_flush *new_buffer()
363+
{
364+
struct cw_flush *buf;
365+
352366
buf = flb_calloc(1, sizeof(struct cw_flush));
353367
if (!buf) {
354368
flb_errno();
355-
goto error;
369+
return NULL;
356370
}
357371

358372
buf->out_buf = flb_malloc(PUT_LOG_EVENTS_PAYLOAD_SIZE);
359373
if (!buf->out_buf) {
360374
flb_errno();
361375
cw_flush_destroy(buf);
362-
goto error;
376+
return NULL;
363377
}
364378
buf->out_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;
365379

366380
buf->tmp_buf = flb_malloc(sizeof(char) * PUT_LOG_EVENTS_PAYLOAD_SIZE);
367381
if (!buf->tmp_buf) {
368382
flb_errno();
369383
cw_flush_destroy(buf);
370-
goto error;
384+
return NULL;
371385
}
372386
buf->tmp_buf_size = PUT_LOG_EVENTS_PAYLOAD_SIZE;
373387

374388
buf->events = flb_malloc(sizeof(struct cw_event) * MAX_EVENTS_PER_PUT);
375389
if (!buf->events) {
376390
flb_errno();
377391
cw_flush_destroy(buf);
378-
goto error;
392+
return NULL;
379393
}
380394
buf->events_capacity = MAX_EVENTS_PER_PUT;
381395

382-
ctx->buf = buf;
383-
384-
385-
/* Export context */
386-
flb_output_set_context(ins, ctx);
387-
388-
return 0;
389-
390-
error:
391-
flb_free(session_name);
392-
flb_plg_error(ctx->ins, "Initialization failed");
393-
flb_cloudwatch_ctx_destroy(ctx);
394-
return -1;
396+
return buf;
395397
}
396398

397399
static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
@@ -405,15 +407,21 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
405407
(void) i_ins;
406408
(void) config;
407409

408-
event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag,
409-
event_chunk->data, event_chunk->size);
410+
struct cw_flush *buf;
411+
412+
buf = new_buffer();
413+
if (!buf) {
414+
FLB_OUTPUT_RETURN(FLB_RETRY);
415+
}
416+
417+
event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
410418
if (event_count < 0) {
411419
flb_plg_error(ctx->ins, "Failed to send events");
420+
cw_flush_destroy(buf);
412421
FLB_OUTPUT_RETURN(FLB_RETRY);
413422
}
414423

415-
// TODO: this msg is innaccurate if events are skipped
416-
flb_plg_debug(ctx->ins, "Sent %d events to CloudWatch", event_count);
424+
cw_flush_destroy(buf);
417425

418426
FLB_OUTPUT_RETURN(FLB_OK);
419427
}
@@ -429,10 +437,6 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
429437
flb_aws_provider_destroy(ctx->base_aws_provider);
430438
}
431439

432-
if (ctx->buf) {
433-
cw_flush_destroy(ctx->buf);
434-
}
435-
436440
if (ctx->aws_provider) {
437441
flb_aws_provider_destroy(ctx->aws_provider);
438442
}
@@ -496,9 +500,6 @@ void log_stream_destroy(struct log_stream *stream)
496500
if (stream->name) {
497501
flb_sds_destroy(stream->name);
498502
}
499-
if (stream->sequence_token) {
500-
flb_sds_destroy(stream->sequence_token);
501-
}
502503
if (stream->group) {
503504
flb_sds_destroy(stream->group);
504505
}
@@ -657,12 +658,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
657658
.cb_init = cb_cloudwatch_init,
658659
.cb_flush = cb_cloudwatch_flush,
659660
.cb_exit = cb_cloudwatch_exit,
660-
661-
/*
662-
* Allow cloudwatch to use async network stack synchronously by opting into
663-
* FLB_OUTPUT_SYNCHRONOUS synchronous task scheduler
664-
*/
665-
.flags = FLB_OUTPUT_SYNCHRONOUS,
661+
.flags = 0,
666662
.workers = 1,
667663

668664
/* Configuration */

plugins/out_cloudwatch_logs/cloudwatch_logs.h

+3-5
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ struct cw_event {
7070
struct log_stream {
7171
flb_sds_t name;
7272
flb_sds_t group;
73-
flb_sds_t sequence_token;
73+
7474
/*
7575
* log streams in CloudWatch do not expire; but our internal representations
7676
* of them are periodically cleaned up if they have been unused for too long
@@ -87,8 +87,6 @@ struct log_stream {
8787
struct mk_list _head;
8888
};
8989

90-
void log_stream_destroy(struct log_stream *stream);
91-
9290
struct flb_cloudwatch {
9391
/*
9492
* TLS instances can not be re-used. So we have one for:
@@ -138,8 +136,6 @@ struct flb_cloudwatch {
138136
/* stores log streams we're putting to */
139137
struct mk_list streams;
140138

141-
/* buffers for data processing and request payload */
142-
struct cw_flush *buf;
143139
/* The namespace to use for the metric */
144140
flb_sds_t metric_namespace;
145141

@@ -155,4 +151,6 @@ struct flb_cloudwatch {
155151

156152
void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);
157153

154+
void log_stream_destroy(struct log_stream *stream);
155+
158156
#endif

src/aws/flb_aws_util.c

+2
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,8 @@ void flb_aws_print_error(char *response, size_t response_len,
581581

582582
error = flb_json_get_val(response, response_len, "__type");
583583
if (!error) {
584+
/* error can not be parsed, print raw response */
585+
flb_plg_warn(ins, "Raw response: %s", response);
584586
return;
585587
}
586588

0 commit comments

Comments
 (0)