Skip to content

Commit

Permalink
otel:op
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouBox committed Jan 8, 2025
1 parent 4c564e0 commit fbc909b
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 6 deletions.
2 changes: 1 addition & 1 deletion plugins/ekuiper/plugin_ekuiper.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin,

neu_otel_trace_ctx trace = NULL;
neu_otel_scope_ctx scope = NULL;
if (neu_otel_control_is_started()) {
if (neu_otel_control_is_started() && header->ctx) {
trace = neu_otel_find_trace(header->ctx);
if (trace) {
scope = neu_otel_add_span(trace);
Expand Down
2 changes: 1 addition & 1 deletion plugins/mqtt/mqtt_plugin_intf.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ int mqtt_plugin_request(neu_plugin_t *plugin, neu_reqresp_head_t *head,
neu_otel_trace_ctx trace = NULL;
neu_otel_scope_ctx scope = NULL;
char new_span_id[36] = { 0 };
if (neu_otel_control_is_started()) {
if (neu_otel_control_is_started() && head->ctx) {
trace = neu_otel_find_trace(head->ctx);
if (trace) {
scope = neu_otel_add_span(trace);
Expand Down
2 changes: 1 addition & 1 deletion plugins/restful/rest.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ static int dashb_plugin_request(neu_plugin_t * plugin,

neu_otel_trace_ctx trace = NULL;
neu_otel_scope_ctx scope = NULL;
if (neu_otel_control_is_started()) {
if (neu_otel_control_is_started() && header->ctx) {
trace = neu_otel_find_trace(header->ctx);
if (trace) {
scope = neu_otel_add_span(trace);
Expand Down
19 changes: 16 additions & 3 deletions src/otel/otel_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,16 @@ neu_otel_trace_ctx neu_otel_find_trace(void *req_ctx)
pthread_mutex_unlock(&table_mutex);

if (find) {
return (void *) find->ctx;
if (find->ctx->final &&
find->ctx->span_num ==
find->ctx->trace_data.resource_spans[0]
->scope_spans[0]
->n_spans &&
find->ctx->expected_span_num <= 0) {
return NULL;
} else {
return (void *) find->ctx;
}
} else {
return NULL;
}
Expand Down Expand Up @@ -932,8 +941,10 @@ int neu_otel_trace_pack(neu_otel_trace_ctx ctx, uint8_t *out)

void neu_otel_new_span_id(char *id)
{
int64_t p_id = (int64_t) pthread_self();
for (int i = SPAN_ID_LENGTH - 1; i >= 0; i--) {
id[i] = ID_CHARSET[rand() % 16];
int64_t rand_id = (int64_t) rand() + neu_time_ns() + p_id;
id[i] = ID_CHARSET[rand_id % 16];
}
id[SPAN_ID_LENGTH] = '\0';
}
Expand Down Expand Up @@ -1009,6 +1020,8 @@ static int otel_timer_cb(void *data)
HASH_DEL(traces_table, el);
neu_otel_free_trace(el->ctx);
free(el);
} else {
break;
}
} else if (neu_time_ms() - el->ctx->ts >= TRACE_TIME_OUT) {
nlog_debug("trace:%s time out", (char *) el->ctx->trace_id);
Expand All @@ -1035,7 +1048,7 @@ void neu_otel_start()
param.usr_data = NULL;

param.second = 0;
param.millisecond = 100;
param.millisecond = 80;
param.cb = otel_timer_cb;
param.type = NEU_EVENT_TIMER_BLOCK;

Expand Down

0 comments on commit fbc909b

Please sign in to comment.