Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

kafka: check reachability + collect metrics #37

Merged
merged 3 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions capture-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct Config {
#[envconfig(default = "127.0.0.1:3000")]
address: SocketAddr,
redis_url: String,

kafka_hosts: String,
kafka_topic: String,
}
Expand All @@ -34,6 +35,9 @@ async fn shutdown() {

#[tokio::main]
async fn main() {
// initialize tracing
tracing_subscriber::fmt::init();

let config = Config::init_from_env().expect("Invalid configuration:");

let redis_client =
Expand Down Expand Up @@ -62,9 +66,6 @@ async fn main() {
)
};

// initialize tracing
tracing_subscriber::fmt::init();

// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`

Expand Down
8 changes: 2 additions & 6 deletions capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ pub async fn event(
headers: HeaderMap,
body: Bytes,
) -> Result<Json<CaptureResponse>, CaptureError> {
tracing::debug!(len = body.len(), "new event request");

let events = match headers
.get("content-type")
.map_or("", |v| v.to_str().unwrap_or(""))
Expand All @@ -47,8 +45,6 @@ pub async fn event(
_ => RawEvent::from_bytes(&meta, body),
}?;

tracing::debug!("got events {:?}", &events);

if events.is_empty() {
return Err(CaptureError::EmptyBatch);
}
Expand Down Expand Up @@ -98,7 +94,7 @@ pub async fn event(
}));
}

tracing::debug!("got context {:?}", &context);
tracing::debug!(context=?context, events=?events, "decoded request");

process_events(state.sink.clone(), &events, &context).await?;

Expand Down Expand Up @@ -169,7 +165,7 @@ pub async fn process_events<'a>(
.map(|e| process_single_event(e, context))
.collect::<Result<Vec<ProcessedEvent>, CaptureError>>()?;

println!("Processed events: {:?}", events);
tracing::debug!(events=?events, "processed {} events", events.len());

if events.len() == 1 {
sink.send(events[0].clone()).await?;
Expand Down
78 changes: 73 additions & 5 deletions capture/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use async_trait::async_trait;
use metrics::{counter, histogram};
use metrics::{absolute_counter, counter, gauge, histogram};
use std::time::Duration;
use tokio::task::JoinSet;

use crate::api::CaptureError;
use rdkafka::config::ClientConfig;
use rdkafka::error::RDKafkaErrorCode;
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::Producer;
use rdkafka::util::Timeout;
use tracing::info;

use crate::event::ProcessedEvent;

Expand Down Expand Up @@ -39,25 +43,89 @@ impl EventSink for PrintSink {
}
}

/// rdkafka client context that republishes the client's statistics as
/// `capture_kafka_*` metrics.
struct KafkaContext;

impl rdkafka::ClientContext for KafkaContext {
    /// Statistics callback invoked by rdkafka with a fresh [`rdkafka::Statistics`]
    /// snapshot; every value of interest is exported through the `metrics` macros.
    fn stats(&self, stats: rdkafka::Statistics) {
        // Client-wide queue depths and their configured limits.
        gauge!("capture_kafka_callback_queue_depth", stats.replyq as f64);
        gauge!("capture_kafka_producer_queue_depth", stats.msg_cnt as f64);
        gauge!(
            "capture_kafka_producer_queue_depth_limit",
            stats.msg_max as f64
        );
        // `msg_size` is the number of bytes currently queued. Reporting `msg_max`
        // here (the message-count limit) would simply duplicate the gauge above.
        gauge!("capture_kafka_producer_queue_bytes", stats.msg_size as f64);
        gauge!(
            "capture_kafka_producer_queue_bytes_limit",
            stats.msg_size_max as f64
        );

        // Per-topic batching averages, labelled with the topic name.
        for (topic, topic_stats) in stats.topics {
            gauge!(
                "capture_kafka_produce_avg_batch_size_bytes",
                topic_stats.batchsize.avg as f64,
                "topic" => topic.clone()
            );
            gauge!(
                "capture_kafka_produce_avg_batch_size_events",
                topic_stats.batchcnt.avg as f64,
                "topic" => topic
            );
        }

        // Per-broker request backlog and error totals, labelled with the node id.
        for broker in stats.brokers.into_values() {
            let broker_id = broker.nodeid.to_string();
            gauge!(
                "capture_kafka_broker_requests_pending",
                broker.outbuf_cnt as f64,
                "broker" => broker_id.clone()
            );
            gauge!(
                "capture_kafka_broker_responses_awaiting",
                broker.waitresp_cnt as f64,
                "broker" => broker_id.clone()
            );
            // The statistics carry running totals, so they are published as
            // absolute counter values rather than increments.
            absolute_counter!(
                "capture_kafka_broker_tx_errors_total",
                broker.txerrs,
                "broker" => broker_id.clone()
            );
            absolute_counter!(
                "capture_kafka_broker_rx_errors_total",
                broker.rxerrs,
                "broker" => broker_id
            );
        }
    }
}

/// Pairs a Kafka producer with the name of a topic.
#[derive(Clone)]
pub struct KafkaSink {
// NOTE(review): this listing is a rendered diff without +/- markers; the two
// `producer` lines below look like the before/after versions of one field
// (plain `FutureProducer` vs. `FutureProducer<KafkaContext>`) — confirm against
// the merged file.
producer: FutureProducer,
producer: FutureProducer<KafkaContext>,
// Topic name handed to `KafkaSink::new`.
topic: String,
}

impl KafkaSink {
/// Creates a producer for the given `brokers`, checks that the cluster answers
/// a metadata request, and returns a sink holding the producer and `topic`.
///
/// # Errors
///
/// Returns an error if the producer cannot be created or if the metadata
/// request fails.
pub fn new(topic: String, brokers: String) -> anyhow::Result<KafkaSink> {
// NOTE(review): this listing is a rendered diff without +/- markers; the two
// `let producer` lines and the `.create()?;` / `.create_with_context(..)?;`
// pair look like the before/after versions of one statement — confirm against
// the merged file.
let producer: FutureProducer = ClientConfig::new()
info!("connecting to Kafka brokers at {}...", brokers);
let producer: FutureProducer<KafkaContext> = ClientConfig::new()
.set("bootstrap.servers", &brokers)
.create()?;
// Sets the statistics interval to 10000 ms and attaches `KafkaContext` as
// the client context.
.set("statistics.interval.ms", "10000")
.create_with_context(KafkaContext)?;

// Ping the cluster to make sure we can reach brokers
// The returned metadata is discarded; only the success of the call matters.
// The request is given a 10-second timeout.
_ = producer.client().fetch_metadata(
Some("__consumer_offsets"),
Timeout::After(Duration::new(10, 0)),
)?;
info!("connected to Kafka brokers");

Ok(KafkaSink { producer, topic })
}
}

impl KafkaSink {
async fn kafka_send(
producer: FutureProducer,
producer: FutureProducer<KafkaContext>,
topic: String,
event: ProcessedEvent,
) -> Result<(), CaptureError> {
Expand Down