Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: otlp tracing framework via http #1

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev =
metrics = "0.20"
moka = "0.12"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics", "traces"] }
parquet = "43.0"
paste = "1.0"
prost = "0.11"
Expand Down
35 changes: 33 additions & 2 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ use metrics::counter;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::otlp;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::instance::Instance;
use crate::metrics::OTLP_METRICS_ROWS;
use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};

#[async_trait]
impl OpenTelemetryProtocolHandler for Instance {
Expand All @@ -40,7 +43,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
let (requests, rows) = otlp::to_grpc_insert_requests(request)?;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
Expand All @@ -55,4 +58,32 @@ impl OpenTelemetryProtocolHandler for Instance {
};
Ok(resp)
}

async fn traces(
&self,
request: ExportTraceServiceRequest,
ctx: QueryContextRef,
) -> ServerResult<ExportTraceServiceResponse> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;

let (requests, rows) = otlp::traces::to_grpc_insert_requests(request)?;

let _ = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;

counter!(OTLP_TRACES_ROWS, rows as u64);

let resp = ExportTraceServiceResponse {
// TODO(fys): add support for partial_success in future patch
partial_success: None,
};
Ok(resp)
}
}
1 change: 1 addition & 0 deletions src/frontend/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"
pub const PROM_STORE_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples";

pub const OTLP_METRICS_ROWS: &str = "frontend.otlp.metrics.rows";
pub const OTLP_TRACES_ROWS: &str = "frontend.otlp.traces.rows";
1 change: 1 addition & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ impl HttpServer {
fn route_otlp<S>(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router<S> {
Router::new()
.route("/v1/metrics", routing::post(otlp::metrics))
.route("/v1/traces", routing::post(otlp::traces))
.with_state(otlp_handler)
}

Expand Down
58 changes: 51 additions & 7 deletions src/servers/src/http/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use hyper::Body;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use prost::Message;
use session::context::QueryContextRef;
use snafu::prelude::*;
Expand All @@ -33,16 +36,19 @@ pub async fn metrics(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<OtlpResponse> {
) -> Result<OtlpMetricsResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_OPENTELEMETRY_ELAPSED,
crate::metrics::METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
let request = parse_body(body).await?;
handler.metrics(request, query_ctx).await.map(OtlpResponse)
let request = parse_metrics_body(body).await?;
handler
.metrics(request, query_ctx)
.await
.map(OtlpMetricsResponse)
}

async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
Expand All @@ -51,9 +57,47 @@ async fn parse_body(body: Body) -> Result<ExportMetricsServiceRequest> {
})
}

pub struct OtlpResponse(ExportMetricsServiceResponse);
pub struct OtlpMetricsResponse(ExportMetricsServiceResponse);

impl IntoResponse for OtlpMetricsResponse {
fn into_response(self) -> axum::response::Response {
(
[(header::CONTENT_TYPE, "application/x-protobuf")],
self.0.encode_to_vec(),
)
.into_response()
}
}

#[axum_macros::debug_handler]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<OtlpTracesResponse> {
let _timer = timer!(
crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED,
&[(crate::metrics::METRIC_DB_LABEL, query_ctx.get_db_string())]
);
let request = parse_traces_body(body).await?;
handler
.traces(request, query_ctx)
.await
.map(OtlpTracesResponse)
}

async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)
.and_then(|buf| {
ExportTraceServiceRequest::decode(&buf[..]).context(error::DecodeOtlpRequestSnafu)
})
}

pub struct OtlpTracesResponse(ExportTraceServiceResponse);

impl IntoResponse for OtlpResponse {
impl IntoResponse for OtlpTracesResponse {
fn into_response(self) -> axum::response::Response {
(
[(header::CONTENT_TYPE, "application/x-protobuf")],
Expand Down
5 changes: 4 additions & 1 deletion src/servers/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub(crate) const METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: &str = "servers.http_influx
pub(crate) const METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: &str =
"servers.http_prometheus_write_elapsed";
pub(crate) const METRIC_HTTP_PROM_STORE_READ_ELAPSED: &str = "servers.http_prometheus_read_elapsed";
pub(crate) const METRIC_HTTP_OPENTELEMETRY_ELAPSED: &str = "servers.http_otlp_elapsed";
pub(crate) const METRIC_HTTP_OPENTELEMETRY_METRICS_ELAPSED: &str =
"servers.http_otlp_metrics_elapsed";
pub(crate) const METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED: &str =
"servers.http_otlp_traces_elapsed";
pub(crate) const METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED: &str =
"servers.opentsdb_line_write_elapsed";
pub(crate) const METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED: &str =
Expand Down
Loading
Loading