Skip to content

Commit

Permalink
out_azure_blob: added blob destination tracking and stickiness
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Alminana <[email protected]>
  • Loading branch information
leonardo-albertovich authored and edsiper committed Oct 10, 2024
1 parent a1c51f6 commit 500ea4a
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 33 deletions.
42 changes: 38 additions & 4 deletions plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ static int process_blob_chunk(struct flb_azure_blob *ctx, struct flb_event_chunk
continue;
}

ret = azb_db_file_insert(ctx, source, file_path, file_size);
ret = azb_db_file_insert(ctx, source, ctx->endpoint, file_path, file_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot insert blob file into database: %s (size=%lu)",
file_path, file_size);
Expand Down Expand Up @@ -708,6 +708,7 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context
uint64_t file_delivery_attempts;
off_t offset_start;
off_t offset_end;
cfl_sds_t file_destination = NULL;
cfl_sds_t file_path = NULL;
cfl_sds_t part_ids = NULL;
cfl_sds_t source = NULL;
Expand Down Expand Up @@ -876,7 +877,8 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context
&offset_start, &offset_end,
&part_delivery_attempts,
&file_delivery_attempts,
&file_path);
&file_path,
&file_destination);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot get next blob file part");
info->active_upload = FLB_FALSE;
Expand All @@ -891,6 +893,25 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context
/* just continue, the row info was retrieved */
}

if (strcmp(file_destination, ctx->endpoint) != 0) {
flb_plg_info(ctx->ins,
"endpoint change detected, restarting file : %s",
file_path);

info->active_upload = FLB_FALSE;

/* we need to set the aborted state flag to wait for existing uploads
* to finish and then wipe the slate and start again but we don't want
* to increment the failure count in this case.
*/
azb_db_file_set_aborted_state(ctx, file_id, file_path, 1);

cfl_sds_destroy(file_path);
cfl_sds_destroy(file_destination);

flb_sched_timer_cb_coro_return();
}

/* since this is the first part we want to increment the files
* delivery attempt counter.
*/
Expand All @@ -902,20 +923,31 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context
ret = flb_utils_read_file_offset(file_path, offset_start, offset_end, &out_buf, &out_size);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot read file part %s", file_path);
cfl_sds_destroy(file_path);

info->active_upload = FLB_FALSE;

cfl_sds_destroy(file_path);
cfl_sds_destroy(file_destination);

flb_sched_timer_cb_coro_return();
}

azb_db_file_part_delivery_attempts(ctx, file_id, part_id, ++part_delivery_attempts);

flb_plg_debug(ctx->ins, "sending part file %s (id=%" PRIu64 " part_id=%" PRIu64 ")", file_path, id, part_id);

ret = send_blob(config, NULL, ctx, FLB_EVENT_TYPE_BLOBS,
AZURE_BLOB_BLOCKBLOB, file_path, part_id, NULL, 0, out_buf, out_size);

if (ret == FLB_OK) {
ret = azb_db_file_part_uploaded(ctx, id);

if (ret == -1) {
info->active_upload = FLB_FALSE;

cfl_sds_destroy(file_path);
cfl_sds_destroy(file_destination);

flb_sched_timer_cb_coro_return();
}
}
Expand All @@ -926,14 +958,16 @@ static void cb_azb_blob_file_upload(struct flb_config *config, void *out_context
part_delivery_attempts >= ctx->part_delivery_attempt_limit) {
azb_db_file_set_aborted_state(ctx, file_id, file_path, 1);
}
/* FIXME */
}

info->active_upload = FLB_FALSE;

if (out_buf) {
flb_free(out_buf);
}

cfl_sds_destroy(file_path);
cfl_sds_destroy(file_destination);

flb_sched_timer_cb_coro_return();
}
Expand Down
1 change: 1 addition & 0 deletions plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ struct flb_azure_blob {
sqlite3_stmt *stmt_delete_file;
sqlite3_stmt *stmt_abort_file;
sqlite3_stmt *stmt_get_file;
sqlite3_stmt *stmt_update_file_destination;
sqlite3_stmt *stmt_update_file_delivery_attempt_count;
sqlite3_stmt *stmt_set_file_aborted_state;
sqlite3_stmt *stmt_get_next_aborted_file;
Expand Down
73 changes: 67 additions & 6 deletions plugins/out_azure_blob/azure_blob_db.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ static int prepare_stmts(struct flb_sqldb *db, struct flb_azure_blob *ctx)
return -1;
}

/* file destination update */
ret = sqlite3_prepare_v2(db->handler,
SQL_UPDATE_FILE_DESTINATION, -1,
&ctx->stmt_update_file_destination,
NULL);
if (ret != SQLITE_OK) {
flb_plg_error(ctx->ins, "cannot prepare SQL statement: %s",
SQL_UPDATE_FILE_DESTINATION);
return -1;
}

/* delivery attempt counter update */
ret = sqlite3_prepare_v2(db->handler,
SQL_UPDATE_FILE_DELIVERY_ATTEMPT_COUNT, -1,
Expand Down Expand Up @@ -245,6 +256,7 @@ int azb_db_close(struct flb_azure_blob *ctx)
sqlite3_finalize(ctx->stmt_delete_file);
sqlite3_finalize(ctx->stmt_set_file_aborted_state);
sqlite3_finalize(ctx->stmt_get_file);
sqlite3_finalize(ctx->stmt_update_file_destination);
sqlite3_finalize(ctx->stmt_update_file_delivery_attempt_count);
sqlite3_finalize(ctx->stmt_get_next_aborted_file);
sqlite3_finalize(ctx->stmt_get_next_stale_file);
Expand Down Expand Up @@ -295,7 +307,10 @@ int azb_db_file_exists(struct flb_azure_blob *ctx, char *path, uint64_t *id)
}

int64_t azb_db_file_insert(struct flb_azure_blob *ctx,
char *source, char *path, size_t size)
char *source,
char *destination,
char *path,
size_t size)
{
int ret;
int64_t id;
Expand All @@ -308,9 +323,10 @@ int64_t azb_db_file_insert(struct flb_azure_blob *ctx,

/* Bind parameters */
sqlite3_bind_text(ctx->stmt_insert_file, 1, source, -1, 0);
sqlite3_bind_text(ctx->stmt_insert_file, 2, path, -1, 0);
sqlite3_bind_int64(ctx->stmt_insert_file, 3, size);
sqlite3_bind_int64(ctx->stmt_insert_file, 4, created);
sqlite3_bind_text(ctx->stmt_insert_file, 2, destination, -1, 0);
sqlite3_bind_text(ctx->stmt_insert_file, 3, path, -1, 0);
sqlite3_bind_int64(ctx->stmt_insert_file, 4, size);
sqlite3_bind_int64(ctx->stmt_insert_file, 5, created);

/* Run the insert */
ret = sqlite3_step(ctx->stmt_insert_file);
Expand Down Expand Up @@ -404,6 +420,35 @@ int azb_db_file_set_aborted_state(struct flb_azure_blob *ctx,
return 0;
}

int azb_db_file_change_destination(struct flb_azure_blob *ctx, uint64_t id, cfl_sds_t destination)
{
int ret;

azb_db_lock(ctx);

/* Bind parameters */
sqlite3_bind_text(ctx->stmt_update_file_destination, 1, destination, -1, 0);
sqlite3_bind_int64(ctx->stmt_update_file_destination, 2, id);

/* Run the update */
ret = sqlite3_step(ctx->stmt_update_file_destination);

sqlite3_clear_bindings(ctx->stmt_update_file_destination);
sqlite3_reset(ctx->stmt_update_file_destination);

azb_db_unlock(ctx);

if (ret != SQLITE_DONE) {
flb_plg_error(ctx->ins,
"cannot update file destination "
"count for file id=%" PRIu64, id);

return -1;
}

return 0;
}

int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx,
uint64_t id, uint64_t attempts)
{
Expand Down Expand Up @@ -680,11 +725,14 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx,
off_t *offset_start, off_t *offset_end,
uint64_t *part_delivery_attempts,
uint64_t *file_delivery_attempts,
cfl_sds_t *file_path)
cfl_sds_t *file_path,
cfl_sds_t *destination)
{
int ret;
char *tmp = NULL;
char *tmp_destination = NULL;
cfl_sds_t path;
cfl_sds_t local_destination;

if (azb_db_lock(ctx) != 0) {
return -1;
Expand All @@ -703,6 +751,7 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx,
*part_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 5);
tmp = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 6);
*file_delivery_attempts = sqlite3_column_int64(ctx->stmt_get_next_file_part, 7);
tmp_destination = (char *) sqlite3_column_text(ctx->stmt_get_next_file_part, 8);
}
else if (ret == SQLITE_DONE) {
/* no records */
Expand All @@ -719,11 +768,20 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx,
}

path = cfl_sds_create(tmp);
local_destination = cfl_sds_create(tmp_destination);

sqlite3_clear_bindings(ctx->stmt_get_next_file_part);
sqlite3_reset(ctx->stmt_get_next_file_part);

if (!path) {
if (path == NULL || local_destination == NULL) {
if (path != NULL) {
cfl_sds_destroy(path);
}

if (local_destination != NULL) {
cfl_sds_destroy(local_destination);
}

azb_db_unlock(ctx);
return -1;
}
Expand All @@ -732,11 +790,14 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx,
ret = azb_db_file_part_in_progress(ctx, 1, *id);
if (ret == -1) {
cfl_sds_destroy(path);
cfl_sds_destroy(local_destination);
azb_db_unlock(ctx);
return -1;
}

*file_path = path;
*destination = local_destination;

azb_db_unlock(ctx);

return 1;
Expand Down
62 changes: 39 additions & 23 deletions plugins/out_azure_blob/azure_blob_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"CREATE TABLE IF NOT EXISTS out_azure_blob_files (" \
" id INTEGER PRIMARY KEY," \
" source TEXT NOT NULL," \
" destination TEXT NOT NULL," \
" path TEXT NOT NULL," \
" size INTEGER," \
" created INTEGER," \
Expand All @@ -52,8 +53,8 @@
");"

#define SQL_INSERT_FILE \
"INSERT INTO out_azure_blob_files (source, path, size, created)" \
" VALUES (@source, @path, @size, @created);"
"INSERT INTO out_azure_blob_files (source, destination, path, size, created)" \
" VALUES (@source, @destination, @path, @size, @created);"

/* DELETE a registered file and all it parts */
#define SQL_DELETE_FILE \
Expand All @@ -62,6 +63,9 @@
#define SQL_SET_FILE_ABORTED_STATE \
"UPDATE out_azure_blob_files SET aborted=@state WHERE id=@id;"

#define SQL_UPDATE_FILE_DESTINATION \
"UPDATE out_azure_blob_files SET destination=@destination WHERE id=@id;"

#define SQL_UPDATE_FILE_DELIVERY_ATTEMPT_COUNT \
"UPDATE out_azure_blob_files " \
" SET delivery_attempts=@delivery_attempts, " \
Expand Down Expand Up @@ -121,25 +125,31 @@
" WHERE file_id=@id;"

/* Find the oldest files and retrieve the oldest part ready to be uploaded */
#define SQL_GET_NEXT_FILE_PART \
"SELECT p.id, " \
" p.file_id, " \
" p.part_id, " \
" p.offset_start, " \
" p.offset_end, " \
" p.delivery_attempts, " \
" f.path, " \
" f.delivery_attempts, " \
" f.last_delivery_attempt " \
"FROM out_azure_blob_parts p " \
" JOIN out_azure_blob_files f " \
" ON p.file_id = f.id " \
"WHERE p.uploaded = 0 " \
" AND p.in_progress = 0 " \
" AND f.aborted = 0 " \
"ORDER BY f.created ASC, " \
" p.part_id ASC " \
"LIMIT 1;"
#define SQL_GET_NEXT_FILE_PART \
" SELECT p.id, " \
" p.file_id, " \
" p.part_id, " \
" p.offset_start, " \
" p.offset_end, " \
" p.delivery_attempts, " \
" f.path, " \
" f.delivery_attempts, " \
" f.last_delivery_attempt, " \
" f.destination " \
" FROM out_azure_blob_parts p " \
" JOIN out_azure_blob_files f " \
" ON p.file_id = f.id " \
" WHERE p.uploaded = 0 " \
" AND p.in_progress = 0 " \
" AND f.aborted = 0 " \
" AND (p.part_id = 0 OR " \
" (SELECT sp.uploaded " \
" FROM out_azure_blob_parts sp " \
" WHERE sp.part_id = 0 " \
" AND sp.file_id = p.file_id) = 1) " \
"ORDER BY f.created ASC, " \
" p.part_id ASC " \
" LIMIT 1;"


/*
Expand Down Expand Up @@ -168,14 +178,19 @@ int azb_db_close(struct flb_azure_blob *ctx);
int azb_db_file_exists(struct flb_azure_blob *ctx, char *path, uint64_t *id);

int64_t azb_db_file_insert(struct flb_azure_blob *ctx,
char *source, char *path, size_t size);
char *source,
char *destination,
char *path,
size_t size);

int azb_db_file_delete(struct flb_azure_blob *ctx, uint64_t id, char *path);

int azb_db_file_set_aborted_state(struct flb_azure_blob *ctx,
uint64_t id, char *path,
uint64_t state);

int azb_db_file_change_destination(struct flb_azure_blob *ctx, uint64_t id, cfl_sds_t destination);

int azb_db_file_delivery_attempts(struct flb_azure_blob *ctx, uint64_t id, uint64_t attempts);

int azb_db_file_get_next_aborted(struct flb_azure_blob *ctx,
Expand All @@ -201,7 +216,8 @@ int azb_db_file_part_get_next(struct flb_azure_blob *ctx,
off_t *offset_start, off_t *offset_end,
uint64_t *part_delivery_attempts,
uint64_t *file_delivery_attempts,
cfl_sds_t *file_path);
cfl_sds_t *file_path,
cfl_sds_t *destination);
int azb_db_file_part_uploaded(struct flb_azure_blob *ctx, uint64_t id);
int azb_db_file_part_delivery_attempts(struct flb_azure_blob *ctx,
uint64_t file_id,
Expand Down

0 comments on commit 500ea4a

Please sign in to comment.