This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit 9cf0959
metrics: consolidate into a labelled capture_events_dropped_total metric
xvello committed Oct 30, 2023
1 parent 53017d7 commit 9cf0959
Showing 3 changed files with 11 additions and 8 deletions.
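
For context, a minimal sketch of the consolidation pattern this commit applies (the macro shape is taken from the diff below; the quantities are illustrative): several single-purpose counters collapse into one counter that fans out by a "cause" label.

    use metrics::counter;

    // Before: one ad-hoc counter per drop reason (names from the removed lines below).
    fn before(events_len: u64) {
        counter!("capture_token_shape_invalid_total", events_len);
        counter!("capture_events_dropped_over_quota", 1);
    }

    // After: one shared counter; each drop reason becomes a "cause" label value.
    fn report_dropped_events(cause: &'static str, quantity: u64) {
        counter!("capture_events_dropped_total", quantity, "cause" => cause);
    }

    fn main() {
        before(5);
        report_dropped_events("token_shape_invalid", 5);
        report_dropped_events("over_quota", 1);
    }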
5 changes: 3 additions & 2 deletions capture/src/capture.rs
@@ -16,6 +16,7 @@ use time::OffsetDateTime;
 
 use crate::billing_limits::QuotaResource;
 use crate::event::ProcessingContext;
+use crate::prometheus::report_dropped_events;
 use crate::token::validate_token;
 use crate::{
     api::{CaptureError, CaptureResponse, CaptureResponseCode},
@@ -50,7 +51,7 @@ pub async fn event(
     }
 
     let token = extract_and_verify_token(&events).map_err(|err| {
-        counter!("capture_token_shape_invalid_total", events.len() as u64);
+        report_dropped_events("token_shape_invalid", events.len() as u64);
         err
     })?;
 
@@ -81,7 +82,7 @@ pub async fn event(
         .await;
 
     if limited {
-        counter!("capture_events_dropped_over_quota", 1);
+        report_dropped_events("over_quota", 1);
 
         // for v0 we want to just return ok 🙃
         // this is because the clients are pretty dumb and will just retry over and over and
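
A note on the over-quota branch above: the drop is now counted under cause="over_quota", but the endpoint still answers Ok so clients do not endlessly retry a batch that is over quota. A simplified, self-contained sketch of that behavior (the response type here is a stand-in, not the real one from capture.rs):

    use metrics::counter;

    enum ResponseCode {
        Ok,
    }

    fn report_dropped_events(cause: &'static str, quantity: u64) {
        counter!("capture_events_dropped_total", quantity, "cause" => cause);
    }

    // Count the dropped batch, but acknowledge it anyway.
    fn handle_over_quota() -> ResponseCode {
        report_dropped_events("over_quota", 1);
        ResponseCode::Ok
    }

    fn main() {
        let _ = handle_over_quota();
    }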
4 changes: 4 additions & 0 deletions capture/src/prometheus.rs
@@ -3,8 +3,12 @@
 use std::time::Instant;
 
 use axum::{extract::MatchedPath, http::Request, middleware::Next, response::IntoResponse};
+use metrics::counter;
 use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
 
+pub fn report_dropped_events(cause: &'static str, quantity: u64) {
+    counter!("capture_events_dropped_total", quantity, "cause" => cause);
+}
 pub fn setup_metrics_recorder() -> PrometheusHandle {
     // Ok I broke it at the end, but the limit on our ingress is 60 and that's a nicer way of reaching it
     const EXPONENTIAL_SECONDS: &[f64] = &[
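
To see the resulting series, one can render the recorder that prometheus.rs already builds; a quick sketch assuming the metrics-exporter-prometheus install_recorder()/render() API:

    use metrics_exporter_prometheus::PrometheusBuilder;

    fn main() {
        // Install a plain recorder so the counter! calls below are captured
        // (the real service wires this up via setup_metrics_recorder()).
        let handle = PrometheusBuilder::new()
            .install_recorder()
            .expect("failed to install recorder");

        metrics::counter!("capture_events_dropped_total", 2, "cause" => "over_quota");
        metrics::counter!("capture_events_dropped_total", 1, "cause" => "kafka_write_error");

        // The rendered exposition contains one series per cause, e.g.:
        //   capture_events_dropped_total{cause="over_quota"} 2
        //   capture_events_dropped_total{cause="kafka_write_error"} 1
        println!("{}", handle.render());
    }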
10 changes: 4 additions & 6 deletions capture/src/sink.rs
@@ -12,6 +12,7 @@ use rdkafka::util::Timeout;
 use tracing::info;
 
 use crate::event::ProcessedEvent;
+use crate::prometheus::report_dropped_events;
 
 #[async_trait]
 pub trait EventSink {
@@ -152,18 +153,15 @@ impl KafkaSink {
             timestamp: None,
             headers: None,
         }) {
-            Ok(_) => {
-                metrics::increment_counter!("capture_events_ingested");
-                Ok(())
-            }
+            Ok(_) => Ok(()),
             Err((e, _)) => match e.rdkafka_error_code() {
                 Some(RDKafkaErrorCode::InvalidMessageSize) => {
-                    metrics::increment_counter!("capture_events_dropped_too_big");
+                    report_dropped_events("kafka_message_size", 1);
                     Err(CaptureError::EventTooBig)
                 }
                 _ => {
                     // TODO(maybe someday): Don't drop them but write them somewhere and try again
-                    metrics::increment_counter!("capture_events_dropped");
+                    report_dropped_events("kafka_write_error", 1);
                     tracing::error!("failed to produce event: {}", e);
                     Err(CaptureError::RetryableSinkError)
                 }
