Skip to content

Commit

Permalink
feat: successfully write traces into db
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan committed Oct 19, 2023
1 parent 2641a4b commit cdb1bfb
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/servers/src/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@
pub mod metrics;
pub mod plugin;
pub mod trace;

/// Column name shared by the OTLP writers for the timestamp column.
const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
/// Column name shared by the OTLP writers for the numeric value column.
const GREPTIME_VALUE: &str = "greptime_value";
/// Column name for a count column.
/// NOTE(review): presumably used by the metrics writer only — confirm against `metrics.rs`.
const GREPTIME_COUNT: &str = "greptime_count";
4 changes: 1 addition & 3 deletions src/servers/src/otlp/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ
use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue};
use opentelemetry_proto::tonic::metrics::v1::{metric, number_data_point, *};

use super::{GREPTIME_COUNT, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};

const GREPTIME_TIMESTAMP: &str = "greptime_timestamp";
const GREPTIME_VALUE: &str = "greptime_value";
const GREPTIME_COUNT: &str = "greptime_count";
/// The default column count for the table writer.
/// NOTE(review): presumably only a sizing hint rather than a hard limit — confirm against `row_writer`.
const APPROXIMATE_COLUMN_COUNT: usize = 8;

Expand Down
14 changes: 14 additions & 0 deletions src/servers/src/otlp/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use api::v1::RowInsertRequests;
Expand Down
226 changes: 222 additions & 4 deletions src/servers/src/otlp/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::RowInsertRequests;
use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, RowInsertRequests, Value};
use common_grpc::writer::Precision;
use common_time::time::Time;
use itertools::Itertools;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
use opentelemetry_proto::tonic::common::v1::{
AnyValue, ArrayValue, InstrumentationScope, KeyValue, KeyValueList,
};
use opentelemetry_proto::tonic::trace::v1::span::{Event, Link};
use opentelemetry_proto::tonic::trace::v1::{Span, Status};
use serde_json::json;

use super::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use crate::error::Result;
use crate::row_writer::{self, MultiTableData, TableData};

/// Sizing value handed to the table writer when the trace table is requested.
/// NOTE(review): it is passed for both sizing arguments of
/// `get_or_default_table_data`; confirm what each argument means.
const APPROXIMATE_COLUMN_COUNT: usize = 16;
/// Name of the single table that every ingested span is written to.
const TRACE_TABLE_NAME: &str = "traces_preview";

/// Convert OpenTelemetry traces to GreptimeDB row insert requests
///
Expand All @@ -25,8 +43,208 @@ use crate::error::Result;
///
/// Returns `InsertRequests` and total number of rows to ingest
pub fn to_grpc_insert_requests(
    request: ExportTraceServiceRequest,
) -> Result<(RowInsertRequests, usize)> {
    let mut multi_table_writer = MultiTableData::default();
    // Every span, whatever its resource or scope, lands in the single trace table.
    // NOTE(review): the same constant is passed for both sizing arguments —
    // confirm what each one means against `MultiTableData::get_or_default_table_data`.
    let one_table_writer = multi_table_writer.get_or_default_table_data(
        TRACE_TABLE_NAME,
        APPROXIMATE_COLUMN_COUNT,
        APPROXIMATE_COLUMN_COUNT,
    );
    for resource_spans in request.resource_spans {
        // A missing resource is treated as a resource without attributes.
        let resource_attrs = resource_spans
            .resource
            .map(|r| r.attributes)
            .unwrap_or_default();
        for scope_spans in resource_spans.scope_spans {
            // A missing instrumentation scope falls back to the default scope.
            let scope = scope_spans.scope.unwrap_or_default();
            for span in scope_spans.spans {
                // One row per span; the first failing span aborts the whole request.
                let row = one_table_writer.alloc_one_row();
                write_span(one_table_writer, row, &resource_attrs, &scope, span)?;
            }
        }
    }

    Ok(multi_table_writer.into_row_insert_requests())
}

/// Fills `row` with every column derived from `span` and appends it to `writer`.
///
/// Columns are written in a fixed order: tags, string fields, start/end time
/// fields, the value column and finally the timestamp column. The first step
/// that fails stops the chain and its error is returned; the row is then not
/// added to `writer`.
fn write_span(
    writer: &mut TableData,
    mut row: Vec<Value>,
    resource_attrs: &[KeyValue],
    scope: &InstrumentationScope,
    span: Span,
) -> Result<()> {
    write_span_tags(writer, &mut row, &span)
        .and_then(|()| write_span_str_fields(writer, &mut row, resource_attrs, scope, &span))
        .and_then(|()| write_span_time_fields(writer, &mut row, &span))
        .and_then(|()| write_span_value(writer, &mut row, &span))
        .and_then(|()| write_span_timestamp(writer, &mut row, &span))?;

    writer.add_row(row);
    Ok(())
}

/// Writes the trace, span and parent-span identifiers of `span` as tag columns.
///
/// Each identifier is rendered with `bytes_to_string` before being handed to
/// `row_writer::write_tags`.
fn write_span_tags(writer: &mut TableData, row: &mut Vec<Value>, span: &Span) -> Result<()> {
    let ids = [
        ("trace_id", &span.trace_id),
        ("span_id", &span.span_id),
        ("parent_span_id", &span.parent_span_id),
    ];
    let tags = ids
        .iter()
        .map(|&(col, id)| (col.into(), bytes_to_string(id)));

    row_writer::write_tags(writer, tags, row)
}

/// Renders `bs` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn bytes_to_string(bs: &[u8]) -> String {
    use std::fmt::Write as _;

    // Two hex digits per byte: size the buffer once instead of allocating a
    // temporary `String` for every byte and joining them afterwards.
    let mut hex = String::with_capacity(bs.len() * 2);
    for b in bs {
        // Formatting into a `String` cannot fail, so the result is ignored.
        let _ = write!(hex, "{:02x}", b);
    }
    hex
}

/// Renders an OTLP array value as a JSON array of strings.
///
/// Every element is stringified with `any_value_to_string`; elements that
/// carry no value are skipped. Falls back to `"[]"` if serialization fails.
fn arr_vals_to_string(arr: &ArrayValue) -> String {
    let mut rendered: Vec<String> = Vec::new();
    for item in &arr.values {
        if let Some(text) = any_value_to_string(item.clone()) {
            rendered.push(text);
        }
    }

    match serde_json::to_string(&rendered) {
        Ok(encoded) => encoded,
        Err(_) => "[]".into(),
    }
}

fn vec_kv_to_string(vec: &[KeyValue]) -> String {
let vs: HashMap<String, String> = vec
.iter()
.map(|kv| {
let val = kv
.value
.clone()
.and_then(any_value_to_string)
.unwrap_or_default();
(kv.key.clone(), val)
})
.collect();

serde_json::to_string(&vs).unwrap_or_else(|_| "{}".into())
}
/// Renders an OTLP key-value list as a JSON object string by delegating to
/// `vec_kv_to_string` on the list's `values`.
fn kvlist_to_string(kvlist: &KeyValueList) -> String {
vec_kv_to_string(&kvlist.values)
}

/// Converts an OTLP `AnyValue` into its string form.
///
/// Returns `None` when the value is unset. Scalars use their `to_string`
/// rendering, bytes are hex-encoded via `bytes_to_string`, and arrays and
/// key-value lists are rendered as JSON text by their respective helpers.
fn any_value_to_string(val: AnyValue) -> Option<String> {
    let inner = val.value?;
    let text = match inner {
        OtlpValue::BoolValue(flag) => flag.to_string(),
        OtlpValue::IntValue(int) => int.to_string(),
        OtlpValue::DoubleValue(float) => float.to_string(),
        OtlpValue::StringValue(text) => text,
        OtlpValue::BytesValue(bytes) => bytes_to_string(&bytes),
        OtlpValue::ArrayValue(items) => arr_vals_to_string(&items),
        OtlpValue::KvlistValue(pairs) => kvlist_to_string(&pairs),
    };
    Some(text)
}

fn event_to_string(event: &Event) -> String {
json!({
"name": event.name,
"time": Time::new_nanosecond(event.time_unix_nano as i64).to_iso8601_string(),
"attrs": vec_kv_to_string(&event.attributes),
})
.to_string()
}

fn events_to_string(events: &[Event]) -> String {
let v: Vec<String> = events.iter().map(event_to_string).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}

/// Renders a span link as a JSON object string with `trace_id`, `span_id`,
/// `trace_state` and `attributes`.
///
/// The identifiers are hex-encoded with `bytes_to_string`, the same encoding
/// used for the span's own `trace_id`/`span_id` tag columns, so a link can be
/// matched against those columns. Serializing the raw byte vectors would emit
/// arrays of integers instead. `attributes` holds the output of
/// `vec_kv_to_string`, embedded as a JSON string.
fn link_to_string(link: &Link) -> String {
    json!({
        "trace_id": bytes_to_string(&link.trace_id),
        "span_id": bytes_to_string(&link.span_id),
        "trace_state": link.trace_state,
        "attributes": vec_kv_to_string(&link.attributes),
    })
    .to_string()
}

fn links_to_string(links: &[Link]) -> String {
let v: Vec<String> = links.iter().map(link_to_string).collect();
serde_json::to_string(&v).unwrap_or_else(|_| "[]".into())
}

/// Splits an optional span status into `(code name, message)`.
///
/// The code is the string name of the status code enum. A missing status
/// yields two empty strings.
fn status_to_string(status: &Option<Status>) -> (String, String) {
    status
        .as_ref()
        .map(|st| (st.code().as_str_name().to_string(), st.message.clone()))
        .unwrap_or_default()
}

/// Writes every string-typed field column of a span into `row`.
///
/// Covers the resource attributes, the instrumentation scope (name, version,
/// attributes) and the span's own trace state, name, kind, status, attributes,
/// events and links. Structured values are stored as the JSON text produced by
/// the `*_to_string` helpers.
fn write_span_str_fields(
    writer: &mut TableData,
    row: &mut Vec<Value>,
    resource_attrs: &[KeyValue],
    scope: &InstrumentationScope,
    span: &Span,
) -> Result<()> {
    let (status_code, status_message) = status_to_string(&span.status);

    // Column order is kept stable: resource, scope, then span details.
    let columns: Vec<(&str, String)> = vec![
        ("resource", vec_kv_to_string(resource_attrs)),
        ("scope_name", scope.name.clone()),
        ("scope_version", scope.version.clone()),
        ("scope_attributes", vec_kv_to_string(&scope.attributes)),
        ("trace_state", span.trace_state.clone()),
        ("span_name", span.name.clone()),
        ("span_kind", span.kind().as_str_name().to_string()),
        ("span_status_code", status_code),
        ("span_status_message", status_message),
        ("span_attributes", vec_kv_to_string(&span.attributes)),
        ("span_events", events_to_string(&span.events)),
        ("span_links", links_to_string(&span.links)),
    ];

    let fields = columns.into_iter().map(|(name, text)| {
        (
            name.into(),
            ColumnDataType::String,
            ValueData::StringValue(text),
        )
    });

    row_writer::write_fields(writer, fields, row)
}

/// Writes the span's `start` and `end` times as nanosecond timestamp fields.
///
/// Both values are taken from the span's `*_time_unix_nano` fields and cast to
/// `i64` for the timestamp value.
fn write_span_time_fields(writer: &mut TableData, row: &mut Vec<Value>, span: &Span) -> Result<()> {
    let bounds = [
        ("start", span.start_time_unix_nano),
        ("end", span.end_time_unix_nano),
    ];
    let fields = bounds.iter().map(|&(name, nanos)| {
        (
            name.into(),
            ColumnDataType::TimestampNanosecond,
            ValueData::TimestampNanosecondValue(nanos as i64),
        )
    });

    row_writer::write_fields(writer, fields, row)
}

/// Writes the span duration, in milliseconds, into the `GREPTIME_VALUE` column.
///
/// The duration is `end_time_unix_nano - start_time_unix_nano`. Both
/// timestamps are supplied by the client, so `end` can be earlier than `start`
/// (clock skew, malformed data). The unsigned subtraction saturates at zero so
/// that such a span is stored with a `0.0` duration instead of overflowing,
/// which would panic in debug builds and wrap to a huge value in release
/// builds.
fn write_span_value(writer: &mut TableData, row: &mut Vec<Value>, span: &Span) -> Result<()> {
    let duration_nanos = span
        .end_time_unix_nano
        .saturating_sub(span.start_time_unix_nano);

    row_writer::write_f64(
        writer,
        GREPTIME_VALUE,
        duration_nanos as f64 / 1_000_000.0,
        row,
    )
}

/// Writes the span start time, with nanosecond precision, into the
/// `GREPTIME_TIMESTAMP` column.
fn write_span_timestamp(writer: &mut TableData, row: &mut Vec<Value>, span: &Span) -> Result<()> {
    let start_nanos = span.start_time_unix_nano as i64;

    row_writer::write_ts_precision(
        writer,
        GREPTIME_TIMESTAMP,
        Some(start_nanos),
        Precision::Nanosecond,
        row,
    )
}

0 comments on commit cdb1bfb

Please sign in to comment.