Skip to content

Commit

Permalink
Merge commit 'a481b5677d3d643132fece3d3c043c15556d322a' into dekaf/ev…
Browse files Browse the repository at this point in the history
…erything_merged
  • Loading branch information
jshearer committed Feb 21, 2025
2 parents 48e0956 + a481b56 commit 334151e
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async-compression = { version = "0.3", features = [
async-stripe = { version = "0.37", features = ["runtime-tokio-hyper"] }
async-trait = "0.1"
atty = "0.2"
arc-swap = { version = "1.7.1" }
apache-avro = { version = "0.17.0", features = ["snappy"] }

base64 = "0.13"
Expand Down
1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ unseal = { path = "../unseal" }
aes-siv = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
arc-swap = { workspace = true }
axum = { workspace = true }
axum-extra = { workspace = true }
axum-server = { workspace = true }
Expand Down
48 changes: 27 additions & 21 deletions crates/dekaf/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::log_appender::{self, GazetteWriter, TaskForwarder};
use futures::Future;
use lazy_static::lazy_static;
use rand::Rng;
use std::sync::Arc;
use tracing::{level_filters::LevelFilter, Instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};

Expand All @@ -10,12 +11,12 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
// from the point at which you call `forward_logs()` downwards will get forwarded to the same journal.
tokio::task_local! {
static TASK_FORWARDER: TaskForwarder<GazetteWriter>;
static LOG_LEVEL: std::cell::Cell<ops::LogLevel>;
static LOG_LEVEL: Arc<arc_swap::ArcSwap<tracing_subscriber::filter::Targets>>;
}

pub fn install() {
// Build a tracing_subscriber::Filter which uses our dynamic log level.
let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, _cx| {
let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, ctx| {
if metadata
.fields()
.iter()
Expand All @@ -24,20 +25,9 @@ pub fn install() {
return false;
}

let cur_level = match metadata.level().as_str() {
"TRACE" => ops::LogLevel::Trace as i32,
"DEBUG" => ops::LogLevel::Debug as i32,
"INFO" => ops::LogLevel::Info as i32,
"WARN" => ops::LogLevel::Warn as i32,
"ERROR" => ops::LogLevel::Error as i32,
_ => ops::LogLevel::UndefinedLevel as i32,
};

cur_level
<= LOG_LEVEL
.try_with(|log_level| log_level.get())
.unwrap_or(ops::LogLevel::Info)
.into()
LOG_LEVEL
.try_with(|filter| filter.load().enabled(&metadata, ctx.to_owned()))
.unwrap_or_else(|_| metadata.level() <= &tracing::metadata::Level::INFO)
});

// We want to be able to control Dekaf's own logging output via the RUST_LOG environment variable like usual.
Expand All @@ -51,6 +41,7 @@ pub fn install() {

let registry = tracing_subscriber::registry()
.with(tracing_record_hierarchical::HierarchicalRecord::default())
.with(fmt_layer)
.with(
ops::tracing::Layer::new(
|log| {
Expand All @@ -59,8 +50,7 @@ pub fn install() {
std::time::SystemTime::now,
)
.with_filter(log_filter),
)
.with(fmt_layer);
);

registry.init();
}
Expand All @@ -75,6 +65,18 @@ lazy_static! {
};
}

fn build_log_filter(level: ops::LogLevel) -> tracing_subscriber::filter::Targets {
let filter = match level {
ops::LogLevel::Error => "error",
ops::LogLevel::Warn => "warn",
ops::LogLevel::Info | ops::LogLevel::UndefinedLevel => "warn,dekaf=info",
ops::LogLevel::Debug => "debug,simple_crypt=warn,aws_config=warn,h2=warn",
ops::LogLevel::Trace => "trace,simple_crypt=warn,aws_config=warn,h2=warn",
};

filter.parse().expect("Filters should be correct")
}

/// Capture all log messages emitted by the passed future and all of its descendants, and writes them out
/// based on the behavior of the provided writer. Initially, log messages will get buffered in a circular
/// queue until such time as the forwarder is informed of the name of the journal to emit them into. Then,
Expand All @@ -90,7 +92,9 @@ where
let forwarder = TaskForwarder::new(PRODUCER.to_owned(), writer);

LOG_LEVEL.scope(
ops::LogLevel::Info.into(),
Arc::new(arc_swap::ArcSwap::new(Arc::new(build_log_filter(
ops::LogLevel::Info,
)))),
TASK_FORWARDER.scope(
forwarder,
fut.instrument(tracing::info_span!(
Expand All @@ -111,7 +115,7 @@ pub fn propagate_task_forwarder<F, O>(fut: F) -> impl Future<Output = O>
where
F: Future<Output = O>,
{
let current_level = LOG_LEVEL.get();
let current_level = LOG_LEVEL.with(|l| l.clone());
let current_forwarder = TASK_FORWARDER.get();

LOG_LEVEL.scope(
Expand All @@ -125,5 +129,7 @@ pub fn get_log_forwarder() -> TaskForwarder<GazetteWriter> {
}

pub fn set_log_level(level: ops::LogLevel) {
LOG_LEVEL.with(|cell| cell.set(level))
LOG_LEVEL.with(|current_level| {
current_level.store(Arc::new(build_log_filter(level)));
})
}

0 comments on commit 334151e

Please sign in to comment.