From 23ef91ebd2bf5cc39b5e23e7dde3c9e17b8f2817 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Mon, 29 Apr 2024 16:08:00 +0200
Subject: [PATCH] keep overflow detection in sink pending removal

---
 capture/src/api.rs             |  9 ++++-
 capture/src/sinks/kafka.rs     | 67 +++++++++++++---------------------
 capture/src/sinks/mod.rs       | 15 +-------
 capture/src/sinks/print.rs     | 14 +++----
 capture/src/v0_endpoint.rs     | 19 +++++-----
 capture/src/v0_request.rs      |  3 +-
 capture/tests/django_compat.rs | 10 ++---
 7 files changed, 54 insertions(+), 83 deletions(-)

diff --git a/capture/src/api.rs b/capture/src/api.rs
index 91ed578..c79c69f 100644
--- a/capture/src/api.rs
+++ b/capture/src/api.rs
@@ -80,8 +80,15 @@ impl IntoResponse for CaptureError {
     }
 }
 
-#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum DataType {
+    AnalyticsMain,
+    AnalyticsHistorical,
+}
+#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
 pub struct ProcessedEvent {
+    #[serde(skip_serializing)]
+    pub data_type: DataType,
     pub uuid: Uuid,
     pub distinct_id: String,
     pub ip: String,
diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs
index 83cdb0b..945e581 100644
--- a/capture/src/sinks/kafka.rs
+++ b/capture/src/sinks/kafka.rs
@@ -11,11 +11,11 @@ use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
 use tracing::{info_span, instrument, Instrument};
 
-use crate::api::{CaptureError, ProcessedEvent};
+use crate::api::{CaptureError, DataType, ProcessedEvent};
 use crate::config::KafkaConfig;
 use crate::limiters::overflow::OverflowLimiter;
 use crate::prometheus::report_dropped_events;
-use crate::sinks::{DataType, Event};
+use crate::sinks::Event;
 
 struct KafkaContext {
     liveness: HealthHandle,
@@ -139,28 +139,23 @@ impl KafkaSink {
         self.producer.flush(Duration::new(30, 0))
     }
 
-    async fn kafka_send(
-        &self,
-        event: ProcessedEvent,
-        data_type: &DataType,
-    ) -> Result<DeliveryFuture, CaptureError> {
+    async fn kafka_send(&self, event: ProcessedEvent) -> Result<DeliveryFuture, CaptureError> {
         let payload = serde_json::to_string(&event).map_err(|e| {
             error!("failed to serialize event: {}", e);
             CaptureError::NonRetryableSinkError
         })?;
 
         let event_key = event.key();
-        let (topic, partition_key): (&str, Option<&str>) = match data_type {
+        let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
+            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
             DataType::AnalyticsMain => {
-                // TODO: move overflow up in the handler
+                // TODO: deprecate capture-led overflow or move logic in handler
                 if self.partition.is_limited(&event_key) {
-                    (&self.main_topic, None)
+                    (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                 } else {
                     (&self.main_topic, Some(event_key.as_str()))
                 }
             }
-            DataType::AnalyticsOverflow => (&self.main_topic, None), // Overflow is going on the main topic for analytics
-            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
         };
 
         match self.producer.send_result(FutureRecord {
@@ -217,8 +212,8 @@ impl KafkaSink {
 #[async_trait]
 impl Event for KafkaSink {
     #[instrument(skip_all)]
-    async fn send(&self, data_type: DataType, event: ProcessedEvent) -> Result<(), CaptureError> {
-        let ack = self.kafka_send(event, &data_type).await?;
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        let ack = self.kafka_send(event).await?;
         histogram!("capture_event_batch_size").record(1.0);
         Self::process_ack(ack)
             .instrument(info_span!("ack_wait_one"))
@@ -226,16 +221,12 @@ impl Event for KafkaSink {
     }
 
     #[instrument(skip_all)]
-    async fn send_batch(
-        &self,
-        data_type: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError> {
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         let mut set = JoinSet::new();
         let batch_size = events.len();
         for event in events {
             // We await kafka_send to get events in the producer queue sequentially
-            let ack = self.kafka_send(event, &data_type).await?;
+            let ack = self.kafka_send(event).await?;
 
             // Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
             set.spawn(Self::process_ack(ack));
@@ -269,11 +260,11 @@ impl Event for KafkaSink {
 
 #[cfg(test)]
 mod tests {
-    use crate::api::{CaptureError, ProcessedEvent};
+    use crate::api::{CaptureError, DataType, ProcessedEvent};
     use crate::config;
     use crate::limiters::overflow::OverflowLimiter;
     use crate::sinks::kafka::KafkaSink;
-    use crate::sinks::{DataType, Event};
+    use crate::sinks::Event;
     use crate::utils::uuid_v7;
     use health::HealthRegistry;
     use rand::distributions::Alphanumeric;
@@ -316,6 +307,7 @@ mod tests {
         let (cluster, sink) = start_on_mocked_sink().await;
 
         let event: ProcessedEvent = ProcessedEvent {
+            data_type: DataType::AnalyticsMain,
             uuid: uuid_v7(),
             distinct_id: "id1".to_string(),
             ip: "".to_string(),
@@ -327,20 +319,16 @@ mod tests {
 
         // Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
         for _ in 0..20 {
-            if sink
-                .send(DataType::AnalyticsMain, event.clone())
-                .await
-                .is_ok()
-            {
+            if sink.send(event.clone()).await.is_ok() {
                 break;
             }
         }
 
         // Send events to confirm happy path
-        sink.send(DataType::AnalyticsMain, event.clone())
+        sink.send(event.clone())
             .await
             .expect("failed to send one initial event");
-        sink.send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
+        sink.send_batch(vec![event.clone(), event.clone()])
             .await
             .expect("failed to send initial event batch");
 
@@ -351,6 +339,7 @@ mod tests {
             .map(char::from)
             .collect();
         let big_event: ProcessedEvent = ProcessedEvent {
+            data_type: DataType::AnalyticsMain,
             uuid: uuid_v7(),
             distinct_id: "id1".to_string(),
             ip: "".to_string(),
@@ -359,7 +348,7 @@ mod tests {
             sent_at: None,
             token: "token1".to_string(),
         };
-        match sink.send(DataType::AnalyticsMain, big_event).await {
+        match sink.send(big_event).await {
             Err(CaptureError::EventTooBig) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
         };
 
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE; 1];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        match sink.send(DataType::AnalyticsMain, event.clone()).await {
+        match sink.send(event.clone()).await {
             Err(CaptureError::EventTooBig) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
         };
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_INVALID_PARTITIONS; 1];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        match sink
-            .send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
-            .await
-        {
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
             Err(CaptureError::RetryableSinkError) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
@@ -390,13 +376,13 @@ mod tests {
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        sink.send(DataType::AnalyticsMain, event.clone())
+        sink.send(event.clone())
             .await
             .expect("failed to send one event after recovery");
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 2];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        sink.send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
+        sink.send_batch(vec![event.clone(), event.clone()])
             .await
             .expect("failed to send event batch after recovery");
 
@@ -404,15 +390,12 @@ mod tests {
         cluster.clear_request_errors(RDKafkaApiKey::Produce);
         let err = [RDKafkaRespErr::RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE; 50];
         cluster.request_errors(RDKafkaApiKey::Produce, &err);
-        match sink.send(DataType::AnalyticsMain, event.clone()).await {
+        match sink.send(event.clone()).await {
             Err(CaptureError::RetryableSinkError) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
         };
-        match sink
-            .send_batch(DataType::AnalyticsMain, vec![event.clone(), event.clone()])
-            .await
-        {
+        match sink.send_batch(vec![event.clone(), event.clone()]).await {
             Err(CaptureError::RetryableSinkError) => {} // Expected
             Err(err) => panic!("wrong error code {}", err),
             Ok(()) => panic!("should have errored"),
diff --git a/capture/src/sinks/mod.rs b/capture/src/sinks/mod.rs
index 49ca00c..bedbcbc 100644
--- a/capture/src/sinks/mod.rs
+++ b/capture/src/sinks/mod.rs
@@ -5,19 +5,8 @@ use crate::api::{CaptureError, ProcessedEvent};
 pub mod kafka;
 pub mod print;
 
-#[derive(Debug, Copy, Clone)]
-pub enum DataType {
-    AnalyticsMain,
-    AnalyticsOverflow,
-    AnalyticsHistorical,
-}
-
 #[async_trait]
 pub trait Event {
-    async fn send(&self, data_type: DataType, event: ProcessedEvent) -> Result<(), CaptureError>;
-    async fn send_batch(
-        &self,
-        data_type: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError>;
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError>;
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError>;
 }
diff --git a/capture/src/sinks/print.rs b/capture/src/sinks/print.rs
index 5dc18eb..c67dc7a 100644
--- a/capture/src/sinks/print.rs
+++ b/capture/src/sinks/print.rs
@@ -3,30 +3,26 @@ use metrics::{counter, histogram};
 use tracing::log::info;
 
 use crate::api::{CaptureError, ProcessedEvent};
-use crate::sinks::{DataType, Event};
+use crate::sinks::Event;
 
 pub struct PrintSink {}
 
 #[async_trait]
 impl Event for PrintSink {
-    async fn send(&self, data_type: DataType, event: ProcessedEvent) -> Result<(), CaptureError> {
-        info!("single {:?} event: {:?}", data_type, event);
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
+        info!("single {:?} event: {:?}", event.data_type, event);
         counter!("capture_events_ingested_total").increment(1);
 
         Ok(())
     }
 
-    async fn send_batch(
-        &self,
-        data_type: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError> {
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         let span = tracing::span!(tracing::Level::INFO, "batch of events");
         let _enter = span.enter();
 
         histogram!("capture_event_batch_size").record(events.len() as f64);
         counter!("capture_events_ingested_total").increment(events.len() as u64);
         for event in events {
-            info!("{:?} event: {:?}", data_type, event);
+            info!("{:?} event: {:?}", event.data_type, event);
         }
 
         Ok(())
diff --git a/capture/src/v0_endpoint.rs b/capture/src/v0_endpoint.rs
index 202b5be..5782d53 100644
--- a/capture/src/v0_endpoint.rs
+++ b/capture/src/v0_endpoint.rs
@@ -13,10 +13,9 @@ use tracing::instrument;
 
 use crate::limiters::billing::QuotaResource;
 use crate::prometheus::report_dropped_events;
-use crate::sinks::DataType;
 use crate::v0_request::{Compression, ProcessingContext, RawRequest};
 use crate::{
-    api::{CaptureError, CaptureResponse, CaptureResponseCode, ProcessedEvent},
+    api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent},
     router, sinks,
     utils::uuid_v7,
     v0_request::{EventFormData, EventQuery, RawEvent},
@@ -121,16 +120,12 @@ pub async fn event(
     counter!("capture_events_received_total").increment(events.len() as u64);
 
     let context = ProcessingContext {
-        data_type: if is_historical {
-            DataType::AnalyticsHistorical
-        } else {
-            DataType::AnalyticsMain
-        },
         lib_version: meta.lib_version.clone(),
         sent_at,
         token,
         now: state.timesource.current_time(),
         client_ip: ip.to_string(),
+        is_historical,
     };
 
     let billing_limited = state
@@ -180,12 +175,18 @@ pub fn process_single_event(
         return Err(CaptureError::MissingEventName);
     }
 
+    let data_type = match context.is_historical {
+        true => DataType::AnalyticsHistorical,
+        false => DataType::AnalyticsMain,
+    };
+
     let data = serde_json::to_string(&event).map_err(|e| {
         tracing::error!("failed to encode data field: {}", e);
         CaptureError::NonRetryableSinkError
     })?;
 
     Ok(ProcessedEvent {
+        data_type,
         uuid: event.uuid.unwrap_or_else(uuid_v7),
         distinct_id: event.extract_distinct_id()?,
         ip: context.client_ip.clone(),
@@ -210,8 +211,8 @@ pub async fn process_events<'a>(
     tracing::debug!(events=?events, "processed {} events", events.len());
 
     if events.len() == 1 {
-        sink.send(context.data_type, events[0].clone()).await
+        sink.send(events[0].clone()).await
     } else {
-        sink.send_batch(context.data_type, events).await
+        sink.send_batch(events).await
     }
 }
diff --git a/capture/src/v0_request.rs b/capture/src/v0_request.rs
index 1a04211..172ffaf 100644
--- a/capture/src/v0_request.rs
+++ b/capture/src/v0_request.rs
@@ -11,7 +11,6 @@ use tracing::instrument;
 use uuid::Uuid;
 
 use crate::api::CaptureError;
-use crate::sinks::DataType;
 use crate::token::validate_token;
 
 #[derive(Deserialize, Default)]
@@ -228,12 +227,12 @@ impl RawEvent {
 
 #[derive(Debug)]
 pub struct ProcessingContext {
-    pub data_type: DataType,
     pub lib_version: Option<String>,
     pub sent_at: Option<OffsetDateTime>,
     pub token: String,
     pub now: String,
     pub client_ip: String,
+    pub is_historical: bool,
 }
 
 #[cfg(test)]
diff --git a/capture/tests/django_compat.rs b/capture/tests/django_compat.rs
index d9615c3..9ef4e39 100644
--- a/capture/tests/django_compat.rs
+++ b/capture/tests/django_compat.rs
@@ -8,7 +8,7 @@ use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, Processed
 use capture::limiters::billing::BillingLimiter;
 use capture::redis::MockRedisClient;
 use capture::router::router;
-use capture::sinks::{DataType, Event};
+use capture::sinks::Event;
 use capture::time::TimeSource;
 use health::HealthRegistry;
 use serde::Deserialize;
@@ -61,16 +61,12 @@ impl MemorySink {
 
 #[async_trait]
 impl Event for MemorySink {
-    async fn send(&self, _: DataType, event: ProcessedEvent) -> Result<(), CaptureError> {
+    async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
         self.events.lock().unwrap().push(event);
         Ok(())
     }
 
-    async fn send_batch(
-        &self,
-        _: DataType,
-        events: Vec<ProcessedEvent>,
-    ) -> Result<(), CaptureError> {
+    async fn send_batch(&self, events: Vec<ProcessedEvent>) -> Result<(), CaptureError> {
         self.events.lock().unwrap().extend_from_slice(&events);
         Ok(())
     }
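
Usage note (illustrative, not part of the patch): after this change the routing decision rides on ProcessedEvent itself through the new data_type field, and Event::send / Event::send_batch no longer take a separate DataType argument. A minimal sketch of the new call shape, assuming the ProcessedEvent field set mirrors the test fixtures above and that capture::utils::uuid_v7 and capture::sinks::print::PrintSink are reachable from your crate:

    use capture::api::{CaptureError, DataType, ProcessedEvent};
    use capture::sinks::{print::PrintSink, Event};
    use capture::utils::uuid_v7; // assumed exported; tests above use crate::utils::uuid_v7

    async fn send_one(sink: &PrintSink) -> Result<(), CaptureError> {
        let event = ProcessedEvent {
            // The sink now reads topic/overflow routing from the event itself.
            data_type: DataType::AnalyticsMain,
            uuid: uuid_v7(),
            distinct_id: "id1".to_string(),
            ip: "".to_string(),
            data: "{}".to_string(),
            now: "".to_string(),
            sent_at: None,
            token: "token1".to_string(),
        };
        // No DataType parameter any more; batch sends likewise take only Vec<ProcessedEvent>.
        sink.send(event).await
    }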