diff --git a/Cargo.lock b/Cargo.lock index a93a6e33ba..cafeef166d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1550,6 +1550,7 @@ dependencies = [ "candid", "candid_parser", "cargo_metadata", + "chrono", "ci_info", "clap", "clap_complete", @@ -1623,6 +1624,7 @@ dependencies = [ "tokio", "toml", "url", + "uuid", "walkdir", "walrus", "which", @@ -6795,9 +6797,12 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.12.1" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b" +checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" +dependencies = [ + "getrandom 0.3.1", +] [[package]] name = "valuable" diff --git a/src/dfx/Cargo.toml b/src/dfx/Cargo.toml index 5bb99608bf..8f73145e89 100644 --- a/src/dfx/Cargo.toml +++ b/src/dfx/Cargo.toml @@ -44,6 +44,7 @@ bytes.workspace = true candid = { workspace = true } candid_parser = { workspace = true, features = ["random", "assist"] } cargo_metadata = "0.18.1" +chrono = "0.4.39" ci_info = "0.14" clap = { workspace = true, features = [ "derive", @@ -124,6 +125,7 @@ time = { workspace = true, features = [ ] } tokio = { workspace = true, features = ["full"] } url.workspace = true +uuid = { version = "1.15.1", features = [ "v4", ] } walkdir.workspace = true walrus = "0.21.1" which = "4.2.5" diff --git a/src/dfx/src/commands/mod.rs b/src/dfx/src/commands/mod.rs index 64781dd139..01c20aafd0 100644 --- a/src/dfx/src/commands/mod.rs +++ b/src/dfx/src/commands/mod.rs @@ -27,6 +27,7 @@ mod ping; mod quickstart; mod remote; mod schema; +mod send_telemetry; mod start; mod stop; mod toolchain; @@ -60,6 +61,8 @@ pub enum DfxCommand { Quickstart(quickstart::QuickstartOpts), Remote(remote::RemoteOpts), Schema(schema::SchemaOpts), + #[command(name = "_send-telemetry", hide = true)] + SendTelemetry(send_telemetry::SendTelemetryOpts), Start(start::StartOpts), Stop(stop::StopOpts), #[command(hide = true)] @@ -94,6 +97,7 @@ pub fn exec(env: &dyn Environment, cmd: DfxCommand) -> DfxResult { DfxCommand::Quickstart(v) => quickstart::exec(env, v), DfxCommand::Remote(v) => remote::exec(env, v), DfxCommand::Schema(v) => schema::exec(v), + DfxCommand::SendTelemetry(v) => send_telemetry::exec(v), DfxCommand::Start(v) => start::exec(env, v), DfxCommand::Stop(v) => stop::exec(env, v), DfxCommand::Toolchain(v) => toolchain::exec(env, v), @@ -105,6 +109,7 @@ pub fn exec(env: &dyn Environment, cmd: DfxCommand) -> DfxResult { pub fn exec_without_env(cmd: DfxCommand) -> DfxResult { match cmd { DfxCommand::Schema(v) => schema::exec(v), + DfxCommand::SendTelemetry(v) => send_telemetry::exec(v), _ => bail!("Cannot execute this command without environment."), } } diff --git a/src/dfx/src/commands/send_telemetry.rs b/src/dfx/src/commands/send_telemetry.rs new file mode 100644 index 0000000000..320037bcaa --- /dev/null +++ b/src/dfx/src/commands/send_telemetry.rs @@ -0,0 +1,20 @@ +use crate::lib::error::DfxResult; +use crate::lib::telemetry::Telemetry; +use clap::Parser; +use url::Url; + +const DEFAULT_URL: &str = "https://sdk.telemetry.dfinity.network"; + +#[derive(Parser)] +#[command(hide = true)] +pub struct SendTelemetryOpts { + #[clap(long)] + url: Option<String>, +} + +pub fn exec(opts: SendTelemetryOpts) -> DfxResult { + let url = opts.url.unwrap_or_else(|| DEFAULT_URL.to_string()); + let url = Url::parse(&url)?; + + Telemetry::send(&url) +} diff --git a/src/dfx/src/lib/telemetry.rs b/src/dfx/src/lib/telemetry.rs index dd26f3232f..1550106902 100644 --- a/src/dfx/src/lib/telemetry.rs +++ b/src/dfx/src/lib/telemetry.rs @@ -4,25 +4,34 @@ use crate::config::dfx_version; use crate::lib::error::DfxResult; use crate::CliOpts; use anyhow::Context; +use chrono::{Datelike, Local, NaiveDateTime}; use clap::parser::ValueSource; use clap::{ArgMatches, Command, CommandFactory}; use dfx_core::config::directories::project_dirs; use dfx_core::config::model::dfinity::TelemetryState; use dfx_core::fs; -use fd_lock::RwLock as FdRwLock; +use fd_lock::{RwLock as FdRwLock, RwLockWriteGuard}; +use fn_error_context::context; +use reqwest::StatusCode; use semver::Version; use serde::Serialize; use std::ffi::OsString; -use std::fs::OpenOptions; -use std::io::Write; -use std::path::PathBuf; +use std::fs::{File, OpenOptions}; +use std::io::Seek; +use std::io::{Read, SeekFrom, Write}; +use std::path::{Path, PathBuf}; +use std::process::Stdio; use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; +use url::Url; +use uuid::Uuid; use super::environment::Environment; static TELEMETRY: OnceLock<Option<Mutex<Telemetry>>> = OnceLock::new(); +const SEND_SIZE_THRESHOLD_BYTES: u64 = 256 * 1024; + #[derive(Clone, Debug, Eq, PartialEq, Serialize)] #[serde(rename_all = "kebab-case")] enum ArgumentSource { @@ -43,19 +52,29 @@ pub struct Telemetry { arguments: Vec<Argument>, elapsed: Option<Duration>, platform: String, + week: Option<String>, + publish: bool, } impl Telemetry { pub fn init(mode: TelemetryState) { if mode.should_collect() { TELEMETRY - .set(Some(Mutex::new(Telemetry::default()))) + .set(Some(Mutex::new( + Telemetry::default().with_publish(mode.should_publish()), + ))) .expect("Telemetry already initialized"); } else { TELEMETRY.set(None).expect("Telemetry already initialized"); } } + fn with_publish(&self, publish: bool) -> Self { + let mut new = self.clone(); + new.publish = publish; + new + } + pub fn set_command_and_arguments(args: &[OsString]) -> DfxResult { try_with_telemetry(|telemetry| { let arg_matches = CliOpts::command().try_get_matches_from(args)?; @@ -74,12 +93,20 @@ impl Telemetry { }) } + pub fn get_telemetry_dir() -> DfxResult<PathBuf> { + Ok(project_dirs()?.cache_dir().join("telemetry")) + } + pub fn get_log_path() -> DfxResult<PathBuf> { - let path = project_dirs()? - .cache_dir() - .join("telemetry") - .join("telemetry.log"); - Ok(path) + Ok(Self::get_telemetry_dir()?.join("telemetry.log")) + } + + pub fn get_send_time_path() -> DfxResult<PathBuf> { + Ok(Self::get_telemetry_dir()?.join("send-time.txt")) + } + + pub fn get_send_dir() -> DfxResult<PathBuf> { + Ok(Self::get_telemetry_dir()?.join("send")) } pub fn set_platform() { @@ -93,6 +120,14 @@ impl Telemetry { }); } + pub fn set_week() { + with_telemetry(|telemetry| { + let iso_week = Local::now().naive_local().iso_week(); + let week = format!("{:04}-{:02}", iso_week.year(), iso_week.week()); + telemetry.week = Some(week); + }); + } + pub fn set_elapsed(elapsed: Duration) { with_telemetry(|telemetry| { telemetry.elapsed = Some(elapsed); @@ -123,6 +158,7 @@ impl Telemetry { command: &telemetry.command, platform: &telemetry.platform, parameters: &telemetry.arguments, + week: telemetry.week.as_deref(), exit_code, execution_time_ms: telemetry.elapsed.map(|e| e.as_millis()), replica_error_call_site: None, @@ -137,6 +173,265 @@ impl Telemetry { Ok(()) }) } + + pub fn maybe_publish() -> DfxResult { + try_with_telemetry(|telemetry| { + if telemetry.publish && (Self::check_send_time()? || Self::check_file_size()?) { + Self::launch_publisher()?; + } + + Ok(()) + }) + } + + #[context("failed to launch publisher")] + pub fn launch_publisher() -> DfxResult { + let mut exe = std::env::current_exe()?; + let mut cmd = std::process::Command::new(exe); + cmd.arg("_send-telemetry") + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()); + + #[cfg(unix)] + { + use std::os::unix::process::CommandExt; + cmd.process_group(0); // Detach from parent’s process group + } + + #[cfg(windows)] + { + use std::os::windows::process::CommandExt; + cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW (prevents it from being killed if parent exits) + } + + cmd.spawn()?; // Spawn and immediately detach + + Ok(()) + } + + // look at telemetry/telemetry.log file size to see if it's time to send + fn check_file_size() -> DfxResult<bool> { + let path = Self::get_log_path()?; + let filesize = fs::metadata(&path).map(|m| m.len()).unwrap_or(0); + Ok(filesize >= SEND_SIZE_THRESHOLD_BYTES) + } + + // Look at telemetry/send-time.txt to see if it's time to send + #[context("failed to check send trigger")] + fn check_send_time() -> DfxResult<bool> { + let send_time_path = Self::get_send_time_path()?; + + let file = match OpenOptions::new().read(true).open(&send_time_path) { + Ok(file) => file, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + Self::try_create_send_time(&send_time_path)?; + return Ok(false); + } + Err(e) => return Err(e.into()), + }; + + let readlock = FdRwLock::new(file); + let Ok(readguard) = readlock.try_read() else { + return Ok(false); + }; + + let Ok(send_time) = Self::read_send_time(&send_time_path) else { + // If there's some problem reading the send time, trigger sending. + // This will overwrite the file with a new send time. + return Ok(true); + }; + + let current_time = Local::now().naive_local(); + Ok(send_time <= current_time) + } + + fn try_create_send_time(path: &Path) -> DfxResult { + let file = match OpenOptions::new().write(true).create_new(true).open(path) { + Ok(file) => file, + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + let mut lock = FdRwLock::new(file); + if let Ok(mut write_guard) = lock.try_write() { + Self::write_send_time(&mut write_guard, None)?; + } + Ok(()) + } + + fn read_send_time(path: &Path) -> DfxResult<NaiveDateTime> { + let send_time = fs::read_to_string(path)?; + let send_time = send_time.trim(); + let send_time = NaiveDateTime::parse_from_str(send_time, "%Y-%m-%d %H:%M:%S") + .with_context(|| format!("failed to parse send time: {:?}", send_time))?; + Ok(send_time) + } + + fn write_send_time( + guard: &mut RwLockWriteGuard<File>, + future_duration: Option<Duration>, + ) -> DfxResult { + let future_duration = future_duration.unwrap_or_else(|| { + // random 5-7 days in the future + let future_seconds = 86400.0 * (5.0 + rand::random::<f64>() * 2.0); + Duration::from_secs(future_seconds as u64) + }); + let future_time = Local::now().naive_local() + future_duration; + let future_time_str = future_time.format("%Y-%m-%d %H:%M:%S").to_string(); + + writeln!(*guard, "{}", future_time_str)?; + Ok(()) + } + + pub fn send(url: &Url) -> DfxResult { + fs::create_dir_all(&Self::get_telemetry_dir()?)?; + let send_time_path = Self::get_send_time_path()?; + + let mut file = FdRwLock::new( + OpenOptions::new() + .create(true) + .write(true) + .read(true) + .truncate(true) + .open(send_time_path)?, + ); + + let Ok(mut lock) = file.try_write() else { + // another instance of _send-telemetry is already running + return Ok(()); + }; + + let thirty_minutes = 30 * 60; + Self::write_send_time(&mut lock, Some(Duration::from_secs(thirty_minutes)))?; + + Self::move_active_log_for_send()?; + Self::transmit_any_batches(url)?; + + lock.seek(SeekFrom::Start(0))?; + Self::write_send_time(&mut lock, None)?; + Ok(()) + } + + fn move_active_log_for_send() -> DfxResult { + let log_path = Self::get_log_path()?; + if !log_path.exists() { + return Ok(()); + } + + let send_dir = Self::get_send_dir()?; + fs::create_dir_all(&send_dir)?; + + let batch_id = Uuid::new_v4(); + eprintln!("Assigning telemetry.log contents to batch {:?}", batch_id); + let batch_path = send_dir.join(batch_id.to_string()); + + let mut file = FdRwLock::new( + OpenOptions::new() + .create(true) + .append(true) + .open(&log_path)?, + ); + let lock = file.write()?; + + fs::rename(&log_path, &batch_path)?; + Ok(()) + } + + fn transmit_any_batches(url: &Url) -> DfxResult { + let batches = Self::list_batches()?; + + eprintln!("Batches to send:"); + for batch in &batches { + eprintln!(" {:?}", batch); + } + + batches + .iter() + .map(|batch| Self::transmit_batch(batch, url)) + .find_map(Result::err) + .map_or(Ok(()), Err) + } + + fn transmit_batch(batch: &Uuid, url: &Url) -> DfxResult { + eprintln!("Transmitting batch: {:?}", batch); + let batch_path = Self::get_send_dir()?.join(batch.to_string()); + + let original_content = fs::read_to_string(&batch_path)?; + let final_payload = Self::add_batch_and_sequence_to_batch(original_content, batch)?; + + let client = reqwest::blocking::Client::new(); + + let op = || { + client + .post(url.as_str()) + .body(final_payload.clone()) + .send() + .map_err(backoff::Error::transient) + .and_then(|response| { + response + .error_for_status() + .map_err(backoff::Error::transient) + }) + .map(|_| ()) + }; + let notify = |err, dur| { + println!("Error happened at {:?}: {}", dur, err); + }; + + let policy = backoff::ExponentialBackoffBuilder::default() + .with_max_elapsed_time(Some(Duration::from_secs(180))) + .build(); + + backoff::retry_notify(policy, op, notify)?; + + fs::remove_file(&batch_path)?; + + Ok(()) + } + + fn add_batch_and_sequence_to_batch(content: String, batch: &Uuid) -> DfxResult<String> { + // Process each line, adding batch ID and sequence number + let modified_json_docs: Vec<String> = content + .lines() + .enumerate() + .map(|(idx, line)| Self::add_batch_and_sequence(line, batch, idx as u64)) + .collect::<Result<_, _>>()?; + + // Reassemble into a newline-delimited JSON string + Ok(modified_json_docs.join("\n")) + } + + fn add_batch_and_sequence(content: &str, batch: &Uuid, sequence: u64) -> DfxResult<String> { + let mut json: serde_json::Value = serde_json::from_str(content)?; + + json["batch"] = serde_json::Value::String(batch.to_string()); + json["sequence"] = serde_json::Value::Number(sequence.into()); + + serde_json::to_string(&json).map_err(|e| e.into()) + } + + fn list_batches() -> DfxResult<Vec<Uuid>> { + let send_dir = Self::get_send_dir()?; + if !send_dir.exists() { + return Ok(vec![]); + } + let send_dir = Self::get_send_dir()?; + let dir_content = dfx_core::fs::read_dir(&send_dir)?; + + let batches = dir_content + .filter_map(|v| { + let dir_entry = v.ok()?; + if dir_entry.file_type().is_ok_and(|e| e.is_file()) { + Uuid::parse_str(&dir_entry.file_name().to_string_lossy()).ok() + } else { + None + } + }) + .collect(); + Ok(batches) + } } fn try_with_telemetry(f: impl FnOnce(&mut Telemetry) -> DfxResult) -> DfxResult { @@ -159,6 +454,7 @@ struct CommandRecord<'a> { version: &'a Version, command: &'a str, platform: &'a str, + week: Option<&'a str>, parameters: &'a [Argument], exit_code: i32, execution_time_ms: Option<u128>, diff --git a/src/dfx/src/main.rs b/src/dfx/src/main.rs index cdc05351c3..7bff257134 100644 --- a/src/dfx/src/main.rs +++ b/src/dfx/src/main.rs @@ -160,10 +160,14 @@ fn inner_main(log_level: &mut Option<i64>) -> DfxResult { let _ = Telemetry::set_command_and_arguments(&args); Telemetry::set_platform(); + Telemetry::set_week(); let cli_opts = CliOpts::parse_from(args); - if matches!(cli_opts.command, commands::DfxCommand::Schema(_)) { + if matches!( + cli_opts.command, + commands::DfxCommand::Schema(_) | commands::DfxCommand::SendTelemetry(_) + ) { return commands::exec_without_env(cli_opts.command); } @@ -207,6 +211,11 @@ fn main() { eprintln!("error appending to telemetry log: {e}") } } + if let Err(e) = Telemetry::maybe_publish() { + if log_level.unwrap_or_default() > 0 { + eprintln!("error transmitting telemetry: {e}") + } + } std::process::exit(exit_code); }