Skip to content

Commit

Permalink
feat: add API to write OpenTelemetry logs to GreptimeDB
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Sep 22, 2024
1 parent b6b71e8 commit 2e07556
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 22 deletions.
29 changes: 13 additions & 16 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub async fn traces(
}

pub struct PipelineInfo {
pub pipeline_name: String,
pub pipeline_name: Option<String>,
pub pipeline_version: Option<String>,
}

Expand Down Expand Up @@ -124,15 +124,15 @@ where
let pipeline_version = parts.headers.get("X-Pipeline-Version");
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: pipeline_header_error(name)?,
pipeline_name: Some(pipeline_header_error(name)?),
pipeline_version: Some(pipeline_header_error(version)?),
}),
(None, _) => Err((
StatusCode::BAD_REQUEST,
"`X-Pipeline-Name` header is missing",
)),
(None, _) => Ok(PipelineInfo {
pipeline_name: None,
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: pipeline_header_error(name)?,
pipeline_name: Some(pipeline_header_error(name)?),
pipeline_version: None,
}),
}
Expand Down Expand Up @@ -175,15 +175,13 @@ pub async fn logs(
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;

let pipeline_way;
if pipeline_info.pipeline_name == "identity" {
pipeline_way = PipelineWay::Identity;
} else {
if let Some(pipeline_name) = &pipeline_info.pipeline_name {
let pipeline_version =
to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| {
error::InvalidParameterSnafu {
Expand All @@ -192,11 +190,7 @@ pub async fn logs(
.build()
})?;
let pipeline = match handler
.get_pipeline(
&pipeline_info.pipeline_name,
pipeline_version,
query_ctx.clone(),
)
.get_pipeline(pipeline_name, pipeline_version, query_ctx.clone())
.await
{
Ok(p) => p,
Expand All @@ -205,7 +199,10 @@ pub async fn logs(
}
};
pipeline_way = PipelineWay::Custom(pipeline);
} else {
pipeline_way = PipelineWay::Identity;
}

handler
.logs(request, pipeline_way, table_info.table_name, query_ctx)
.await
Expand Down
7 changes: 7 additions & 0 deletions src/servers/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ lazy_static! {
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec =
register_histogram_vec!(
"greptime_servers_http_otlp_logs_elapsed",
"servers http otlp logs elapsed",
&[METRIC_DB_LABEL]
)
.unwrap();
pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!(
"greptime_servers_http_logs_ingestion_counter",
"servers http logs ingestion counter",
Expand Down
59 changes: 53 additions & 6 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use std::io::Write;

use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, StatusCode};
use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
Expand Down Expand Up @@ -89,6 +90,7 @@ macro_rules! http_tests {

test_otlp_metrics,
test_otlp_traces,
test_otlp_logs,
);
)*
};
Expand Down Expand Up @@ -1429,7 +1431,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) {
let client = TestClient::new(app);

// write metrics data
let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await;
let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), false).await;
assert_eq!(StatusCode::OK, res.status());

// select metrics data
Expand All @@ -1441,7 +1443,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);

// write metrics data with gzip
let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await;
let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), true).await;
assert_eq!(StatusCode::OK, res.status());

// select metrics data again
Expand All @@ -1466,7 +1468,7 @@ pub async fn test_otlp_traces(store_type: StorageType) {
let client = TestClient::new(app);

// write traces data
let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await;
let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), false).await;
assert_eq!(StatusCode::OK, res.status());

// select traces data
Expand All @@ -1481,7 +1483,7 @@ pub async fn test_otlp_traces(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);

// write metrics data with gzip
let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await;
let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await;
assert_eq!(StatusCode::OK, res.status());

// select metrics data again
Expand All @@ -1490,6 +1492,41 @@ pub async fn test_otlp_traces(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_otlp_logs(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await;

let content = r#"
{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"stringValue":"This is a log message"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"stringValue":"something happened"},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]}
"#;

let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();

// handshake
let client = TestClient::new(app);

// write traces data
let res = send_req(
&client,
vec![(
HeaderName::from_static("x-table-name"),
HeaderValue::from_static("logs"),
)],
"/v1/otlp/v1/logs?db=public",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());

// TODO(qtang): we show convert jsonb to json string in http sql API
let expected = r#"[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"null"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"null"]]"#;
validate_data(&client, "select * from logs;", expected).await;

guard.remove_all().await;
}

async fn validate_data(client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())
Expand All @@ -1502,11 +1539,21 @@ async fn validate_data(client: &TestClient, sql: &str, expected: &str) {
assert_eq!(v, expected);
}

async fn send_req(client: &TestClient, path: &str, body: Vec<u8>, with_gzip: bool) -> TestResponse {
async fn send_req(
client: &TestClient,
headers: Vec<(HeaderName, HeaderValue)>,
path: &str,
body: Vec<u8>,
with_gzip: bool,
) -> TestResponse {
let mut req = client
.post(path)
.header("content-type", "application/x-protobuf");

for (k, v) in headers {
req = req.header(k, v);
}

let mut len = body.len();

if with_gzip {
Expand Down

0 comments on commit 2e07556

Please sign in to comment.