diff --git a/.github/ISSUE_TEMPLATE/BUG_REPORT.yaml b/.github/ISSUE_TEMPLATE/BUG_REPORT.yaml index dbcb450..d8dfa06 100644 --- a/.github/ISSUE_TEMPLATE/BUG_REPORT.yaml +++ b/.github/ISSUE_TEMPLATE/BUG_REPORT.yaml @@ -15,9 +15,11 @@ body: label: Preliminary checks description: Please make sure that you verify each checkbox and follow the instructions for them. options: - - label: "Before opening a new issue, please search existing issues: https://github.com/maksimryndin/goral/issues. Perhaps, the bug has been already reported or solved" + - label: "This issue is a bug report. For a feature request or other question feel free to use a blank issue." required: true - - label: "This issue is not a feature request or other - for that just feel free to use a blank issue." + - label: "I've updated Goral to the latest version: https://github.com/maksimryndin/goral/releases and the bug still exists" + required: true + - label: "I've searched existing issues https://github.com/maksimryndin/goral/issues for the same bug and there is nothing similar" required: true - type: textarea attributes: @@ -80,6 +82,7 @@ body: label: Which version of Goral do you run (`goral --version`)? multiple: false options: + - 0.1.7 - 0.1.6 - 0.1.5 - 0.1.4 diff --git a/.github/site/src/install.sh b/.github/site/src/install.sh index bf9c2c7..4d02bb5 100755 --- a/.github/site/src/install.sh +++ b/.github/site/src/install.sh @@ -21,7 +21,7 @@ main() { get_architecture || return 1 local _arch="$RETVAL" - local _version=${1:-'0.1.6'} + local _version=${1:-'0.1.7'} assert_nz "$_arch" "arch" local _file="goral-${_version}-${_arch}" diff --git a/.github/site/src/installation.md b/.github/site/src/installation.md index 9cf026a..7a91929 100644 --- a/.github/site/src/installation.md +++ b/.github/site/src/installation.md @@ -11,9 +11,9 @@ curl --proto '=https' --tlsv1.2 -sSf https://maksimryndin.github.io/goral/instal ```sh -wget https://github.com/maksimryndin/goral/releases/download/0.1.6/goral-0.1.6-x86_64-unknown-linux-gnu.tar.gz -tar -xzf goral-0.1.6-x86_64-unknown-linux-gnu.tar.gz -cd goral-0.1.6-x86_64-unknown-linux-gnu/ +wget https://github.com/maksimryndin/goral/releases/download/0.1.7/goral-0.1.7-x86_64-unknown-linux-gnu.tar.gz +tar -xzf goral-0.1.7-x86_64-unknown-linux-gnu.tar.gz +cd goral-0.1.7-x86_64-unknown-linux-gnu/ shasum -a 256 -c sha256_checksum.txt ``` @@ -23,7 +23,7 @@ shasum -a 256 -c sha256_checksum.txt ```sh curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -git clone --depth 1 --branch 0.1.6 https://github.com/maksimryndin/goral +git clone --depth 1 --branch 0.1.7 https://github.com/maksimryndin/goral cd goral RUSTFLAGS='-C target-feature=+crt-static' cargo build --release --target ``` diff --git a/.github/site/src/system.md b/.github/site/src/system.md index 068716a..7e2197c 100644 --- a/.github/site/src/system.md +++ b/.github/site/src/system.md @@ -37,6 +37,13 @@ With this configuration System service will create the following sheets: * `top_open_files` (for Linux only) - among the processes with the same user as Goral (!) - process with the most opened files * for every process with name containing one of the substrings in `names` - a sheet with process info. Note: the first match (_case sensitive_) is used so plan accordingly a unique name for your binary. * for every mount in `mounts` - disk usage and free space. +* `ssh` - for Linux systems ssh access log is monitored. There is a `status` field with the following values: + * `rejected` - ssh user is correct but the key or password is wrong. Also a catchall reason for other unsuccessful connections. + * `invalid_user` - an invalid ssh user (unexisting) was used. + * `timeout` - a timeout on ssh connection happened. + * `wrong_params` - no matching key exchange method found or an invalid format of the key + * `connected` - a successful ssh connection is established (by default there is a rule with a warning notification for this event) + * `terminated` - an ssh session (established earlier with `connected`) is terminated System service doesn't require root privileges to collect the telemetry. For a process a cpu usage percent may be [more than 100%](https://blog.guillaume-gomez.fr/articles/2021-09-06+sysinfo%3A+how+to+extract+systems%27+information) in a case of a multi-threaded process on several cores. `memory_used` by process is a [resident-set size](https://www.baeldung.com/linux/resident-set-vs-virtual-memory-size). diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cad9b3..45a4e50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +* 0.1.7 + * ssh log monitoring + * rules for text now support "is" and "is not" conditions + * more helpful message about usage limits + * remove access/refresh tokens for google oauth from logs at the debug level + * 0.1.6 * safe numbers conversions * ids collision tests diff --git a/Cargo.lock b/Cargo.lock index c8b6a8b..2ef99ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -657,7 +657,7 @@ dependencies = [ [[package]] name = "goral" -version = "0.1.6" +version = "0.1.7" dependencies = [ "anyhow", "async-trait", @@ -671,6 +671,7 @@ dependencies = [ "hyper", "hyper-rustls", "lazy_static", + "logwatcher2", "prometheus-parse", "prost", "psutil", @@ -985,6 +986,20 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "logwatcher2" +version = "0.2.1" +source = "git+https://github.com/maksimryndin/logwatcher2.git#e6c6511c92addc3ec2c51cb409f466f51d4126bb" + +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matches" version = "0.1.10" @@ -1435,8 +1450,17 @@ checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.5", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1447,9 +1471,15 @@ checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -2228,12 +2258,16 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "serde", "serde_json", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", "tracing-serde", diff --git a/Cargo.toml b/Cargo.toml index bd402fd..cbbede3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "goral" -version = "0.1.6" +version = "0.1.7" edition = "2021" author = "Maksim Ryndin" license = "Apache-2.0" @@ -46,10 +46,11 @@ tokio = { version = "^1.0", features = ["sync", "signal", "io-std", "process", " tonic = { version = "^0.10", features = ["transport", "prost"]} tonic-health = "0.10.2" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["fmt", "json"] } +tracing-subscriber = { version = "0.3", features = ["fmt", "json", "env-filter"] } url = { version = "2", features = ["serde"] } [target.'cfg(target_os = "linux")'.dependencies] +logwatcher2 = { git = "https://github.com/maksimryndin/logwatcher2.git" } psutil = { version = "3.2.2", default-features = false, features = ["disk"]} [dev-dependencies] diff --git a/README.md b/README.md index 95749e5..488620f 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ So Goral provides the following features being deployed next to your app(s): * [Periodic healthchecks](https://maksimryndin.github.io/goral/healthcheck.html) (aka [liveness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/)) * [Metrics collection](https://maksimryndin.github.io/goral/metrics.html) (fully compatible with Prometheus to be easily replaced with more advanced stack as your project grows) * [Logs](https://maksimryndin.github.io/goral/logs.html) collection (importing logs from stdout/stderr of the target process) -* [System telemetry](https://maksimryndin.github.io/goral/system.html) (CPU, Memory, Free/Busy storage space etc) +* [System telemetry](https://maksimryndin.github.io/goral/system.html) (CPU, Memory, Free/Busy storage space, ssh access log etc) * A general key-value appendable log storage (see [the user case](https://maksimryndin.github.io/goral/kv-log.html)) * Features are modular - all [services](https://maksimryndin.github.io/goral/services.html) are switched on/off in the configuration. * You can observe several instances of the same app or different apps on the same host with a single Goral daemon (except logs as logs are collected via stdin of Goral - see [Logs](https://maksimryndin.github.io/goral/logs.html)) diff --git a/src/google/datavalue.rs b/src/google/datavalue.rs index 726dc7e..ded96ad 100644 --- a/src/google/datavalue.rs +++ b/src/google/datavalue.rs @@ -54,7 +54,7 @@ pub(crate) enum Datavalue { pub struct Datarow { log_name: String, timestamp: NaiveDateTime, - data: Vec<(String, Datavalue)>, + pub(crate) data: Vec<(String, Datavalue)>, sheet_id: Option, } diff --git a/src/main.rs b/src/main.rs index d13ba35..6e6563f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::signal; use tokio::sync::broadcast; -use tracing_subscriber::filter::LevelFilter; +use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -75,6 +75,14 @@ async fn start() -> Result<(), String> { )?; let level = LevelFilter::from_level(config.general.log_level); + let filter = EnvFilter::new("") + .add_directive(level.into()) + .add_directive( + "yup_oauth2=info" + .parse() + .expect("assert: tracing directive is properly set for yup_oauth2"), + ); + let (json, plain) = if args.json { ( Some(tracing_subscriber::fmt::layer().with_target(true).json()), @@ -85,7 +93,7 @@ async fn start() -> Result<(), String> { }; tracing_subscriber::registry() - .with(level) + .with(filter) .with(json) .with(plain) .init(); diff --git a/src/rules/mod.rs b/src/rules/mod.rs index 7736691..9af80d4 100644 --- a/src/rules/mod.rs +++ b/src/rules/mod.rs @@ -202,6 +202,12 @@ impl Rule { RuleCondition::IsNot if value.is_string() && value.as_str()? == NOT_AVAILABLE => { Datavalue::NotAvailable } + RuleCondition::Is if value.is_string() && value.as_str()? != NOT_AVAILABLE => { + Datavalue::Text(value.as_str()?.to_string()) + } + RuleCondition::IsNot if value.is_string() && value.as_str()? != NOT_AVAILABLE => { + Datavalue::Text(value.as_str()?.to_string()) + } RuleCondition::Contains if value.is_string() && value.as_str()? != NOT_AVAILABLE => { Datavalue::Text(value.as_str()?.to_string()) } @@ -285,6 +291,12 @@ impl Rule { (Datavalue::Bool(c), IsNot, Datavalue::NotAvailable) => { format!("{self} triggered for value {c}") } + (Datavalue::Text(c), Is, Datavalue::Text(v)) if c == v => { + format!("{self} triggered for value {c}") + } + (Datavalue::Text(c), IsNot, Datavalue::Text(v)) if c != v => { + format!("{self} triggered for value {c}") + } (Datavalue::Text(c), Contains, Datavalue::Text(v)) if c.contains(v) => { format!("{self} triggered for value {c}") } @@ -479,8 +491,8 @@ mod tests { None, ); assert!( - rule.is_none(), - "test assert: `is` cannot be a condition for a text value" + rule.is_some(), + "test assert: `is` can be a condition for a text value" ); let rule = Rule::try_from_values( @@ -511,8 +523,8 @@ mod tests { None, ); assert!( - rule.is_none(), - "test assert: `is not` cannot be a condition for a text value" + rule.is_some(), + "test assert: `is not` can be a condition for a text value" ); let rule = Rule::try_from_values( @@ -977,6 +989,54 @@ mod tests { _ => panic!("test assert: rule should trigger"), }; + let rule = Rule::try_from_values( + vec![ + json!(0.0), + json!("log_name1"), + json!("key"), + json!(IS_CONDITION), + json!("substring"), + json!(INFO_ACTION), + ], + None, + ); + let mut datarow = Datarow::new( + "log_name1".to_string(), + Utc::now().naive_utc(), + vec![("key".to_string(), Datavalue::Text("substring".to_string()))], + ); + datarow.sheet_id(TEST_HOST_ID, "test"); + match rule.unwrap().apply(&datarow.into()) { + RuleOutput::Process(Some(Triggered { action, .. })) => { + assert_eq!(action, Action::Info) + } + _ => panic!("test assert: rule should trigger"), + }; + + let rule = Rule::try_from_values( + vec![ + json!(0.0), + json!("log_name1"), + json!("key"), + json!(IS_NOT_CONDITION), + json!("first"), + json!(INFO_ACTION), + ], + None, + ); + let mut datarow = Datarow::new( + "log_name1".to_string(), + Utc::now().naive_utc(), + vec![("key".to_string(), Datavalue::Text("second".to_string()))], + ); + datarow.sheet_id(TEST_HOST_ID, "test"); + match rule.unwrap().apply(&datarow.into()) { + RuleOutput::Process(Some(Triggered { action, .. })) => { + assert_eq!(action, Action::Info) + } + _ => panic!("test assert: rule should trigger"), + }; + let rule = Rule::try_from_values( vec![ json!(0.0), diff --git a/src/services/system/mod.rs b/src/services/system/mod.rs index fc2e037..5c569b0 100644 --- a/src/services/system/mod.rs +++ b/src/services/system/mod.rs @@ -1,5 +1,7 @@ pub(crate) mod collector; pub(crate) mod configuration; +#[cfg(target_os = "linux")] +pub(crate) mod ssh; use crate::google::datavalue::{Datarow, Datavalue}; use crate::messenger::configuration::MessengerConfig; use crate::notifications::{MessengerApi, Notification, Sender}; @@ -82,12 +84,12 @@ impl SystemService { .map_err(|e| Data::Message(format!("sysinfo scraping error {e}"))); if sender.blocking_send(TaskResult { id: 0, result }).is_err() { if is_shutdown.load(Ordering::Relaxed) { + tracing::info!("exiting system info scraping thread"); return; } panic!("assert: sysinfo messages queue shouldn't be closed before shutdown signal"); } } - tracing::info!("exiting system info scraping thread"); } async fn make_timed_scrape( @@ -101,6 +103,21 @@ impl SystemService { } } + #[cfg(target_os = "linux")] + async fn ssh_observer( + is_shutdown: Arc, + sender: mpsc::Sender, + messenger: Sender, + ) { + tracing::info!("starting ssh monitoring"); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + std::thread::Builder::new() + .name("ssh-observer".into()) + .spawn(move || ssh::process_sshd_log(is_shutdown, sender, messenger, tx)) + .expect("assert: can spawn ssh monitoring thread"); + let _ = rx.await; + } + #[allow(clippy::too_many_arguments)] async fn sys_observer( is_shutdown: Arc, @@ -169,7 +186,20 @@ impl Service for SystemService { } fn get_example_rules(&self) -> Vec { - let mut rows = Vec::with_capacity(2 + self.mounts.len() + 2 * self.process_names.len()); + let mut rows = Vec::with_capacity(3 + self.mounts.len() + 2 * self.process_names.len()); + #[cfg(target_os = "linux")] + { + rows.push( + Rule { + log_name: ssh::SSH_LOG.to_string(), + key: ssh::SSH_LOG_STATUS.to_string(), + condition: RuleCondition::Is, + value: Datavalue::Text(ssh::SSH_LOG_STATUS_CONNECTED.to_string()), + action: Action::Warn, + } + .into(), + ); + } rows.push( Rule { log_name: collector::BASIC_LOG.to_string(), @@ -261,7 +291,7 @@ impl Service for SystemService { Ok(data) => data, Err(Data::Message(msg)) => { tracing::error!("{}", msg); - self.send_error(format!("`{}` while scraping sysinfo", msg)) + self.send_error(format!("`{}` while observing system", msg)) .await; Data::Empty } @@ -284,7 +314,19 @@ impl Service for SystemService { let names = self.process_names.clone(); let scrape_interval = self.scrape_interval; let scrape_timeout = self.scrape_timeout; - vec![tokio::spawn(async move { + + let mut tasks = vec![]; + #[cfg(target_os = "linux")] + { + let cloned_is_shutdown = is_shutdown.clone(); + let cloned_sender = sender.clone(); + let cloned_messenger = messenger.clone(); + tasks.push(tokio::spawn(async move { + Self::ssh_observer(cloned_is_shutdown, cloned_sender, cloned_messenger).await; + })); + } + + tasks.push(tokio::spawn(async move { Self::sys_observer( is_shutdown, scrape_interval, @@ -296,7 +338,8 @@ impl Service for SystemService { names, ) .await; - })] + })); + tasks } } diff --git a/src/services/system/ssh.rs b/src/services/system/ssh.rs new file mode 100644 index 0000000..20dd51c --- /dev/null +++ b/src/services/system/ssh.rs @@ -0,0 +1,458 @@ +use crate::google::datavalue::{Datarow, Datavalue}; +use crate::notifications::{Notification, Sender}; +use crate::services::{Data, TaskResult}; +use chrono::{NaiveDateTime, Utc}; +use lazy_static::lazy_static; +use logwatcher::{LogWatcher, LogWatcherAction, LogWatcherEvent}; +use regex::Regex; +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use tokio::sync::mpsc; +use tokio::sync::oneshot::Sender as TokioSender; +use tracing::Level; + +pub const SSH_LOG: &str = "ssh"; +pub const SSH_LOG_STATUS: &str = "status"; +pub const SSH_LOG_STATUS_CONNECTED: &str = "connected"; +pub const SSH_LOG_STATUS_TERMINATED: &str = "terminated"; + +pub(super) fn process_sshd_log( + is_shutdown: Arc, + sender: mpsc::Sender, + messenger: Sender, + tx: TokioSender<()>, +) { + tracing::info!("started ssh monitoring thread"); + + let mut log_watcher = match LogWatcher::register("/var/log/auth.log") + .or_else(|_| LogWatcher::register("/var/log/secure")) + { + Ok(f) => f, + Err(_) => { + let message = + "cannot open auth log file, tried paths /var/log/auth.log and /var/log/secure" + .to_string(); + tracing::error!("{}, exiting ssh monitoring thread", message); + messenger.send_nonblock(Notification::new(message, Level::ERROR)); + return; + } + }; + + let mut connections = HashMap::new(); + + log_watcher.watch(&mut move |result| { + let result = match result { + Ok(event) => match event { + LogWatcherEvent::Line(line) => match parse(&line) { + Some(mut datarow) => { + lookup_connection(&mut datarow, &mut connections); + let Datavalue::Text(ref status) = datarow.data[4].1 else { + panic!("assert: ssh status is parsed") + }; + if status == SSH_LOG_STATUS_CONNECTED && connections.len() > 100 { + let message = + format!("there are {} active ssh connections", connections.len()); + tracing::warn!("{}", message); + messenger.send_nonblock(Notification::new(message, Level::WARN)); + } + Ok(Data::Single(datarow)) + } + None => { + return LogWatcherAction::None; + } + }, + LogWatcherEvent::LogRotation => { + tracing::info!("auth log file rotation"); + return LogWatcherAction::None; + } + }, + Err(err) => { + let message = format!("error watching ssh access log: {err}"); + Err(Data::Message(message)) + } + }; + if sender.blocking_send(TaskResult { id: 0, result }).is_err() { + if is_shutdown.load(Ordering::Relaxed) { + return LogWatcherAction::Finish; + } + panic!( + "assert: ssh monitoring messages queue shouldn't be closed before shutdown signal" + ); + } + + LogWatcherAction::None + }); + + let _ = tx.send(()); + tracing::info!("exiting ssh monitoring thread"); +} + +struct Connection { + user_ip: String, + user_port: u32, + user_key: String, +} + +fn lookup_connection(datarow: &mut Datarow, connections: &mut HashMap) { + let Datavalue::Text(ref status) = datarow.data[4].1 else { + panic!("assert: ssh status is parsed") + }; + if status != SSH_LOG_STATUS_CONNECTED && status != SSH_LOG_STATUS_TERMINATED { + return; + } + let Datavalue::IntegerID(id) = datarow.data[0].1 else { + panic!("assert: ssh id is parsed") + }; + + // For terminated + if let Some(Connection { + user_ip, + user_port, + user_key, + }) = connections.remove(&id) + { + datarow.data[2].1 = Datavalue::Text(user_ip); + datarow.data[3].1 = Datavalue::IntegerID(user_port); + datarow.data[5].1 = Datavalue::Text(user_key); + return; + } + + if status == SSH_LOG_STATUS_TERMINATED { + return; + } + + // For connected + let Datavalue::Text(ref ip) = datarow.data[2].1 else { + panic!("assert: connected ssh user has an ip") + }; + let Datavalue::IntegerID(port) = datarow.data[3].1 else { + panic!("assert: connected ssh user has a port") + }; + let Datavalue::Text(ref key) = datarow.data[5].1 else { + panic!("assert: connected ssh user has a pubkey") + }; + + connections.insert( + id, + Connection { + user_ip: ip.to_string(), + user_port: port, + user_key: key.to_string(), + }, + ); +} + +fn parse(line: &str) -> Option { + lazy_static! { + static ref RE: Regex = Regex::new( + r"(?x) + (?P + [A-Za-z]{3,9}\s\d{1,2}\s\d{2}:\d{2}:\d{2} + ) + \s\S+\s + sshd\[(?P\d+)\]:\s + ( + (Disconnected\sfrom|Disconnecting|Connection\sclosed\sby)\sauthenticating\suser\s(?P\S+)| + (Disconnected\sfrom|Disconnecting|Connection\sclosed\sby)\sinvalid\suser\s(?P\S+)| + Accepted\spublickey\sfor\s(?P\S+)\sfrom| + pam_unix\(sshd:session\):\ssession\sclosed\sfor\suser\s(?P\S+)| + (?Pfatal:\sTimeout\sbefore\sauthentication\sfor|Unable\sto\snegotiate\swith|Connection\sclosed\sby|Connection\sreset\sby|banner\sexchange:\sConnection\sfrom) + ) + \s? + ((?P\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\sport\s(?P\d{2,5}))? + \s? + (ssh2:\s(?P.+))? + " + ) + .expect("assert: datetime regex is properly constructed"); + } + RE.captures(line).and_then(|cap| { + let datetime = cap + .name("datetime") + .map(|datetime| { + let captured = datetime.as_str(); + let captured = format!("{} {captured}", Utc::now().format("%Y")); + NaiveDateTime::parse_from_str(&captured, "%Y %b %d %H:%M:%S") + .expect("assert: can parse auth log datetime") + }) + .expect("assert: can get auth log datetime"); + let id = cap.name("id").and_then(|d| d.as_str().parse().ok())?; + let ip = cap.name("ip").map(|d| d.as_str().to_string()); + let port = cap.name("port").and_then(|d| d.as_str().parse().ok()); + let key = cap.name("key").map(|d| d.as_str().to_string()); + + let (username, status) = if let Some(username) = cap.name("username_rejected") { + (Datavalue::Text(username.as_str().to_string()), "rejected") + } else if let Some(username) = cap.name("username_invalid") { + ( + Datavalue::Text(username.as_str().to_string()), + "invalid_user", + ) + } else if let Some(username) = cap.name("username_accepted") { + ( + Datavalue::Text(username.as_str().to_string()), + SSH_LOG_STATUS_CONNECTED, + ) + } else if let Some(username) = cap.name("username_terminated") { + ( + Datavalue::Text(username.as_str().to_string()), + SSH_LOG_STATUS_TERMINATED, + ) + } else if let Some(other_reason) = cap.name("other_reason") { + let other_reason = other_reason.as_str().to_lowercase(); + let reason = if other_reason.contains("timeout") { + "timeout" + } else if other_reason.contains("reset") || other_reason.contains("closed") { + "rejected" + } else if other_reason.contains("negotiate") || other_reason.contains("banner") { + "wrong_params" + } else { + "rejected" + }; + + (Datavalue::NotAvailable, reason) + } else { + (Datavalue::NotAvailable, "rejected") + }; + + Some(Datarow::new( + SSH_LOG.to_string(), + datetime, + vec![ + ("id".to_string(), Datavalue::IntegerID(id)), + ("user".to_string(), username), + ( + "ip".to_string(), + ip.map(Datavalue::Text).unwrap_or(Datavalue::NotAvailable), + ), + ( + "port".to_string(), + port.map(Datavalue::IntegerID) + .unwrap_or(Datavalue::NotAvailable), + ), + ( + SSH_LOG_STATUS.to_string(), + Datavalue::Text(status.to_string()), + ), + ( + "pubkey".to_string(), + key.map(Datavalue::Text).unwrap_or(Datavalue::NotAvailable), + ), + ], + )) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parsing_auth_lines() { + let line = "May 21 11:22:15 server1 sshd[136055]: Disconnected from authenticating user root 139.59.37.55 port 48966 [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136055)); + assert_eq!(parsed.data[1].1, Datavalue::Text("root".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("139.59.37.55".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(48966)); + assert_eq!(parsed.data[4].1, Datavalue::Text("rejected".to_string())); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 25 19:08:00 household sshd[159380]: Disconnecting authenticating user ubuntu 122.224.37.86 port 53474: Too many authentication failures [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(159380)); + assert_eq!(parsed.data[1].1, Datavalue::Text("ubuntu".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("122.224.37.86".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(53474)); + assert_eq!(parsed.data[4].1, Datavalue::Text("rejected".to_string())); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 21 11:22:18 server1 sshd[136059]: Disconnected from invalid user jj 94.127.212.198 port 1122 [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136059)); + assert_eq!(parsed.data[1].1, Datavalue::Text("jj".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("94.127.212.198".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(1122)); + assert_eq!( + parsed.data[4].1, + Datavalue::Text("invalid_user".to_string()) + ); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 21 11:22:18 server1 sshd[136063]: Accepted publickey for ubuntu from 77.222.27.80 port 17827 ssh2: RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136063)); + assert_eq!(parsed.data[1].1, Datavalue::Text("ubuntu".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("77.222.27.80".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(17827)); + assert_eq!(parsed.data[4].1, Datavalue::Text("connected".to_string())); + assert_eq!( + parsed.data[5].1, + Datavalue::Text("RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs".to_string()) + ); + + let line = "May 21 10:09:13 los sshd[1511]: Accepted publickey for los from 192.168.64.1 port 63629 ssh2: ED25519 SHA256:tOfMBR3wtNPSvsy8dY6fMSIp+A9RllVkBTK8S+RiSkQ"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(1511)); + assert_eq!(parsed.data[1].1, Datavalue::Text("los".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("192.168.64.1".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(63629)); + assert_eq!(parsed.data[4].1, Datavalue::Text("connected".to_string())); + assert_eq!( + parsed.data[5].1, + Datavalue::Text( + "ED25519 SHA256:tOfMBR3wtNPSvsy8dY6fMSIp+A9RllVkBTK8S+RiSkQ".to_string() + ) + ); + + let line = "May 21 11:22:56 server1 sshd[136063]: pam_unix(sshd:session): session closed for user ubuntu"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136063)); + assert_eq!(parsed.data[1].1, Datavalue::Text("ubuntu".to_string())); + assert_eq!(parsed.data[2].1, Datavalue::NotAvailable); + assert_eq!(parsed.data[3].1, Datavalue::NotAvailable); + assert_eq!(parsed.data[4].1, Datavalue::Text("terminated".to_string())); + + let line = "May 21 11:22:59 server1 sshd[135885]: fatal: Timeout before authentication for 116.255.189.120 port 47014"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(135885)); + assert_eq!(parsed.data[1].1, Datavalue::NotAvailable); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("116.255.189.120".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(47014)); + assert_eq!(parsed.data[4].1, Datavalue::Text("timeout".to_string())); + + let line = "May 25 17:44:08 household sshd[159150]: Connection closed by invalid user user 111.70.3.198 port 52445 [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(159150)); + assert_eq!(parsed.data[1].1, Datavalue::Text("user".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("111.70.3.198".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(52445)); + assert_eq!( + parsed.data[4].1, + Datavalue::Text("invalid_user".to_string()) + ); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 25 17:43:57 household sshd[159147]: Connection closed by authenticating user nobody 213.230.65.20 port 47128 [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(159147)); + assert_eq!(parsed.data[1].1, Datavalue::Text("nobody".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("213.230.65.20".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(47128)); + assert_eq!(parsed.data[4].1, Datavalue::Text("rejected".to_string())); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = + "May 25 17:44:00 household sshd[159149]: Connection closed by 1.62.154.219 port 62293"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(159149)); + assert_eq!(parsed.data[1].1, Datavalue::NotAvailable); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("1.62.154.219".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(62293)); + assert_eq!(parsed.data[4].1, Datavalue::Text("rejected".to_string())); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 25 16:03:47 household sshd[158910]: Connection reset by 104.248.136.93 port 6116 [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(158910)); + assert_eq!(parsed.data[1].1, Datavalue::NotAvailable); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("104.248.136.93".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(6116)); + assert_eq!(parsed.data[4].1, Datavalue::Text("rejected".to_string())); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 21 19:26:17 server1 sshd[59895]: pam_unix(sshd:session): session opened for user los(uid=1000) by (uid=0)"; + assert!(parse(line).is_none()); + + let line = "May 25 17:45:41 household sshd[159154]: Unable to negotiate with 185.196.8.151 port 34228: no matching key exchange method found. Their offer: diffie-hellman-group14-sha1,diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1 [preauth]"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(159154)); + assert_eq!(parsed.data[1].1, Datavalue::NotAvailable); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("185.196.8.151".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(34228)); + assert_eq!( + parsed.data[4].1, + Datavalue::Text("wrong_params".to_string()) + ); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + + let line = "May 25 15:33:34 household sshd[158862]: banner exchange: Connection from 162.243.135.24 port 53198: invalid format"; + let parsed = parse(line).unwrap(); + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(158862)); + assert_eq!(parsed.data[1].1, Datavalue::NotAvailable); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("162.243.135.24".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(53198)); + assert_eq!( + parsed.data[4].1, + Datavalue::Text("wrong_params".to_string()) + ); + assert_eq!(parsed.data[5].1, Datavalue::NotAvailable); + } + + #[test] + fn lookup_connections() { + let mut connections = HashMap::new(); + + let line = "May 21 11:22:18 server1 sshd[136063]: Accepted publickey for ubuntu from 77.222.27.80 port 17827 ssh2: RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs"; + let mut parsed = parse(line).unwrap(); + lookup_connection(&mut parsed, &mut connections); + assert_eq!(connections.len(), 1); + + let line = "May 21 11:22:56 server1 sshd[136063]: pam_unix(sshd:session): session closed for user ubuntu"; + let mut parsed = parse(line).unwrap(); + lookup_connection(&mut parsed, &mut connections); + assert_eq!(connections.len(), 0); + + assert_eq!(parsed.data[0].1, Datavalue::IntegerID(136063)); + assert_eq!(parsed.data[1].1, Datavalue::Text("ubuntu".to_string())); + assert_eq!( + parsed.data[2].1, + Datavalue::Text("77.222.27.80".to_string()) + ); + assert_eq!(parsed.data[3].1, Datavalue::IntegerID(17827)); + assert_eq!(parsed.data[4].1, Datavalue::Text("terminated".to_string())); + assert_eq!( + parsed.data[5].1, + Datavalue::Text("RSA SHA256:D726XJ0DkstyhsyH2rAbfYuIaeBOa3Su2l2WWbyXnXs".to_string()) + ); + } +} diff --git a/src/storage.rs b/src/storage.rs index 362ba1a..5b29fb6 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -418,7 +418,8 @@ impl AppendableLog { if usage < limit { if usage > 0.8 * limit && !self.truncate_warning_is_sent { - let message = format!("current spreadsheet usage `{usage:.2}%` for service `{}` is approaching a limit `{limit}%`, the data will be truncated, copy it if needed or consider using a separate spreadsheet for this service", self.service); + let url = self.spreadsheet_baseurl(); + let message = format!("current [spreadsheet]({url}) usage `{usage:.2}%` for service `{}` is approaching a limit `{limit}%`, the data will be truncated, copy it if needed or consider using a separate spreadsheet for this service with a higher [storage quota](https://maksimryndin.github.io/goral/services.html#storage-quota)", self.service); tracing::warn!("{}", message); if let Some(messenger) = self.messenger.as_ref() { messenger.try_warn(message); @@ -1076,7 +1077,7 @@ mod tests { let messages = tokio::spawn(async move { let mut warn_count = 0; while let Some(msg) = rx.recv().await { - if msg.message.contains("current spreadsheet usage") { + if msg.message.contains("the data will be truncated") { warn_count += 1; } println!("{msg:?}");