diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eab9f18261..0e877b9616 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ on: env: CARGO_TERM_COLOR: always GO_VERSION: '1.18' - ACTION_MSRV_TOOLCHAIN: 1.59.0 + ACTION_MSRV_TOOLCHAIN: 1.60.0 jobs: build: runs-on: ubuntu-latest @@ -209,5 +209,7 @@ jobs: path: target/x86_64-unknown-linux-musl/release - run: chmod +x target/x86_64-unknown-linux-musl/release/conmonrs - run: .github/install-deps + - name: create symlink + run: sudo ln -f -s $GOROOT/bin/* /usr/bin/ - name: Integration tests run: RUNTIME_PATH="/usr/sbin/runc" make integration-static diff --git a/Cargo.lock b/Cargo.lock index e9c410dbc1..21e1ed8718 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -253,18 +253,22 @@ dependencies = [ "conmon-common", "futures", "getset", + "lazy_static", "libc", "memchr", "mockall", "multimap", "nix", + "notify", "prctl", + "regex", "sendfd", "serde", "shadow-rs", "strum", "tempfile", "tokio", + "tokio-eventfd", "tokio-util", "tracing", "tracing-journald", @@ -315,6 +319,26 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "difflib" version = "0.4.0" @@ -348,6 +372,18 @@ dependencies = [ "instant", ] +[[package]] +name = "filetime" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0408e2626025178a6a7f7ffc05a25bc47103229f19c113755de7bf63816290c" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "winapi", +] + [[package]] name = "float-cmp" version = "0.9.0" @@ -373,6 +409,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9d758e60b45e8d749c89c1b389ad8aee550f86aa12e2b9298b546dda7a82ab1" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.21" @@ -502,9 +547,9 @@ dependencies = [ [[package]] name = "git2" -version = "0.14.2" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3826a6e0e2215d7a41c2bfc7c9244123969273f3476b939a226aac0ab56e9e3c" +checksum = "5e77a14ffc6ba4ad5188d6cf428894c4fcfda725326b37558f35bb677e712cec" dependencies = [ "bitflags", "libc", @@ -555,6 +600,26 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -588,6 +653,26 @@ dependencies = [ "libc", ] +[[package]] +name = "kqueue" 
+version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -602,9 +687,9 @@ checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" [[package]] name = "libgit2-sys" -version = "0.13.2+1.4.2" +version = "0.13.3+1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a42de9a51a5c12e00fc0e4ca6bc2ea43582fc6418488e8f615e905d886f258b" +checksum = "c24d36c3ac9b9996a2418d6bf428cc0bc5d1a814a84303fc60986088c5ed60de" dependencies = [ "cc", "libc", @@ -721,6 +806,24 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "notify" +version = "5.0.0-pre.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "553f9844ad0b0824605c20fb55a661679782680410abfb1a8144c2e7e437e7a7" +dependencies = [ + "bitflags", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "mio", + "walkdir", + "winapi", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -964,6 +1067,15 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2cc38e8fa666e2de3c4aba7edeb5ffc5246c1c2ed0e3d17e560aeeba736b23f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "sendfd" version = "0.4.1" @@ -1187,6 +1299,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "tokio-eventfd" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435aa354af05ef661bac1f4fd46f1c635d7879bcf6c49303ce929e79bbd30f51" +dependencies = [ + "futures-lite", + "libc", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.7.0" @@ -1360,6 +1483,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" diff --git a/Makefile b/Makefile index 332b3ed86c..c5df51877d 100644 --- a/Makefile +++ b/Makefile @@ -29,8 +29,9 @@ release-static: make release && \ strip -s target/x86_64-unknown-linux-musl/release/$(BINARY)" -lint: +lint: .install.golangci-lint cargo fmt + $(GOTOOLS_BINDIR)/golangci-lint run unit: cargo test --bins --no-fail-fast @@ -39,7 +40,7 @@ integration: .install.ginkgo release # It needs to be release so we correctly te export CONMON_BINARY="$(MAKEFILE_PATH)target/release/$(BINARY)" && \ export RUNTIME_BINARY="$(RUNTIME_PATH)" && \ export MAX_RSS_KB=10240 && \ - "$(GOTOOLS_BINDIR)/ginkgo" -v -r pkg/client + sudo -E "$(GOTOOLS_BINDIR)/ginkgo" -v -r 
pkg/client integration-static: .install.ginkgo # It needs to be release so we correctly test the RSS usage export CONMON_BINARY="$(MAKEFILE_PATH)target/x86_64-unknown-linux-musl/release/$(BINARY)" && \ @@ -47,12 +48,15 @@ integration-static: .install.ginkgo # It needs to be release so we correctly tes $(MAKE) release-static; \ fi && \ export RUNTIME_BINARY="$(RUNTIME_PATH)" && \ - export MAX_RSS_KB=2800 && \ - $(GOTOOLS_BINDIR)/ginkgo -v -r pkg/client + export MAX_RSS_KB=3100 && \ + sudo -E "$(GOTOOLS_BINDIR)/ginkgo" -v -r pkg/client .install.ginkgo: GOBIN=$(abspath $(GOTOOLS_BINDIR)) go install github.com/onsi/ginkgo/v2/ginkgo@latest +.install.golangci-lint: + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | BINDIR=$(abspath $(GOTOOLS_BINDIR)) sh -s v1.45.2 + clean: rm -rf target/ diff --git a/conmon-rs/server/Cargo.toml b/conmon-rs/server/Cargo.toml index 69946addd6..70aee907e3 100644 --- a/conmon-rs/server/Cargo.toml +++ b/conmon-rs/server/Cargo.toml @@ -32,6 +32,10 @@ tracing = "0.1.34" tracing-journald = "0.3.0" tracing-subscriber = "0.3.11" uuid = { version = "1.0.0", features = ["v4", "fast-rng", "macro-diagnostics"] } +regex = "1.5.5" +notify = "5.0.0-pre.14" +tokio-eventfd = "0.2.0" +lazy_static = "1.4.0" [build-dependencies] shadow-rs = "0.11.0" diff --git a/conmon-rs/server/src/child.rs b/conmon-rs/server/src/child.rs index 7af8fd8c68..b11cef45bc 100644 --- a/conmon-rs/server/src/child.rs +++ b/conmon-rs/server/src/child.rs @@ -14,6 +14,9 @@ pub struct Child { #[getset(get = "pub")] exit_paths: Vec<PathBuf>, + #[getset(get = "pub")] + oom_exit_paths: Vec<PathBuf>, + #[getset(get = "pub")] timeout: Option<Instant>, @@ -26,6 +29,7 @@ impl Child { id: String, pid: u32, exit_paths: Vec<PathBuf>, + oom_exit_paths: Vec<PathBuf>, timeout: Option<Instant>, io: SharedContainerIO, ) -> Self { @@ -33,6 +37,7 @@ impl Child { id, pid, exit_paths, + oom_exit_paths, timeout, io, } diff --git a/conmon-rs/server/src/child_reaper.rs b/conmon-rs/server/src/child_reaper.rs index 3a565c8c19..b45d743a27 100644 --- a/conmon-rs/server/src/child_reaper.rs +++ b/conmon-rs/server/src/child_reaper.rs @@ -2,13 +2,13 @@ use crate::{ child::Child, container_io::{ContainerIO, ContainerIOType, SharedContainerIO}, + oom_watcher::OOMWatcher, }; use anyhow::{anyhow, format_err, Context, Result}; use getset::{CopyGetters, Getters, Setters}; use libc::pid_t; use multimap::MultiMap; use nix::errno::Errno; -use nix::sys::wait::WaitPidFlag; use nix::{ sys::{ signal::{kill, Signal}, @@ -18,19 +18,19 @@ use nix::{ }; use std::{ ffi::OsStr, - fs::File, - io::Write, path::{Path, PathBuf}, process::Stdio, sync::{Arc, Mutex}, }; use tokio::{ - fs, + fs::{self, File}, + io::AsyncWriteExt, process::Command, sync::broadcast::{self, Receiver, Sender}, - task, + task::{self, JoinHandle}, time::{self, Instant}, }; +use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, Instrument}; #[derive(Debug, Default, Getters)] @@ -114,7 +114,7 @@ impl ChildReaper { pub fn watch_grandchild(&self, child: Child) -> Result<Receiver<ExitChannelData>> { let locked_grandchildren = &self.grandchildren().clone(); let mut map = lock!(locked_grandchildren); - let reapable_grandchild = ReapableChild::from_child(&child); + let mut reapable_grandchild = ReapableChild::from_child(&child); let (exit_tx, exit_rx) = reapable_grandchild.watch()?; @@ -142,17 +142,19 @@ } pub fn kill_grandchildren(&self, s: Signal) -> Result<()> { - let locked_grandchildren = &self.grandchildren().clone(); - for (_, grandchild) in lock!(locked_grandchildren).iter() { -
debug!("Killing pid {}", grandchild.pid); - kill_grandchild(grandchild.pid, s)?; + for (_, grandchild) in self.grandchildren.lock().unwrap().iter() { + debug!(grandchild.pid, "killing grandchild"); + let _ = kill_grandchild(grandchild.pid, s); + futures::executor::block_on(async { + grandchild.close().await; + }); } Ok(()) } } -pub fn kill_grandchild(pid: u32, s: Signal) -> Result<()> { - let pid = Pid::from_raw(pid as pid_t); +pub fn kill_grandchild(raw_pid: u32, s: Signal) -> Result<()> { + let pid = Pid::from_raw(raw_pid as pid_t); if let Ok(pgid) = getpgid(Some(pid)) { // If process_group is 1, we will end up calling // kill(-1), which kills everything conmon is allowed to. @@ -161,22 +163,30 @@ pub fn kill_grandchild(pid: u32, s: Signal) -> Result<()> { match kill(Pid::from_raw(-pgid), s) { Ok(()) => return Ok(()), Err(e) => { - error!("Failed to get pgid, falling back to killing pid {}", e); + error!( + raw_pid, + "Failed to get pgid, falling back to killing pid {}", e + ); } } } } if let Err(e) = kill(pid, s) { - debug!("Failed killing pid {} with error {}", pid, e); + debug!(raw_pid, "Failed killing pid with error {}", e); } Ok(()) } +type TaskHandle = Arc>>>>; + #[derive(Clone, CopyGetters, Debug, Getters, Setters)] pub struct ReapableChild { #[getset(get)] exit_paths: Vec, + #[getset(get)] + oom_exit_paths: Vec, + #[getset(get_copy)] pid: u32, @@ -185,6 +195,11 @@ pub struct ReapableChild { #[getset(get = "pub")] timeout: Option, + + #[getset(get = "pub")] + token: CancellationToken, + + task: Option, } #[derive(Clone, CopyGetters, Debug, Getters, Setters)] @@ -192,6 +207,9 @@ pub struct ExitChannelData { #[getset(get = "pub")] pub exit_code: i32, + #[getset(get = "pub")] + pub oomed: bool, + #[getset(get = "pub")] pub timed_out: bool, } @@ -200,101 +218,141 @@ impl ReapableChild { pub fn from_child(child: &Child) -> Self { Self { exit_paths: child.exit_paths().clone(), + oom_exit_paths: child.oom_exit_paths().clone(), pid: child.pid(), io: child.io().clone(), timeout: *child.timeout(), + token: CancellationToken::new(), + task: None, } } - fn watch(&self) -> Result<(Sender, Receiver)> { + pub async fn close(&self) { + debug!("{}: grandchild close", self.pid); + self.token.cancel(); + if let Some(t) = self.task.clone() { + for t in t.lock().unwrap().take().unwrap().into_iter() { + debug!("{}: grandchild await", self.pid); + let _ = t.await; + } + } + } + + fn watch(&mut self) -> Result<(Sender, Receiver)> { let exit_paths = self.exit_paths().clone(); + let oom_exit_paths = self.oom_exit_paths().clone(); let pid = self.pid(); // Only one exit code will be written. 
let (exit_tx, exit_rx) = broadcast::channel(1); let exit_tx_clone = exit_tx.clone(); let timeout = *self.timeout(); + let stop_token = self.token().clone(); - task::spawn( + let task = task::spawn( async move { - let exit_code: i32; + let mut exit_code: i32 = -1; + let mut oomed = false; let mut timed_out = false; + let (oom_tx, mut oom_rx) = tokio::sync::mpsc::channel(1); + let oom_watcher = OOMWatcher::new(&stop_token, pid, &oom_exit_paths, oom_tx).await; let wait_for_exit_code = - task::spawn_blocking(move || Self::wait_for_exit_code(pid)); + task::spawn_blocking(move || Self::wait_for_exit_code(&stop_token, pid)); + let closure = async { + let (code, oom) = tokio::join!(wait_for_exit_code, oom_rx.recv()); + if let Ok(code) = code { + exit_code = code; + } + if let Some(event) = oom { + oomed = event.oom; + } + }; if let Some(timeout) = timeout { - match time::timeout_at(timeout, wait_for_exit_code).await { - Ok(status) => match status { - Ok(code) => exit_code = code, - Err(err) => { - return Err(err); - } - }, - Err(_) => { - timed_out = true; - exit_code = -3; - let _ = kill_grandchild(pid, Signal::SIGKILL); - } + if time::timeout_at(timeout, closure).await.is_err() { + timed_out = true; + exit_code = -3; + let _ = kill_grandchild(pid, Signal::SIGKILL); } } else { - match wait_for_exit_code.await { - Ok(code) => exit_code = code, - Err(_) => exit_code = -1, - } + closure.await; } - debug!( - "Sending exit code to channel for pid {} : {}", - pid, exit_code - ); + oom_watcher.stop().await; let exit_channel_data = ExitChannelData { exit_code, + oomed, timed_out, }; + debug!( + pid, + "sending exit struct to channel : {:?}", exit_channel_data + ); if exit_tx_clone.send(exit_channel_data).is_err() { - error!("Unable to send exit status"); + debug!(pid, "Unable to send exit status"); + } + debug!(pid, "write to exit paths"); + if let Err(e) = Self::write_to_exit_paths(exit_code, &exit_paths).await { + error!(pid, "could not write exit paths: {}", e); } - let _ = - Self::write_to_exit_paths(exit_code, &exit_paths).context("write exit paths"); - Ok(()) } .instrument(debug_span!("watch")), ); + let tasks = Arc::new(Mutex::new(Some(Vec::new()))); + tasks.lock().unwrap().as_mut().unwrap().push(task); + self.task = Some(tasks); + Ok((exit_tx, exit_rx)) } - fn wait_for_exit_code(pid: u32) -> i32 { + fn wait_for_exit_code(token: &CancellationToken, pid: u32) -> i32 { const FAILED_EXIT_CODE: i32 = -3; loop { - match waitpid( - Pid::from_raw(pid as pid_t), - Some(WaitPidFlag::WNOHANG | WaitPidFlag::__WALL), - ) { + match waitpid(Pid::from_raw(pid as pid_t), None) { Ok(WaitStatus::Exited(_, exit_code)) => { + debug!(pid, "Exited {}", exit_code); + token.cancel(); return exit_code; } Ok(WaitStatus::Signaled(_, sig, _)) => { + debug!(pid, "Signaled"); + token.cancel(); return (sig as i32) + 128; } Ok(_) => { continue; } Err(Errno::EINTR) => { - debug!("Failed to wait for pid on EINTR, retrying"); + debug!(pid, "Failed to wait for pid on EINTR, retrying"); continue; } Err(err) => { - error!("Unable to waitpid on {}: {}", pid, err); + error!(pid, "Unable to waitpid on {}", err); + token.cancel(); return FAILED_EXIT_CODE; } }; } } - fn write_to_exit_paths(code: i32, paths: &[PathBuf]) -> Result<()> { - let code_str = format!("{}", code); - for path in paths { - debug!("Writing exit code {} to {}", code, path.display()); - File::create(path)?.write_all(code_str.as_bytes())?; + async fn write_to_exit_paths(code: i32, paths: &[PathBuf]) -> Result<()> { + let paths = paths.to_owned(); + let tasks: Vec<_> = 
paths + .into_iter() + .map(|path| { + tokio::spawn(async move { + let code_str = format!("{}", code); + if let Ok(mut fp) = File::create(&path).await { + if let Err(e) = fp.write_all(code_str.as_bytes()).await { + error!("could not write exit file to path {} {}", path.display(), e); + } + } + }) + }) + .collect(); + + for task in tasks { + task.await?; } + Ok(()) } } diff --git a/conmon-rs/server/src/lib.rs b/conmon-rs/server/src/lib.rs index 846e7ec174..57de30418f 100644 --- a/conmon-rs/server/src/lib.rs +++ b/conmon-rs/server/src/lib.rs @@ -10,6 +10,7 @@ mod container_log; mod cri_logger; mod init; mod listener; +mod oom_watcher; mod rpc; mod server; mod streams; diff --git a/conmon-rs/server/src/oom_watcher.rs b/conmon-rs/server/src/oom_watcher.rs new file mode 100644 index 0000000000..5ae32574de --- /dev/null +++ b/conmon-rs/server/src/oom_watcher.rs @@ -0,0 +1,331 @@ +use anyhow::{anyhow, Context, Result}; +use lazy_static::lazy_static; +use nix::sys::statfs::{statfs, FsType}; +use notify::{Error, Event, RecommendedWatcher, RecursiveMode, Watcher}; +use regex::Regex; +use std::os::unix::prelude::AsRawFd; +use std::path::{Path, PathBuf}; +use tokio::fs::File; +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio::sync::mpsc::{channel, Receiver}; +use tokio::task::{self, JoinHandle}; +use tokio_eventfd::EventFd; +use tokio_util::sync::CancellationToken; +use tracing::{debug, debug_span, error}; + +#[cfg(any(all(target_os = "linux", target_env = "musl")))] +pub const CGROUP2_SUPER_MAGIC: FsType = FsType(libc::CGROUP2_SUPER_MAGIC as u64); +#[cfg(any(all(target_os = "linux", not(target_env = "musl"))))] +pub const CGROUP2_SUPER_MAGIC: FsType = FsType(libc::CGROUP2_SUPER_MAGIC as i64); + +static CGROUP_ROOT: &str = "/sys/fs/cgroup"; + +lazy_static! 
{ + static ref IS_CGROUP_V2: bool = { + if let Ok(sts) = statfs(CGROUP_ROOT) { + return sts.filesystem_type() == CGROUP2_SUPER_MAGIC; + } + false + }; +} + +pub struct OOMWatcher { + pid: u32, + token: CancellationToken, + task: JoinHandle<()>, +} + +pub struct OOMEvent { + pub oom: bool, +} + +impl OOMWatcher { + pub async fn new( + token: &CancellationToken, + pid: u32, + exit_paths: &[PathBuf], + tx: tokio::sync::mpsc::Sender<OOMEvent>, + ) -> OOMWatcher { + let exit_paths = exit_paths.to_owned(); + let token = token.clone(); + let task = { + let stop = token.clone(); + task::spawn(async move { + if let Err(e) = if *IS_CGROUP_V2 { + Self::oom_handling_cgroup_v2(stop, pid, &exit_paths, tx) + .await + .context("setup cgroupv2 oom handling") + } else { + Self::oom_handling_cgroup_v1(stop, pid, &exit_paths, tx) + .await + .context("setup cgroupv1 oom handling") + } { + error!("Failed to watch OOM: {:#}", e) + } + }) + }; + OOMWatcher { pid, token, task } + } + + pub async fn stop(self) { + self.token.cancel(); + if let Err(err) = self.task.await { + error!("{}: stop failed: {}", self.pid, err); + } + } + + async fn oom_handling_cgroup_v1( + token: CancellationToken, + pid: u32, + exit_paths: &[PathBuf], + tx: tokio::sync::mpsc::Sender<OOMEvent>, + ) -> Result<()> { + let span = debug_span!("oom_handling_cgroup_v1", pid); + let _enter = span.enter(); + let memory_cgroup_path = Self::process_cgroup_subsystem_path(pid, false, "memory").await?; + let memory_cgroup_file_oom_path = memory_cgroup_path.join("memory.oom_control"); + let event_control_path = memory_cgroup_path.join("cgroup.event_control"); + let path = memory_cgroup_file_oom_path.to_str(); + + debug!(path, "enabled cgroup v1 oom handling"); + + let oom_cgroup_file = tokio::fs::OpenOptions::new() + .write(true) + .open(memory_cgroup_file_oom_path) + .await + .context("opening cgroup file")?; + let mut oom_event_fd = EventFd::new(0, false).context("creating eventfd")?; + + let mut event_control = tokio::fs::OpenOptions::new() + .write(true) + .open(event_control_path) + .await + .context("opening cgroup file")?; + event_control + .write_all( + format!( + "{} {}", + oom_event_fd.as_raw_fd(), + oom_cgroup_file.as_raw_fd() + ) + .as_bytes(), + ) + .await + .context("writing control data")?; + event_control.flush().await.context("flush control data")?; + + debug!("successfully setup cgroup v1 oom detection"); + + let mut buffer = [0u8; 16]; + loop { + tokio::select!
{ + _ = token.cancelled() => { + let _ = tx.try_send(OOMEvent{ oom: false }); + break; + } + _ = oom_event_fd.read(&mut buffer) => { + debug!("cgroup v1 oom event"); + if let Err(e) = Self::write_oom_files(exit_paths).await { + error!("error writing oom files: {}", e); + } else { + debug!("successfully wrote oom files"); + } + let _ = tx.try_send(OOMEvent{ oom: true }); + break; + } + } + } + + debug!("done watching for ooms"); + Ok(()) + } + + fn async_watcher() -> Result<(RecommendedWatcher, Receiver<Result<Event, Error>>)> { + let (tx, rx) = channel(1); + + let watcher = notify::recommended_watcher(move |res: Result<Event, Error>| { + futures::executor::block_on(async { + tx.send(res).await.unwrap(); + }) + })?; + + Ok((watcher, rx)) + } + + async fn oom_handling_cgroup_v2( + token: CancellationToken, + pid: u32, + exit_paths: &[PathBuf], + tx: tokio::sync::mpsc::Sender<OOMEvent>, + ) -> Result<()> { + let span = debug_span!("oom_handling_cgroup_v2", pid); + let _enter = span.enter(); + let subsystem_path = Self::process_cgroup_subsystem_path(pid, true, "memory").await?; + let memory_events_file_path = subsystem_path.join("memory.events"); + let mut last_counter: u64 = 0; + + let path = memory_events_file_path.to_str(); + debug!(path, "setting up cgroup v2 handling"); + + let (mut watcher, mut rx) = Self::async_watcher()?; + watcher.watch(&memory_events_file_path, RecursiveMode::NonRecursive)?; + + loop { + debug!("oom_handling_cgroup_v2 loop"); + tokio::select! { + _ = token.cancelled() => { + debug!("oom_handling_cgroup_v2 loop cancelled"); + match tx.try_send(OOMEvent{ oom: false }) { + Ok(_) => break, + Err(e) => error!("try_send failed: {}", e) + }; + break; + } + Some(res) = rx.recv() => { + match res { + Ok(event) => { + if event.kind.is_remove() || event.kind.is_other() { + match tx.try_send(OOMEvent{ oom: false }) { + Ok(_) => break, + Err(e) => error!("try_send failed: {}", e) + }; + break + } + if !event.kind.is_modify() { + continue; + } + debug!("found modify event"); + match Self::check_for_oom(&memory_events_file_path, last_counter).await { + Ok((counter, is_oom)) => { + if !is_oom { + continue; + } + debug!(counter, "found oom event"); + if let Err(e) = Self::write_oom_files(exit_paths).await { + error!("error writing oom files: {}", e); + } + last_counter = counter; + match tx.try_send(OOMEvent{ oom: true }) { + Ok(_) => break, + Err(e) => error!("try_send failed: {}", e) + }; + } + Err(e) => { + error!(pid, "error checking for oom: {}", e); + match tx.try_send(OOMEvent{ oom: false }) { + Ok(_) => break, + Err(e) => error!("try_send failed: {}", e) + }; + } + }; + }, + Err(e) => { + debug!("watch error: {:?}", e); + match tx.try_send(OOMEvent{ oom: false }) { + Ok(_) => break, + Err(e) => error!("try_send failed: {}", e) + }; + break; + }, + }; + } + } + } + watcher.unwatch(&memory_events_file_path)?; + + debug!("done watching for ooms"); + + Ok(()) + } + + async fn check_for_oom( + memory_events_file_path: &Path, + last_counter: u64, + ) -> Result<(u64, bool)> { + let mut new_counter: u64 = 0; + let mut found_oom = false; + let fp = File::open(memory_events_file_path).await?; + let reader = BufReader::new(fp); + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await?
{ + if let Some(counter) = line.strip_prefix("oom ") { + let counter = counter.to_string().parse::<u64>()?; + if counter != last_counter { + new_counter = counter; + found_oom = true; + break; + } + } + } + Ok((new_counter, found_oom)) + } + + async fn write_oom_files(exit_paths: &[PathBuf]) -> Result<()> { + let paths = exit_paths.to_owned(); + let tasks: Vec<_> = paths + .into_iter() + .map(|path| { + tokio::spawn(async move { + if let Err(e) = File::create(&path).await { + error!("could not write oom file to {}: {}", path.display(), e); + } + }) + }) + .collect(); + for task in tasks { + task.await?; + } + Ok(()) + } + + async fn process_cgroup_subsystem_path( + pid: u32, + is_cgroupv2: bool, + subsystem: &str, + ) -> Result<PathBuf> { + if is_cgroupv2 { + Self::process_cgroup_subsystem_path_cgroup_v2(pid).await + } else { + Self::process_cgroup_subsystem_path_cgroup_v1(pid, subsystem).await + } + } + + async fn process_cgroup_subsystem_path_cgroup_v1(pid: u32, subsystem: &str) -> Result<PathBuf> { + lazy_static! { + static ref RE: Regex = Regex::new(".*:(.*):/(.*)").unwrap(); + } + + let cgroup_path = format!("/proc/{}/cgroup", pid); + debug!(pid, "using cgroup path : {}", cgroup_path); + let fp = File::open(cgroup_path).await?; + let reader = BufReader::new(fp); + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await? { + if let Some(caps) = RE.captures(&line) { + let system = caps[1].to_string(); + let path = caps[2].to_string(); + if system.contains(subsystem) || system.eq("") { + return Ok(PathBuf::from(CGROUP_ROOT).join(subsystem).join(path)); + } + } + } + Err(anyhow!("no path found")) + } + + async fn process_cgroup_subsystem_path_cgroup_v2(pid: u32) -> Result<PathBuf> { + lazy_static! { + static ref RE: Regex = Regex::new(".*:.*:/(.*)").unwrap(); + } + + let fp = File::open(format!("/proc/{}/cgroup", pid)).await?; + let mut buffer = String::new(); + let mut reader = BufReader::new(fp); + if reader.read_line(&mut buffer).await?
== 0 { + Err(anyhow!("invalid cgroup")) + } else if let Some(caps) = RE.captures(&buffer) { + Ok(Path::new(CGROUP_ROOT).join(&caps[1])) + } else { + Err(anyhow!("invalid cgroup")) + } + } +} diff --git a/conmon-rs/server/src/rpc.rs b/conmon-rs/server/src/rpc.rs index 481aea60bf..6b0990491e 100644 --- a/conmon-rs/server/src/rpc.rs +++ b/conmon-rs/server/src/rpc.rs @@ -85,7 +85,11 @@ impl conmon::Server for Server { let child_reaper = self.reaper().clone(); let args = pry_err!(self.generate_runtime_args(&id, bundle_path, &container_io, &pidfile)); let runtime = self.config().runtime().clone(); - let exit_paths = pry!(pry!(req.get_exit_paths()) + let exit_paths: Vec<PathBuf> = pry!(pry!(req.get_exit_paths()) + .iter() + .map(|r| r.map(PathBuf::from)) + .collect()); + let oom_exit_paths: Vec<PathBuf> = pry!(pry!(req.get_oom_exit_paths()) .iter() .map(|r| r.map(PathBuf::from)) .collect()); @@ -102,7 +106,7 @@ impl conmon::Server for Server { // register grandchild with server let io = SharedContainerIO::new(container_io); - let child = Child::new(id, grandchild_pid, exit_paths, None, io); + let child = Child::new(id, grandchild_pid, exit_paths, oom_exit_paths, None, io); capnp_err!(child_reaper.watch_grandchild(child))?; results @@ -161,8 +165,14 @@ impl conmon::Server for Server { // register grandchild with server let io = SharedContainerIO::new(container_io); let io_clone = io.clone(); - let child = - Child::new(id, grandchild_pid, vec![], time_to_timeout, io_clone); + let child = Child::new( + id, + grandchild_pid, + vec![], + vec![], + time_to_timeout, + io_clone, + ); let mut exit_rx = capnp_err!(child_reaper.watch_grandchild(child))?; diff --git a/conmon-rs/server/src/server.rs b/conmon-rs/server/src/server.rs index 0966071015..62fe8ecbfd 100644 --- a/conmon-rs/server/src/server.rs +++ b/conmon-rs/server/src/server.rs @@ -28,7 +28,7 @@ use tokio::{ task::{self, LocalSet}, }; use tokio_util::compat::TokioAsyncReadCompatExt; -use tracing::{debug, info}; +use tracing::{debug, error, info}; use tracing_subscriber::{filter::LevelFilter, prelude::*}; use twoparty::VatNetwork; @@ -39,8 +39,9 @@ pub struct Server { #[getset(get = "pub")] config: Config, + /// reaper instance #[getset(get = "pub(crate)")] - reaper: Arc<ChildReaper>, + pub reaper: Arc<ChildReaper>, } impl Server { @@ -134,11 +135,8 @@ impl Server { async fn spawn_tasks(self) -> Result<()> { let (shutdown_tx, shutdown_rx) = oneshot::channel(); let socket = self.config().socket(); - tokio::spawn(Self::start_signal_handler( - self.reaper().clone(), - socket, - shutdown_tx, - )); + let reaper = self.reaper.clone(); + task::spawn(Self::start_signal_handler(reaper, socket, shutdown_tx)); task::spawn_blocking(move || { Handle::current().block_on(async { @@ -177,7 +175,10 @@ impl Server { // TODO FIXME Ideally we would drop after socket file is removed, // but the removal is taking longer than 10 seconds, indicating someone // is keeping it open...
- reaper.kill_grandchildren(handled_sig)?; + match reaper.kill_grandchildren(handled_sig) { + Ok(_) => (), + Err(e) => error!("could not kill grandchildren: {}", e), + } debug!("Removing socket file {}", socket.as_ref().display()); fs::remove_file(socket) diff --git a/pkg/client/client.go b/pkg/client/client.go index 9f121a3938..32bc1dac51 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -484,6 +484,9 @@ func (c *ConmonClient) CreateContainer( if err := stringSliceToTextList(cfg.ExitPaths, req.NewExitPaths); err != nil { return fmt.Errorf("convert exit paths string slice to text list: %w", err) } + if err := stringSliceToTextList(cfg.OOMExitPaths, req.NewOomExitPaths); err != nil { + return err + } if err := c.initLogDrivers(&req, cfg.LogDrivers); err != nil { return fmt.Errorf("init log drivers: %w", err) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 8794761a2a..3e0e90f85f 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -77,7 +77,7 @@ var _ = Describe("ConmonClient", func() { Eventually(func() error { return tr.rr.RunCommandCheckOutput("stopped", "list") - }, time.Second*10).Should(BeNil()) + }, time.Second*20).Should(BeNil()) }) It(testName("should return error if invalid command", terminal), func() { @@ -282,6 +282,7 @@ var _ = Describe("ConmonClient", func() { // Attach to the container socketPath := filepath.Join(tr.tmpDir, "attach") go func() { + defer GinkgoRecover() err := sut.AttachContainer(context.Background(), &client.AttachConfig{ ID: tr.ctrID, SocketPath: socketPath, diff --git a/pkg/client/suite_test.go b/pkg/client/suite_test.go index ab5a91b783..b2938cabe2 100644 --- a/pkg/client/suite_test.go +++ b/pkg/client/suite_test.go @@ -99,6 +99,10 @@ func (tr *testRunner) exitPath() string { return filepath.Join(tr.tmpDir, "exit") } +func (tr *testRunner) oomExitPath() string { + return filepath.Join(tr.tmpDir, "oom_exit") +} + func fileContents(path string) string { contents, err := os.ReadFile(path) Expect(err).To(BeNil()) @@ -108,10 +112,11 @@ func fileContents(path string) string { func (tr *testRunner) createContainer(sut *client.ConmonClient, terminal bool) { resp, err := sut.CreateContainer(context.Background(), &client.CreateContainerConfig{ - ID: tr.ctrID, - BundlePath: tr.tmpDir, - Terminal: terminal, - ExitPaths: []string{tr.exitPath()}, + ID: tr.ctrID, + BundlePath: tr.tmpDir, + Terminal: terminal, + ExitPaths: []string{tr.exitPath()}, + OOMExitPaths: []string{tr.oomExitPath()}, LogDrivers: []client.LogDriver{{ Type: client.LogDriverTypeContainerRuntimeInterface, Path: tr.logPath(),