Skip to content

Commit

Permalink
improve attributes for opentelemetry + add host to http metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
GlenDC committed Sep 12, 2024
1 parent 97b908b commit d394317
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 26 deletions.
45 changes: 32 additions & 13 deletions rama-http/src/layer/opentelemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const HTTP_SERVER_TOTAL_REQUESTS: &str = "http.requests.total";
const HTTP_SERVER_TOTAL_FAILURES: &str = "http.failures.total";
const HTTP_SERVER_TOTAL_RESPONSES: &str = "http.responses.total";

const HTTP_REQUEST_HOST: &str = "http.request.host";

/// Records http server metrics
///
/// See the [spec] for details.
Expand Down Expand Up @@ -96,13 +98,15 @@ impl Metrics {
/// A layer that records http server metrics using OpenTelemetry.
pub struct RequestMetricsLayer<F = ()> {
metrics: Arc<Metrics>,
base_attributes: Vec<KeyValue>,
attributes_factory: F,
}

impl<F: fmt::Debug> fmt::Debug for RequestMetricsLayer<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("RequestMetricsLayer")
.field("metrics", &self.metrics)
.field("base_attributes", &self.base_attributes)
.field("attributes_factory", &self.attributes_factory)
.finish()
}
Expand All @@ -112,6 +116,7 @@ impl<F: Clone> Clone for RequestMetricsLayer<F> {
fn clone(&self) -> Self {
RequestMetricsLayer {
metrics: self.metrics.clone(),
base_attributes: self.base_attributes.clone(),
attributes_factory: self.attributes_factory.clone(),
}
}
Expand All @@ -127,16 +132,21 @@ impl RequestMetricsLayer<()> {
/// Create a new [`RequestMetricsLayer`] using the global [`Meter`] provider,
/// with a custom name and version.
pub fn custom(opts: MeterOptions) -> Self {
let meter = get_versioned_meter(
opts.service.unwrap_or_else(|| ServiceInfo {
name: rama_utils::info::NAME.to_owned(),
version: rama_utils::info::VERSION.to_owned(),
}),
opts.attributes,
);
let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
name: rama_utils::info::NAME.to_owned(),
version: rama_utils::info::VERSION.to_owned(),
});

let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));

let meter = get_versioned_meter(service_info);
let metrics = Metrics::new(meter, opts.metric_prefix);

Self {
metrics: Arc::new(metrics),
base_attributes: attributes,
attributes_factory: (),
}
}
Expand All @@ -146,6 +156,7 @@ impl RequestMetricsLayer<()> {
pub fn with_attributes<F>(self, attributes: F) -> RequestMetricsLayer<F> {
RequestMetricsLayer {
metrics: self.metrics,
base_attributes: self.base_attributes,
attributes_factory: attributes,
}
}
Expand All @@ -157,15 +168,12 @@ impl Default for RequestMetricsLayer {
}
}

fn get_versioned_meter(service_info: ServiceInfo, attributes: Option<Vec<KeyValue>>) -> Meter {
let mut attributes = attributes.unwrap_or_else(|| Vec::with_capacity(2));
attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
fn get_versioned_meter(service_info: ServiceInfo) -> Meter {
global::meter_with_version(
service_info.name,
Some(service_info.version),
Some(semantic_conventions::SCHEMA_URL),
Some(attributes),
None,
)
}

Expand All @@ -176,6 +184,7 @@ impl<S, F: Clone> Layer<S> for RequestMetricsLayer<F> {
RequestMetricsService {
inner,
metrics: self.metrics.clone(),
base_attributes: self.base_attributes.clone(),
attributes_factory: self.attributes_factory.clone(),
}
}
Expand All @@ -185,6 +194,7 @@ impl<S, F: Clone> Layer<S> for RequestMetricsLayer<F> {
pub struct RequestMetricsService<S, F = ()> {
inner: S,
metrics: Arc<Metrics>,
base_attributes: Vec<KeyValue>,
attributes_factory: F,
}

Expand All @@ -202,6 +212,7 @@ impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for RequestMetricsService<S, F> {
f.debug_struct("RequestMetricsService")
.field("inner", &self.inner)
.field("metrics", &self.metrics)
.field("base_attributes", &self.base_attributes)
.field("attributes_factory", &self.attributes_factory)
.finish()
}
Expand All @@ -212,6 +223,7 @@ impl<S: Clone, F: Clone> Clone for RequestMetricsService<S, F> {
Self {
inner: self.inner.clone(),
metrics: self.metrics.clone(),
base_attributes: self.base_attributes.clone(),
attributes_factory: self.attributes_factory.clone(),
}
}
Expand All @@ -226,13 +238,20 @@ impl<S, F> RequestMetricsService<S, F> {
where
F: AttributesFactory<State>,
{
let mut attributes = self.attributes_factory.attributes(5, ctx);
let mut attributes = self
.attributes_factory
.attributes(6 + self.base_attributes.len(), ctx);
attributes.extend(self.base_attributes.iter().cloned());

// server info
let request_ctx: Option<&mut RequestContext> = ctx
.get_or_try_insert_with_ctx(|ctx| (ctx, req).try_into())
.ok();
if let Some(authority) = request_ctx.as_ref().map(|rc| &rc.authority) {
attributes.push(KeyValue::new(
HTTP_REQUEST_HOST,
authority.host().to_string(),
));
attributes.push(KeyValue::new(SERVER_PORT, authority.port() as i64));
}

Expand Down
49 changes: 36 additions & 13 deletions rama-net/src/stream/layer/opentelemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ impl Metrics {
/// A layer that records network server metrics using OpenTelemetry.
pub struct NetworkMetricsLayer<F = ()> {
metrics: Arc<Metrics>,
base_attributes: Vec<KeyValue>,
attributes_factory: F,
}

impl<F: fmt::Debug> fmt::Debug for NetworkMetricsLayer<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("NetworkMetricsLayer")
.field("metrics", &self.metrics)
.field("base_attributes", &self.base_attributes)
.field("attributes_factory", &self.attributes_factory)
.finish()
}
Expand All @@ -79,6 +81,7 @@ impl<F: Clone> Clone for NetworkMetricsLayer<F> {
fn clone(&self) -> Self {
NetworkMetricsLayer {
metrics: self.metrics.clone(),
base_attributes: self.base_attributes.clone(),
attributes_factory: self.attributes_factory.clone(),
}
}
Expand All @@ -94,16 +97,21 @@ impl NetworkMetricsLayer {
/// Create a new [`NetworkMetricsLayer`] using the global [`Meter`] provider,
/// with a custom name and version.
pub fn custom(opts: MeterOptions) -> Self {
let meter = get_versioned_meter(
opts.service.unwrap_or_else(|| ServiceInfo {
name: rama_utils::info::NAME.to_owned(),
version: rama_utils::info::VERSION.to_owned(),
}),
opts.attributes,
);
let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
name: rama_utils::info::NAME.to_owned(),
version: rama_utils::info::VERSION.to_owned(),
});

let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));

let meter = get_versioned_meter(service_info);
let metrics = Metrics::new(meter, opts.metric_prefix);

Self {
metrics: Arc::new(metrics),
base_attributes: attributes,
attributes_factory: (),
}
}
Expand All @@ -113,6 +121,7 @@ impl NetworkMetricsLayer {
pub fn with_attributes<F>(self, attributes: F) -> NetworkMetricsLayer<F> {
NetworkMetricsLayer {
metrics: self.metrics,
base_attributes: self.base_attributes,
attributes_factory: attributes,
}
}
Expand All @@ -124,15 +133,12 @@ impl Default for NetworkMetricsLayer {
}
}

fn get_versioned_meter(service_info: ServiceInfo, attributes: Option<Vec<KeyValue>>) -> Meter {
let mut attributes = attributes.unwrap_or_else(|| Vec::with_capacity(2));
attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
fn get_versioned_meter(service_info: ServiceInfo) -> Meter {
global::meter_with_version(
service_info.name,
Some(service_info.version),
Some(semantic_conventions::SCHEMA_URL),
Some(attributes),
None,
)
}

Expand All @@ -143,6 +149,7 @@ impl<S, F: Clone> Layer<S> for NetworkMetricsLayer<F> {
NetworkMetricsService {
inner,
metrics: self.metrics.clone(),
base_attributes: self.base_attributes.clone(),
attributes_factory: self.attributes_factory.clone(),
}
}
Expand All @@ -152,6 +159,7 @@ impl<S, F: Clone> Layer<S> for NetworkMetricsLayer<F> {
pub struct NetworkMetricsService<S, F = ()> {
inner: S,
metrics: Arc<Metrics>,
base_attributes: Vec<KeyValue>,
attributes_factory: F,
}

Expand All @@ -169,17 +177,32 @@ impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for NetworkMetricsService<S, F> {
f.debug_struct("NetworkMetricsService")
.field("inner", &self.inner)
.field("metrics", &self.metrics)
.field("base_attributes", &self.base_attributes)
.field("attributes_factory", &self.attributes_factory)
.finish()
}
}

impl<S: Clone, F: Clone> Clone for NetworkMetricsService<S, F> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
metrics: self.metrics.clone(),
base_attributes: self.base_attributes.clone(),
attributes_factory: self.attributes_factory.clone(),
}
}
}

impl<S, F> NetworkMetricsService<S, F> {
fn compute_attributes<State>(&self, ctx: &Context<State>) -> Vec<KeyValue>
where
F: AttributesFactory<State>,
{
let mut attributes = self.attributes_factory.attributes(2, ctx);
let mut attributes = self
.attributes_factory
.attributes(2 + self.base_attributes.len(), ctx);
attributes.extend(self.base_attributes.iter().cloned());

// client info
if let Some(socket_info) = ctx.get::<SocketInfo>() {
Expand Down

0 comments on commit d394317

Please sign in to comment.