Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate IPA code to new metrics engine #1379

Merged
merged 3 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ web-app = [
"http-body",
"http-body-util",
]
test-fixture = ["weak-field"]
test-fixture = ["weak-field", "ipa-metrics-tracing", "ipa-metrics/partitions"]
# Include observability instruments that detect lack of progress inside MPC. If there is a bug that leads to helper
# miscommunication, this feature helps to detect it. Turning it on has some cost.
# If "shuttle" feature is enabled, turning this on has no effect.
Expand Down Expand Up @@ -86,6 +86,8 @@ ipa-prf = []
relaxed-dp = []

[dependencies]
ipa-metrics = { path = "../ipa-metrics" }
ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" }
ipa-step = { version = "*", path = "../ipa-step" }
ipa-step-derive = { version = "*", path = "../ipa-step-derive" }

Expand Down Expand Up @@ -128,9 +130,6 @@ hyper-util = { version = "0.1.3", optional = true, features = ["http2"] }
http-body-util = { version = "0.1.1", optional = true }
http-body = { version = "1", optional = true }
iai = { version = "0.1.1", optional = true }
metrics = "0.21.0"
metrics-tracing-context = "0.14.0"
metrics-util = { version = "0.15.0" }
once_cell = "1.18"
pin-project = "1.0"
rand = "0.8"
Expand Down Expand Up @@ -175,6 +174,8 @@ permutation = "0.4.1"
proptest = "1.4"
rustls = { version = "0.23" }
tempfile = "3"
ipa-metrics-tracing = { path = "../ipa-metrics-tracing" }
ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] }


[lib]
Expand Down
61 changes: 39 additions & 22 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
use hyper::http::uri::Scheme;
use ipa_core::{
cli::{
client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, TestSetupArgs, Verbosity,
client_config_setup, keygen, test_setup, ConfGenArgs, KeygenArgs, LoggingHandle,
TestSetupArgs, Verbosity,
},
config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig},
error::BoxError,
Expand Down Expand Up @@ -113,7 +114,7 @@
.map_err(|e| format!("failed to open file {}: {e:?}", path.display()))?)
}

async fn server(args: ServerArgs) -> Result<(), BoxError> {
async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), BoxError> {

Check warning on line 117 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L117

Added line #L117 was not covered by tests
let my_identity = HelperIdentity::try_from(args.identity.expect("enforced by clap")).unwrap();

let (identity, server_tls) = match (args.tls_cert, args.tls_key) {
Expand All @@ -136,7 +137,7 @@
private_key_file: sk_path,
});

let query_runtime = new_query_runtime();
let query_runtime = new_query_runtime(&logging_handle);

Check warning on line 140 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L140

Added line #L140 was not covered by tests
let app_config = AppConfig::default()
.with_key_registry(hpke_registry(mk_encryption.as_ref()).await?)
.with_active_work(args.active_work)
Expand Down Expand Up @@ -165,7 +166,7 @@
let shard_server_config = server_config.clone();
// ---

let http_runtime = new_http_runtime();
let http_runtime = new_http_runtime(&logging_handle);

Check warning on line 169 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L169

Added line #L169 was not covered by tests
let clients = MpcHelperClient::from_conf(
&IpaRuntime::from_tokio_runtime(&http_runtime),
&network_config,
Expand Down Expand Up @@ -230,18 +231,26 @@
/// if for some reason query runtime becomes overloaded.
/// When the `multi-threading` feature is enabled, it creates a runtime with thread-per-core;
/// otherwise, a single-threaded runtime is created.
fn new_http_runtime() -> Runtime {
fn new_http_runtime(logging_handle: &LoggingHandle) -> Runtime {

Check warning on line 234 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L234

Added line #L234 was not covered by tests
if cfg!(feature = "multi-threading") {
tokio::runtime::Builder::new_multi_thread()
.thread_name("http-worker")
.enable_all()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.thread_name("http-worker")
.enable_all(),
)

Check warning on line 242 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L236-L242

Added lines #L236 - L242 were not covered by tests
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("http-worker")
.enable_all()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("http-worker")
.enable_all(),
)

Check warning on line 253 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L246-L253

Added lines #L246 - L253 were not covered by tests
.build()
.unwrap()
}
Expand All @@ -250,21 +259,29 @@
/// This function creates a runtime suitable for executing MPC queries.
/// When the `multi-threading` feature is enabled, it creates a runtime with thread-per-core;
/// otherwise, a single-threaded runtime is created.
fn new_query_runtime() -> Runtime {
fn new_query_runtime(logging_handle: &LoggingHandle) -> Runtime {

Check warning on line 262 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L262

Added line #L262 was not covered by tests
// It is intentional that the IO driver is not enabled here (only `enable_time()` is called).
// The query runtime is supposed to use CPU/memory only: it performs no writes to disk, and all
// network communication is handled by the HTTP runtime.
if cfg!(feature = "multi-threading") {
tokio::runtime::Builder::new_multi_thread()
.thread_name("query-executor")
.enable_time()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.thread_name("query-executor")
.enable_time(),
)

Check warning on line 273 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L267-L273

Added lines #L267 - L273 were not covered by tests
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("query-executor")
.enable_time()
logging_handle
.metrics_handle
.tokio_bind(
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.thread_name("query-executor")
.enable_time(),
)

Check warning on line 284 in ipa-core/src/bin/helper.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/bin/helper.rs#L277-L284

Added lines #L277 - L284 were not covered by tests
.build()
.unwrap()
}
Expand All @@ -275,10 +292,10 @@
#[tokio::main(flavor = "current_thread")]
pub async fn main() {
let args = Args::parse();
let _handle = args.logging.setup_logging();
let handle = args.logging.setup_logging();

let res = match args.command {
None => server(args.server).await,
None => server(args.server, handle).await,
Some(HelperCommand::Keygen(args)) => keygen(&args),
Some(HelperCommand::TestSetup(args)) => test_setup(args),
Some(HelperCommand::Confgen(args)) => client_config_setup(args),
Expand Down
72 changes: 40 additions & 32 deletions ipa-core/src/cli/metric_collector.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,56 @@
use std::{io::stderr, thread};
use std::{io, thread, thread::JoinHandle};

use metrics_tracing_context::TracingContextLayer;
use metrics_util::{
debugging::{DebuggingRecorder, Snapshotter},
layers::Layer,
use ipa_metrics::{
MetricChannelType, MetricsCollectorController, MetricsCurrentThreadContext, MetricsProducer,
};
use tokio::runtime::Builder;

use crate::telemetry::stats::Metrics;

/// Collects metrics using `DebuggingRecorder` and dumps them to `stderr` when dropped.
/// Holds the metrics thread handle together with the metrics controller and producer.
pub struct CollectorHandle {
snapshotter: Snapshotter,
thread_handle: JoinHandle<()>,
/// This will be used once we start consuming metrics
_controller: MetricsCollectorController,
producer: MetricsProducer,
}

///
/// Initializes this collector by installing `DebuggingRecorder` to keep track of metrics
/// emitted from different parts of the app.
///
/// ## Panics
/// Panics if metric recorder has already been set
#[must_use]
pub fn install_collector() -> CollectorHandle {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

// use span fields as dimensions for metric
let recorder = TracingContextLayer::all().layer(recorder);
metrics::set_boxed_recorder(Box::new(recorder))
.expect("Metric recorder has been installed already");

// register metrics
crate::telemetry::metrics::register();
tracing::info!("Metrics enabled");

CollectorHandle { snapshotter }
/// ## Errors
/// If it fails to start a new thread
pub fn install_collector() -> io::Result<CollectorHandle> {
let (producer, controller, handle) =
ipa_metrics::install_new_thread(MetricChannelType::Unbounded)?;
tracing::info!("Metrics engine is enabled");

Ok(CollectorHandle {
thread_handle: handle,
_controller: controller,
producer,
})
}

impl Drop for CollectorHandle {
fn drop(&mut self) {
if !thread::panicking() {
let stats = Metrics::from_snapshot(self.snapshotter.snapshot());
stats
.print(&mut stderr())
.expect("Failed to dump metrics to stderr");
}
if !thread::panicking() && !self.thread_handle.is_finished() {
tracing::warn!("Metrics thread is still running");
};

Check warning on line 38 in ipa-core/src/cli/metric_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/metric_collector.rs#L38

Added line #L38 was not covered by tests
}
}

impl CollectorHandle {
pub fn tokio_bind<'a>(&self, target: &'a mut Builder) -> &'a mut Builder {
let flush_fn = || MetricsCurrentThreadContext::flush();

Check warning on line 44 in ipa-core/src/cli/metric_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/metric_collector.rs#L43-L44

Added lines #L43 - L44 were not covered by tests

target
.on_thread_start({
let producer = self.producer.clone();
move || {
producer.install();
}
})
.on_thread_stop(flush_fn)
.on_thread_park(flush_fn)

Check warning on line 54 in ipa-core/src/cli/metric_collector.rs

View check run for this annotation

Codecov / codecov/patch

ipa-core/src/cli/metric_collector.rs#L46-L54

Added lines #L46 - L54 were not covered by tests
}
}
2 changes: 1 addition & 1 deletion ipa-core/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ pub use metric_collector::{install_collector, CollectorHandle};
pub use paths::PathExt as CliPaths;
#[cfg(feature = "web-app")]
pub use test_setup::{test_setup, TestSetupArgs};
pub use verbosity::Verbosity;
pub use verbosity::{LoggingHandle, Verbosity};
24 changes: 10 additions & 14 deletions ipa-core/src/cli/verbosity.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::io::{stderr, IsTerminal};

use clap::Parser;
use metrics_tracing_context::MetricsLayer;
use tracing::{info, metadata::LevelFilter, Level};
use tracing_subscriber::{
fmt, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter,
Expand All @@ -24,11 +23,14 @@ pub struct Verbosity {
}

pub struct LoggingHandle {
#[allow(dead_code)] // we care about handle's drop semantic so it is ok to not read it
metrics_handle: Option<CollectorHandle>,
pub metrics_handle: CollectorHandle,
}

impl Verbosity {
/// Sets up logging and metrics infrastructure
///
/// ## Panics
/// If the metrics collector fails to set up.
#[must_use]
pub fn setup_logging(&self) -> LoggingHandle {
let filter_layer = self.log_filter();
Expand All @@ -39,20 +41,14 @@ impl Verbosity {
.with_ansi(std::io::stderr().is_terminal())
.with_writer(stderr);

let registry = tracing_subscriber::registry()
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer);
.with(fmt_layer)
.init();

if cfg!(feature = "disable-metrics") {
registry.init();
} else {
registry.with(MetricsLayer::new()).init();
}
let metrics_handle = install_collector().expect("Can install metrics");

let handle = LoggingHandle {
metrics_handle: (!self.quiet && !cfg!(feature = "disable-metrics"))
.then(install_collector),
};
let handle = LoggingHandle { metrics_handle };
set_global_panic_hook();

handle
Expand Down
13 changes: 7 additions & 6 deletions ipa-core/src/helpers/gateway/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use dashmap::{mapref::entry::Entry, DashMap};
use futures::Stream;
use ipa_metrics::counter;
#[cfg(all(test, feature = "shuttle"))]
use shuttle::future as tokio;
use typenum::Unsigned;
Expand Down Expand Up @@ -158,13 +159,13 @@ impl<I: TransportIdentity, M: Message> SendingEnd<I, M> {
))]
pub async fn send<B: Borrow<M>>(&self, record_id: RecordId, msg: B) -> Result<(), Error<I>> {
let r = self.inner.send(record_id, msg).await;
metrics::increment_counter!(RECORDS_SENT,
STEP => self.inner.channel_id.gate.as_ref().to_string(),
ROLE => self.sender_id.as_str(),
counter!(RECORDS_SENT, 1,
STEP => &self.inner.channel_id.gate,
ROLE => &self.sender_id
);
metrics::counter!(BYTES_SENT, M::Size::U64,
STEP => self.inner.channel_id.gate.as_ref().to_string(),
ROLE => self.sender_id.as_str(),
counter!(BYTES_SENT, M::Size::U64,
STEP => &self.inner.channel_id.gate,
ROLE => &self.sender_id
);

r
Expand Down
Loading