Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
feat(flags): Basic flags service (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
neilkakkar authored May 7, 2024
1 parent ae707cb commit 871441b
Show file tree
Hide file tree
Showing 14 changed files with 594 additions and 13 deletions.
48 changes: 36 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ resolver = "2"
members = [
"capture",
"common/health",
"feature-flags",
"hook-api",
"hook-common",
"hook-janitor",
Expand Down Expand Up @@ -49,7 +50,7 @@ opentelemetry-otlp = "0.15.0"
opentelemetry_sdk = { version = "0.22.1", features = ["trace", "rt-tokio"] }
rand = "0.8.5"
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
reqwest = { version = "0.12.3", features = ["stream"] }
reqwest = { version = "0.12.3", features = ["json", "stream"] }
serde = { version = "1.0", features = ["derive"] }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
Expand Down
35 changes: 35 additions & 0 deletions feature-flags/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "feature-flags"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
axum-client-ip = { workspace = true }
envconfig = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
bytes = { workspace = true }
rand = { workspace = true }
redis = { version = "0.23.3", features = [
"tokio-comp",
"cluster",
"cluster-async",
] }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }

[lints]
workspace = true

[dev-dependencies]
assert-json-diff = { workspace = true }
once_cell = "1.18.0"
reqwest = { workspace = true }

58 changes: 58 additions & 0 deletions feature-flags/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::collections::HashMap;

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use thiserror::Error;

#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum FlagsResponseCode {
Ok = 1,
}

#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FlagsResponse {
pub error_while_computing_flags: bool,
// TODO: better typing here, support bool responses
pub feature_flags: HashMap<String, String>,
}

#[derive(Error, Debug)]
pub enum FlagError {
#[error("failed to decode request: {0}")]
RequestDecodingError(String),
#[error("failed to parse request: {0}")]
RequestParsingError(#[from] serde_json::Error),

#[error("Empty distinct_id in request")]
EmptyDistinctId,
#[error("No distinct_id in request")]
MissingDistinctId,

#[error("No api_key in request")]
NoTokenError,
#[error("API key is not valid")]
TokenValidationError,

#[error("rate limited")]
RateLimited,
}

impl IntoResponse for FlagError {
fn into_response(self) -> Response {
match self {
FlagError::RequestDecodingError(_)
| FlagError::RequestParsingError(_)
| FlagError::EmptyDistinctId
| FlagError::MissingDistinctId => (StatusCode::BAD_REQUEST, self.to_string()),

FlagError::NoTokenError | FlagError::TokenValidationError => {
(StatusCode::UNAUTHORIZED, self.to_string())
}

FlagError::RateLimited => (StatusCode::TOO_MANY_REQUESTS, self.to_string()),
}
.into_response()
}
}
24 changes: 24 additions & 0 deletions feature-flags/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::net::SocketAddr;

use envconfig::Envconfig;

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "127.0.0.1:0")]
pub address: SocketAddr,

#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")]
pub write_database_url: String,

#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")]
pub read_database_url: String,

#[envconfig(default = "1024")]
pub max_concurrent_jobs: usize,

#[envconfig(default = "100")]
pub max_pg_connections: u32,

#[envconfig(default = "redis://localhost:6379/")]
pub redis_url: String,
}
7 changes: 7 additions & 0 deletions feature-flags/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pub mod api;
pub mod config;
pub mod redis;
pub mod router;
pub mod server;
pub mod v0_endpoint;
pub mod v0_request;
39 changes: 39 additions & 0 deletions feature-flags/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use envconfig::Envconfig;
use tokio::signal;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};

use feature_flags::config::Config;
use feature_flags::server::serve;

async fn shutdown() {
let mut term = signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to register SIGTERM handler");

let mut interrupt = signal::unix::signal(signal::unix::SignalKind::interrupt())
.expect("failed to register SIGINT handler");

tokio::select! {
_ = term.recv() => {},
_ = interrupt.recv() => {},
};

tracing::info!("Shutting down gracefully...");
}

#[tokio::main]
async fn main() {
let config = Config::init_from_env().expect("Invalid configuration:");

// Basic logging for now:
// - stdout with a level configured by the RUST_LOG envvar (default=ERROR)
let log_layer = tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env());
tracing_subscriber::registry().with(log_layer).init();

// Open the TCP port and start the server
let listener = tokio::net::TcpListener::bind(config.address)
.await
.expect("could not bind port");
serve(config, listener, shutdown()).await
}
77 changes: 77 additions & 0 deletions feature-flags/src/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use redis::AsyncCommands;
use tokio::time::timeout;

// average for all commands is <10ms, check grafana
const REDIS_TIMEOUT_MILLISECS: u64 = 10;

/// A simple redis wrapper
/// Copied from capture/src/redis.rs.
/// TODO: Modify this to support hincrby, get, and set commands.
#[async_trait]
pub trait Client {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>;
}

pub struct RedisClient {
client: redis::Client,
}

impl RedisClient {
pub fn new(addr: String) -> Result<RedisClient> {
let client = redis::Client::open(addr)?;

Ok(RedisClient { client })
}
}

#[async_trait]
impl Client for RedisClient {
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>> {
let mut conn = self.client.get_async_connection().await?;

let results = conn.zrangebyscore(k, min, max);
let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?;

Ok(fut?)
}
}

// TODO: Find if there's a better way around this.
#[derive(Clone)]
pub struct MockRedisClient {
zrangebyscore_ret: Vec<String>,
}

impl MockRedisClient {
pub fn new() -> MockRedisClient {
MockRedisClient {
zrangebyscore_ret: Vec::new(),
}
}

pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self {
self.zrangebyscore_ret = ret;

self.clone()
}
}

impl Default for MockRedisClient {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl Client for MockRedisClient {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> {
Ok(self.zrangebyscore_ret.clone())
}
}
Loading

0 comments on commit 871441b

Please sign in to comment.