Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
tests: add integration test for billing limits
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Nov 16, 2023
1 parent f72c02a commit 1cc951e
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 40 deletions.
11 changes: 1 addition & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
thiserror = "1.0.48"
envconfig = "0.10.0"
redis = { version="0.23.3", features=["tokio-comp"] }
1 change: 1 addition & 0 deletions capture-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ assert-json-diff = { workspace = true }
futures = "0.3.29"
once_cell = "1.18.0"
rand = { workspace = true }
redis = { workspace = true }
rdkafka = { workspace = true }
reqwest = "0.11.22"
serde_json = { workspace = true }
88 changes: 79 additions & 9 deletions capture-server/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use rdkafka::config::{ClientConfig, FromClientConfig};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use redis::Commands;
use time::OffsetDateTime;
use tokio::sync::Notify;
use tracing::debug;

Expand All @@ -25,8 +27,9 @@ use capture::server::serve;

pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
print_sink: false,
address: SocketAddr::from_str("127.0.0.1:0").unwrap(),
redis_url: "redis://localhost:6379/".to_string(),
address: SocketAddr::from_str("127.0.0.1:0").unwrap(), // Use a random port
redis_url: "redis://localhost:6379/2".to_string(), // Use DB 2 to avoid overlap with devenv
redis_key_prefix: None, // Will be set if needed
burst_limit: NonZeroU32::new(5).unwrap(),
per_second_limit: NonZeroU32::new(10).unwrap(),
kafka: KafkaConfig {
Expand All @@ -39,7 +42,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
},
otel_url: None,
otel_sampling_rate: 0.0,
export_prometheus: false,
export_prometheus: false, // Not idempotent, second test would fail to start
});

static TRACING_INIT: Once = Once::new();
Expand All @@ -50,18 +53,46 @@ pub fn setup_tracing() {
.init()
});
}

pub trait ConfigMutator {
fn configure(&self, config: &mut Config);
}

pub struct ServerConfig {
config: Config,
}

impl Default for ServerConfig {
fn default() -> Self {
Self {
config: DEFAULT_CONFIG.clone(),
}
}
}

impl ServerConfig {
pub fn new(f: fn(&mut Config)) -> Self {
let mut config = DEFAULT_CONFIG.clone();
f(&mut config);
Self { config }
}
pub fn with(mut self, f: &impl ConfigMutator) -> Self {
f.configure(&mut self.config);
self
}

pub fn start(self) -> ServerHandle {
ServerHandle::new(self.config)
}
}

pub struct ServerHandle {
pub addr: SocketAddr,
shutdown: Arc<Notify>,
}

impl ServerHandle {
pub fn for_topic(topic: &EphemeralTopic) -> Self {
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
Self::for_config(config)
}
pub fn for_config(config: Config) -> Self {
pub fn new(config: Config) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let notify = Arc::new(Notify::new());
Expand Down Expand Up @@ -171,6 +202,12 @@ impl EphemeralTopic {
}
}

impl ConfigMutator for EphemeralTopic {
fn configure(&self, config: &mut Config) {
config.kafka.kafka_topic = self.topic_name.clone()
}
}

impl Drop for EphemeralTopic {
fn drop(&mut self) {
debug!("dropping EphemeralTopic {}...", self.topic_name);
Expand Down Expand Up @@ -201,3 +238,36 @@ pub fn random_string(prefix: &str, length: usize) -> String {
.collect();
format!("{}_{}", prefix, suffix)
}

pub struct EphemeralRedis {
client: redis::Client,
key_prefix: String,
}

impl EphemeralRedis {
pub async fn new() -> Self {
Self {
client: redis::Client::open(DEFAULT_CONFIG.redis_url.clone())
.expect("failed to create Redis client"),
key_prefix: random_string("events_", 16),
}
}

pub fn add_billing_limit(
&self,
resource: &str,
token: &str,
until: OffsetDateTime,
) -> anyhow::Result<()> {
let mut conn = self.client.get_connection()?;
let key = format!("{}:::@posthog/quota-limits/{}", self.key_prefix, resource);
conn.zadd(key, token, until.unix_timestamp())?;
Ok(())
}
}

impl ConfigMutator for EphemeralRedis {
fn configure(&self, config: &mut Config) {
config.redis_key_prefix = Some(self.key_prefix.clone())
}
}
118 changes: 102 additions & 16 deletions capture-server/tests/events.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::num::NonZeroU32;
use std::ops::Add;

use anyhow::Result;
use assert_json_diff::assert_json_include;
use reqwest::StatusCode;
use serde_json::json;
use time::{Duration, OffsetDateTime};

use crate::common::*;
mod common;
Expand All @@ -14,7 +16,7 @@ async fn it_captures_one_event() -> Result<()> {
let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerConfig::default().with(&topic).start();

let event = json!({
"token": token,
Expand Down Expand Up @@ -44,7 +46,7 @@ async fn it_captures_a_batch() -> Result<()> {
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic);
let server = ServerConfig::default().with(&topic).start();

let event = json!([{
"token": token,
Expand Down Expand Up @@ -84,13 +86,12 @@ async fn it_is_limited_with_burst() -> Result<()> {
let distinct_id = random_string("id", 16);

let topic = EphemeralTopic::new().await;

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(2).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerConfig::new(|config| {
config.burst_limit = NonZeroU32::new(2).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
})
.with(&topic)
.start();

let event = json!([{
"token": token,
Expand Down Expand Up @@ -133,13 +134,12 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;

let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();

let server = ServerHandle::for_config(config);
let server = ServerConfig::new(|config| {
config.burst_limit = NonZeroU32::new(1).unwrap();
config.per_second_limit = NonZeroU32::new(1).unwrap();
})
.with(&topic)
.start();

let event = json!([{
"token": token,
Expand All @@ -166,3 +166,89 @@ async fn it_does_not_partition_limit_different_ids() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn it_enforces_billing_limits() -> Result<()> {
setup_tracing();

let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let redis = EphemeralRedis::new().await;

// team1 hit rate-limits for events, capture should drop
let token1 = random_string("token1", 16);
redis.add_billing_limit(
"events",
&token1,
OffsetDateTime::now_utc().add(Duration::hours(1)),
)?;

// team2 has no billing limit for events, allow input
let token2 = random_string("token2", 16);
redis.add_billing_limit(
"recordings",
&token2,
OffsetDateTime::now_utc().add(Duration::hours(1)),
)?;

// team3 was limited, but the new billing period started an hour ago, allow input
let token3 = random_string("token3", 16);
redis.add_billing_limit(
"events",
&token3,
OffsetDateTime::now_utc().add(Duration::hours(-1)),
)?;

let server = ServerConfig::default().with(&topic).with(&redis).start();

let res = server
.capture_events(
json!({
"token": token1,
"event": "NOK: should be dropped",
"distinct_id": distinct_id
})
.to_string(),
)
.await;
assert_eq!(StatusCode::OK, res.status());

let res = server
.capture_events(
json!({
"token": token2,
"event": "OK: no billing limit",
"distinct_id": distinct_id
})
.to_string(),
)
.await;
assert_eq!(StatusCode::OK, res.status());

let res = server
.capture_events(
json!({
"token": token3,
"event": "OK: billing limit expired",
"distinct_id": distinct_id
})
.to_string(),
)
.await;
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: topic.next_event()?,
expected: json!({
"token": token2, // Message with token1 should not pass through
})
);
assert_json_include!(
actual: topic.next_event()?,
expected: json!({
"token": token3,
})
);

Ok(())
}
2 changes: 1 addition & 1 deletion capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rdkafka = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
thiserror = { workspace = true }
redis = { version="0.23.3", features=["tokio-comp", "cluster", "cluster-async"] }
redis = { workspace = true }
envconfig = { workspace = true }
dashmap = "5.5.3"

Expand Down
3 changes: 2 additions & 1 deletion capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ pub struct Config {
pub address: SocketAddr,

pub redis_url: String,
pub redis_key_prefix: Option<String>,
pub redis_key_prefix: Option<String>, // Used for integration tests only

pub otel_url: Option<String>,

#[envconfig(default = "100")]
Expand Down
2 changes: 1 addition & 1 deletion capture/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait Client {

pub struct RedisClient {
client: redis::Client,
key_prefix: Option<String>
key_prefix: Option<String>,
}

impl RedisClient {
Expand Down
6 changes: 4 additions & 2 deletions capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ where
{
let liveness = HealthRegistry::new("liveness");

let redis_client =
Arc::new(RedisClient::new(config.redis_url, config.redis_key_prefix).expect("failed to create redis client"));
let redis_client = Arc::new(
RedisClient::new(config.redis_url, config.redis_key_prefix)
.expect("failed to create redis client"),
);

let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone())
.expect("failed to create billing limiter");
Expand Down

0 comments on commit 1cc951e

Please sign in to comment.