This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

capture: add overflow_enabled option #43

Merged
merged 1 commit into from May 27, 2024
3 changes: 3 additions & 0 deletions capture/src/config.rs
@@ -13,6 +13,9 @@ pub struct Config {
     pub redis_url: String,
     pub otel_url: Option<String>,
 
+    #[envconfig(default = "false")]
+    pub overflow_enabled: bool,
+
     #[envconfig(default = "100")]
     pub overflow_per_second_limit: NonZeroU32,
 
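Since `Config` derives `Envconfig`, the new flag is read from the environment. A minimal sketch of how the flag would be toggled, assuming a recent envconfig version and its default convention of mapping the field to the upper-cased variable name (`OVERFLOW_ENABLED` is an assumption, not confirmed by this diff):

```rust
use envconfig::Envconfig;

// Pared-down stand-in for the real Config struct, for illustration only.
#[derive(Envconfig)]
pub struct Config {
    // Capture-led overflow routing stays off unless explicitly enabled.
    #[envconfig(default = "false")]
    pub overflow_enabled: bool,
}

fn main() {
    // Assumed variable name, following envconfig's default field-name mapping.
    std::env::set_var("OVERFLOW_ENABLED", "true");
    let config = Config::init_from_env().unwrap();
    assert!(config.overflow_enabled);
}
```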
42 changes: 24 additions & 18 deletions capture/src/server.rs
@@ -48,24 +48,30 @@ where
         .register("rdkafka".to_string(), Duration::seconds(30))
         .await;
 
-    let partition = OverflowLimiter::new(
-        config.overflow_per_second_limit,
-        config.overflow_burst_limit,
-        config.overflow_forced_keys,
-    );
-    if config.export_prometheus {
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.report_metrics().await;
-        });
-    }
-    {
-        // Ensure that the rate limiter state does not grow unbounded
-        let partition = partition.clone();
-        tokio::spawn(async move {
-            partition.clean_state().await;
-        });
-    }
+    let partition = match config.overflow_enabled {
+        false => None,
+        true => {
+            let partition = OverflowLimiter::new(
+                config.overflow_per_second_limit,
+                config.overflow_burst_limit,
+                config.overflow_forced_keys,
+            );
+            if config.export_prometheus {
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.report_metrics().await;
+                });
+            }
+            {
+                // Ensure that the rate limiter state does not grow unbounded
+                let partition = partition.clone();
+                tokio::spawn(async move {
+                    partition.clean_state().await;
+                });
Comment on lines +67 to +70

Contributor:
Huh, wondering if we should Arc the OverflowLimiter's forced_keys HashSet too, since it's being cloned here and that could be expensive if the set is large.

Contributor:
Anyways, seems like that was there from before, so it's not a blocker on this PR.

Contributor Author:
Agreed. forced_keys is empty though; it's supported as a last resort in case the volume is so big that plugin-server cannot forward to overflow fast enough. Cloning it while it has zero or only a couple of entries might be cheaper than the Arc.
Should probably be superseded by a redis zset instead.
+            }
+            Some(partition)
+        }
+    };
     let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");
 
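On the Arc discussion above: a rough sketch of what sharing forced_keys would look like, using a hypothetical pared-down OverflowLimiter (the real struct carries more state than shown here):

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Hypothetical pared-down limiter: with the set behind an Arc, cloning
// only bumps a reference count instead of copying every key.
#[derive(Clone)]
pub struct OverflowLimiter {
    forced_keys: Arc<HashSet<String>>,
}

impl OverflowLimiter {
    pub fn new(forced_keys: Option<HashSet<String>>) -> Self {
        OverflowLimiter {
            forced_keys: Arc::new(forced_keys.unwrap_or_default()),
        }
    }

    pub fn is_forced(&self, key: &str) -> bool {
        self.forced_keys.contains(key)
    }
}
```

As the thread notes, while the set holds zero or a couple of entries a plain clone is likely cheaper than the extra indirection, so this only pays off if forced_keys grows.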
14 changes: 9 additions & 5 deletions capture/src/sinks/kafka.rs
@@ -80,7 +80,7 @@ impl rdkafka::ClientContext for KafkaContext {
 #[derive(Clone)]
 pub struct KafkaSink {
     producer: FutureProducer<KafkaContext>,
-    partition: OverflowLimiter,
+    partition: Option<OverflowLimiter>,
     main_topic: String,
     historical_topic: String,
 }
@@ -89,7 +89,7 @@ impl KafkaSink {
     pub fn new(
         config: KafkaConfig,
         liveness: HealthHandle,
-        partition: OverflowLimiter,
+        partition: Option<OverflowLimiter>,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
@@ -150,7 +150,11 @@
             DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
             DataType::AnalyticsMain => {
                 // TODO: deprecate capture-led overflow or move logic in handler
-                if self.partition.is_limited(&event_key) {
+                let is_limited = match &self.partition {
+                    None => false,
+                    Some(partition) => partition.is_limited(&event_key),
+                };
+                if is_limited {
                     (&self.main_topic, None) // Analytics overflow goes to the main topic without locality
                 } else {
                     (&self.main_topic, Some(event_key.as_str()))
@@ -280,11 +284,11 @@ mod tests {
         let handle = registry
             .register("one".to_string(), Duration::seconds(30))
             .await;
-        let limiter = OverflowLimiter::new(
+        let limiter = Some(OverflowLimiter::new(
             NonZeroU32::new(10).unwrap(),
             NonZeroU32::new(10).unwrap(),
             None,
-        );
+        ));
         let cluster = MockCluster::new(1).expect("failed to create mock brokers");
         let config = config::KafkaConfig {
             kafka_producer_linger_ms: 0,
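The match that computes `is_limited` could equally be written with `Option::map_or`. An equivalent fragment (not a standalone program), assuming the same `is_limited` signature on `OverflowLimiter`:

```rust
// Equivalent to the match above: an absent limiter means never limited.
let is_limited = self
    .partition
    .as_ref()
    .map_or(false, |limiter| limiter.is_limited(&event_key));
```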
1 change: 1 addition & 0 deletions capture/tests/common.rs
@@ -29,6 +29,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     print_sink: false,
     address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
     redis_url: "redis://localhost:6379/".to_string(),
+    overflow_enabled: false,
     overflow_burst_limit: NonZeroU32::new(5).unwrap(),
     overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
54 changes: 54 additions & 0 deletions capture/tests/events.rs
@@ -174,6 +174,7 @@ async fn it_overflows_events_on_burst() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = true;
     config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
     config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
@@ -223,6 +224,7 @@ async fn it_does_not_overflow_team_with_different_ids() -> Result<()> {
 
     let mut config = DEFAULT_CONFIG.clone();
     config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = true;
     config.overflow_burst_limit = NonZeroU32::new(1).unwrap();
     config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
 
@@ -254,6 +256,58 @@
     Ok(())
 }
 
+#[tokio::test]
+async fn it_skips_overflows_when_disabled() -> Result<()> {
+    setup_tracing();
+
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.overflow_enabled = false;
+    config.overflow_burst_limit = NonZeroU32::new(2).unwrap();
+    config.overflow_per_second_limit = NonZeroU32::new(1).unwrap();
+
+    let server = ServerHandle::for_config(config).await;
+
+    let event = json!([{
+        "token": token,
+        "event": "event1",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event2",
+        "distinct_id": distinct_id
+    },{
+        "token": token,
+        "event": "event3",
+        "distinct_id": distinct_id
+    }]);
+
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+
+    // Should have triggered overflow, but has not
+    assert_eq!(
+        topic.next_message_key()?.unwrap(),
+        format!("{}:{}", token, distinct_id)
+    );
+    Ok(())
+}
+
 #[tokio::test]
 async fn it_trims_distinct_id() -> Result<()> {
     setup_tracing();