From ebaf596ec571f0c2c2cc8bafbdd4ee932ac28b2a Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 27 May 2024 14:23:07 +0200 Subject: [PATCH] capture: add overflow_enabled option (#43) --- capture/src/config.rs | 3 +++ capture/src/server.rs | 42 ++++++++++++++++------------- capture/src/sinks/kafka.rs | 14 ++++++---- capture/tests/common.rs | 1 + capture/tests/events.rs | 54 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 23 deletions(-) diff --git a/capture/src/config.rs b/capture/src/config.rs index 07b7f89..d91e7b7 100644 --- a/capture/src/config.rs +++ b/capture/src/config.rs @@ -13,6 +13,9 @@ pub struct Config { pub redis_url: String, pub otel_url: Option<String>, + #[envconfig(default = "false")] + pub overflow_enabled: bool, + #[envconfig(default = "100")] pub overflow_per_second_limit: NonZeroU32, diff --git a/capture/src/server.rs b/capture/src/server.rs index 0704987..8585036 100644 --- a/capture/src/server.rs +++ b/capture/src/server.rs @@ -48,24 +48,30 @@ where .register("rdkafka".to_string(), Duration::seconds(30)) .await; - let partition = OverflowLimiter::new( - config.overflow_per_second_limit, - config.overflow_burst_limit, - config.overflow_forced_keys, - ); - if config.export_prometheus { - let partition = partition.clone(); - tokio::spawn(async move { - partition.report_metrics().await; - }); - } - { - // Ensure that the rate limiter state does not grow unbounded - let partition = partition.clone(); - tokio::spawn(async move { - partition.clean_state().await; - }); - } + let partition = match config.overflow_enabled { + false => None, + true => { + let partition = OverflowLimiter::new( + config.overflow_per_second_limit, + config.overflow_burst_limit, + config.overflow_forced_keys, + ); + if config.export_prometheus { + let partition = partition.clone(); + tokio::spawn(async move { + partition.report_metrics().await; + }); + } + { + // Ensure that the rate limiter state does not grow unbounded + let partition = 
partition.clone(); + tokio::spawn(async move { + partition.clean_state().await; + }); + } + Some(partition) + } + }; let sink = KafkaSink::new(config.kafka, sink_liveness, partition) .expect("failed to start Kafka sink"); diff --git a/capture/src/sinks/kafka.rs b/capture/src/sinks/kafka.rs index 945e581..bc45fa1 100644 --- a/capture/src/sinks/kafka.rs +++ b/capture/src/sinks/kafka.rs @@ -80,7 +80,7 @@ impl rdkafka::ClientContext for KafkaContext { #[derive(Clone)] pub struct KafkaSink { producer: FutureProducer, - partition: OverflowLimiter, + partition: Option<OverflowLimiter>, main_topic: String, historical_topic: String, } @@ -89,7 +89,7 @@ impl KafkaSink { pub fn new( config: KafkaConfig, liveness: HealthHandle, - partition: OverflowLimiter, + partition: Option<OverflowLimiter>, ) -> anyhow::Result<KafkaSink> { info!("connecting to Kafka brokers at {}...", config.kafka_hosts); @@ -150,7 +150,11 @@ impl KafkaSink { DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events DataType::AnalyticsMain => { // TODO: deprecate capture-led overflow or move logic in handler - if self.partition.is_limited(&event_key) { + let is_limited = match &self.partition { + None => false, + Some(partition) => partition.is_limited(&event_key), + }; + if is_limited { (&self.main_topic, None) // Analytics overflow goes to the main topic without locality } else { (&self.main_topic, Some(event_key.as_str())) @@ -280,11 +284,11 @@ mod tests { let handle = registry .register("one".to_string(), Duration::seconds(30)) .await; - let limiter = OverflowLimiter::new( + let limiter = Some(OverflowLimiter::new( NonZeroU32::new(10).unwrap(), NonZeroU32::new(10).unwrap(), None, - ); + )); let cluster = MockCluster::new(1).expect("failed to create mock brokers"); let config = config::KafkaConfig { kafka_producer_linger_ms: 0, diff --git a/capture/tests/common.rs b/capture/tests/common.rs index 788e6e2..868b27c 100644 --- a/capture/tests/common.rs +++ 
b/capture/tests/common.rs @@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config { print_sink: false, address: SocketAddr::from_str("127.0.0.1:0").unwrap(), redis_url: "redis://localhost:6379/".to_string(), + overflow_enabled: false, overflow_burst_limit: NonZeroU32::new(5).unwrap(), overflow_per_second_limit: NonZeroU32::new(10).unwrap(), overflow_forced_keys: None, diff --git a/capture/tests/events.rs b/capture/tests/events.rs index 111b02c..7d2defc 100644 --- a/capture/tests/events.rs +++ b/capture/tests/events.rs @@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> { let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); + config.overflow_enabled = true; config.overflow_burst_limit = NonZeroU32::new(2).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); @@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> { let mut config = DEFAULT_CONFIG.clone(); config.kafka.kafka_topic = topic.topic_name().to_string(); + config.overflow_enabled = true; config.overflow_burst_limit = NonZeroU32::new(1).unwrap(); config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); @@ -254,6 +256,58 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> { Ok(()) } +#[tokio::test] +async fn it_skips_overflows_when_disabled() -> Result<()> { + setup_tracing(); + + let token = random_string("token", 16); + let distinct_id = random_string("id", 16); + + let topic = EphemeralTopic::new().await; + + let mut config = DEFAULT_CONFIG.clone(); + config.kafka.kafka_topic = topic.topic_name().to_string(); + config.overflow_enabled = false; + config.overflow_burst_limit = NonZeroU32::new(2).unwrap(); + config.overflow_per_second_limit = NonZeroU32::new(1).unwrap(); + + let server = ServerHandle::for_config(config).await; + + let event = json!([{ + "token": token, + "event": "event1", + "distinct_id": distinct_id + },{ + "token": token, + 
"event": "event2", + "distinct_id": distinct_id + },{ + "token": token, + "event": "event3", + "distinct_id": distinct_id + }]); + + let res = server.capture_events(event.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + + // Should have triggered overflow, but has not + assert_eq!( + topic.next_message_key()?.unwrap(), + format!("{}:{}", token, distinct_id) + ); + Ok(()) +} + #[tokio::test] async fn it_trims_distinct_id() -> Result<()> { setup_tracing();