Skip to content

Commit

Permalink
Add support for pause
Browse files Browse the repository at this point in the history
This experiment adds support for running a global, shared pause instance
alongside `conmonrs`. The idea is that this instance keeps the
required namespaces open and provides them to consumers as part of the
container creation response.

Signed-off-by: Sascha Grunert <[email protected]>
  • Loading branch information
saschagrunert committed Nov 18, 2022
1 parent 6d21df6 commit cad12b1
Show file tree
Hide file tree
Showing 12 changed files with 1,055 additions and 334 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ linters-settings:
cyclop:
max-complexity: 20
gocognit:
min-complexity: 30
min-complexity: 35
nestif:
min-complexity: 15
errcheck:
Expand Down
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions conmon-rs/common/proto/conmon.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ interface Conmon {

struct CreateContainerResponse {
containerPid @0 :UInt32;
namespacesPath @1 :Text;
}

createContainer @1 (request: CreateContainerRequest) -> (response: CreateContainerResponse);
Expand Down Expand Up @@ -120,4 +121,16 @@ interface Conmon {
}

setWindowSizeContainer @5 (request: SetWindowSizeRequest) -> (response: SetWindowSizeResponse);

###############################################
# CreateNamespaces
struct CreateNamespacesRequest {
metadata @0 :Data;
}

struct CreateNamespacesResponse {
path @0 :Text;
}

createNamespaces @6 (request: CreateNamespacesRequest) -> (response: CreateNamespacesResponse);
}
2 changes: 2 additions & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ memchr = "2.5.0"
multimap = "0.8.3"
nix = "0.25.0"
notify = "5.0.0"
once_cell = "1.16.0"
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11.0"
opentelemetry-semantic-conventions = "0.10.0"
Expand All @@ -31,6 +32,7 @@ sendfd = { version = "0.4.3", features = ["tokio"] }
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.87"
shadow-rs = "=0.16.1"
signal-hook = "0.3.14"
strum = { version = "0.24.1", features = ["derive"] }
tempfile = "3.3.0"
tokio = { version = "1.21.2", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "time"] }
Expand Down
5 changes: 5 additions & 0 deletions conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ pub struct Config {
/// Show version information, specify "full" for verbose output.
version: Option<Verbosity>,

#[get = "pub"]
#[clap(long("pause"), short('p'), value_name("PATH"))]
/// Run the pause process instead of the server.
pause: Option<PathBuf>,

#[get = "pub"]
#[clap(
default_value_t,
Expand Down
1 change: 1 addition & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod init;
mod journal;
mod listener;
mod oom_watcher;
mod pause;
mod rpc;
mod server;
mod streams;
Expand Down
2 changes: 1 addition & 1 deletion conmon-rs/server/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct DefaultListener;

impl ListenerImpl for DefaultListener {
fn bind(&self, path: &Path) -> io::Result<UnixListener> {
UnixListener::bind(&path)
UnixListener::bind(path)
}

fn create_dir_all(&self, path: &Path) -> io::Result<()> {
Expand Down
183 changes: 183 additions & 0 deletions conmon-rs/server/src/pause.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use anyhow::{bail, Context, Result};
use getset::{CopyGetters, Getters};
use libc::pid_t;
use nix::{
mount::{mount, umount, MsFlags},
sched::{unshare, CloneFlags},
sys::signal::{kill, Signal},
unistd::{fork, ForkResult, Pid},
};
use once_cell::sync::OnceCell;
use signal_hook::{consts::TERM_SIGNALS, iterator::Signals};
use std::{
env,
fs::{self, File},
io::Write,
path::{Path, PathBuf},
process::{exit, Command},
};
use strum::{AsRefStr, Display, EnumIter, EnumString, IntoEnumIterator, IntoStaticStr};
use tracing::{debug, error, info};
use uuid::Uuid;

/// Handle to a spawned pause process together with the directory that holds its PID file and
/// the bind mounted namespaces.
#[derive(Debug, CopyGetters, Getters)]
pub struct Pause {
/// Directory containing the pause PID file and one bind mount target per namespace.
#[get = "pub"]
path: PathBuf,

/// PID of the pause process, as parsed from the PID file inside `path`.
#[get_copy]
pid: Pid,
}

/// The global shared pause instance, initialized on first use via `Pause::shared`.
static PAUSE: OnceCell<Pause> = OnceCell::new();

/// The global base path for storing bind mounted namespaces.
const PAUSE_PATH: &str = "/var/run/conmonrs";

/// The file name, relative to the pause instance directory, for storing the pause PID.
const PAUSE_PID_FILE: &str = ".pause_pid";

impl Pause {
    /// Retrieve the global pause instance, spawning it on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if the pause instance has not been initialized yet and
    /// initializing it fails.
    pub fn shared() -> Result<&'static Pause> {
        PAUSE.get_or_try_init(|| Self::init().context("init pause"))
    }

    /// Retrieve the global pause instance if it has already been initialized.
    ///
    /// Never spawns a new pause process.
    pub fn maybe_shared() -> Option<&'static Pause> {
        PAUSE.get()
    }

    /// Stop the pause instance.
    ///
    /// This is best effort: every namespace is unmounted, the instance directory is removed and
    /// the pause process receives `SIGTERM`. Failures are only logged, never returned.
    pub fn stop(&self) {
        info!("Stopping pause");
        for namespace in Namespace::iter() {
            if let Err(e) = namespace.umount(self.path()) {
                debug!("Unable to umount namespace {namespace}: {:#}", e);
            }
        }
        if let Err(e) = fs::remove_dir_all(self.path()) {
            error!(
                "Unable to remove pause path {}: {:#}",
                self.path().display(),
                e
            );
        }

        info!("Killing pause PID: {}", self.pid());
        if let Err(e) = kill(self.pid(), Signal::SIGTERM) {
            error!("Unable to kill pause PID {}: {:#}", self.pid(), e);
        }
    }

    /// Initialize a new pause instance.
    ///
    /// Creates a unique directory below `PAUSE_PATH`, re-executes the current program with
    /// `-p <directory>` and waits for it to exit. The spawned process is expected to leave the
    /// PID of the long running pause process in `PAUSE_PID_FILE` inside that directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory cannot be created, the child cannot be run or exits
    /// unsuccessfully, or the PID file is missing or does not contain a positive PID.
    fn init() -> Result<Self> {
        debug!("Initializing pause");

        let path = PathBuf::from(PAUSE_PATH).join(Uuid::new_v4().to_string());
        fs::create_dir_all(&path).context("create base path")?;

        let program = env::args().next().context("no args set")?;
        let mut child = Command::new(program)
            .arg("-p")
            .arg(&path)
            .spawn()
            .context("run pause")?;

        let status = child.wait().context("wait for pause child")?;
        if !status.success() {
            bail!("exit status not ok: {status}");
        }

        // Parse straight into `pid_t` instead of casting from `u32`, which would silently wrap
        // large values into negative PIDs.
        let pid = fs::read_to_string(path.join(PAUSE_PID_FILE))
            .context("read pause PID path")?
            .trim()
            .parse::<pid_t>()
            .context("parse pause PID")?;

        // `kill` treats zero and negative PIDs as process groups or "every process", so a
        // corrupt PID file must never reach `stop`.
        if pid <= 0 {
            bail!("invalid pause PID: {pid}");
        }
        info!("Pause PID is: {pid}");

        Ok(Self {
            path,
            pid: Pid::from_raw(pid),
        })
    }

    /// Run a new pause instance.
    ///
    /// Unshares the PID, IPC, network and UTS namespaces and forks. The parent writes the PID of
    /// the child to `PAUSE_PID_FILE` inside `path` and exits the process with status 0. The
    /// child bind mounts every namespace into `path` and then blocks until one of the
    /// `TERM_SIGNALS` arrives.
    ///
    /// # Errors
    ///
    /// Returns an error if unsharing, forking, writing the PID file, bind mounting a namespace
    /// or registering the signal handlers fails.
    pub fn run<T: AsRef<Path>>(path: T) -> Result<()> {
        let path = path.as_ref();

        let flags = CloneFlags::CLONE_NEWPID
            | CloneFlags::CLONE_NEWIPC
            | CloneFlags::CLONE_NEWNET
            | CloneFlags::CLONE_NEWUTS;

        unshare(flags).context("unshare with clone flags")?;

        // SAFETY: NOTE(review): `fork` is only sound while no other threads are running in this
        // process. This presumably holds because pause mode is entered early during startup;
        // confirm against the caller.
        match unsafe { fork() }.context("forking process")? {
            ForkResult::Parent { child } => {
                let mut file =
                    File::create(path.join(PAUSE_PID_FILE)).context("create pause PID file")?;
                write!(file, "{child}").context("write child to pause file")?;
                // `File` is unbuffered, so the PID has already been handed to the OS even though
                // `exit` skips destructors.
                exit(0);
            }
            ForkResult::Child => (),
        }

        for namespace in Namespace::iter() {
            // `context` takes a plain string and never interpolates placeholders, so the message
            // has to be built with `format!`.
            namespace
                .bind(path)
                .with_context(|| format!("bind namespace to {}", path.display()))?;
        }

        // Keep the process (and with it the namespaces) alive until asked to terminate.
        let mut signals = Signals::new(TERM_SIGNALS).context("register signals")?;
        signals.forever().next().context("no signal number")?;
        Ok(())
    }
}

#[derive(
AsRefStr, Clone, Copy, Debug, Display, EnumIter, EnumString, Eq, IntoStaticStr, PartialEq,
)]
#[strum(serialize_all = "lowercase")]
/// The Linux namespaces handled by the pause process.
///
/// The lowercase variant name is used both as the entry name below `/proc/self/ns` and as the
/// file name of the bind mount target.
enum Namespace {
/// The PID namespace. The child process becomes PID 1.
Pid,

/// The IPC namespace. This creates a new namespace for System V IPC, POSIX message queues
/// and similar.
Ipc,

/// The network namespace. The namespace is empty and has no connectivity, not even a
/// localhost network, unless some setup is done afterwards.
Net,

/// The UTS namespace, which allows changing the hostname of the new container.
Uts,
}

impl Namespace {
/// Bind the namespace to the provided path.
pub fn bind<T: AsRef<Path>>(&self, path: T) -> Result<()> {
let bind_path = path.as_ref().join(self.as_ref());
File::create(&bind_path).context("create namespace bind path")?;
let source_path = PathBuf::from("/proc/self/ns").join(self.as_ref());

mount(
Some(&source_path),
&bind_path,
None::<&Path>,
MsFlags::MS_BIND,
None::<&[u8]>,
)
.context("mount namespace")?;

Ok(())
}

/// Umount the namespace.
pub fn umount<T: AsRef<Path>>(&self, path: T) -> Result<()> {
let bind_path = path.as_ref().join(self.as_ref());
umount(&bind_path).context("umount namespace")
}
}
25 changes: 25 additions & 0 deletions conmon-rs/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
child::Child,
container_io::{ContainerIO, SharedContainerIO},
container_log::ContainerLog,
pause::Pause,
server::Server,
telemetry::Telemetry,
version::Version,
Expand Down Expand Up @@ -349,4 +350,28 @@ impl conmon::Server for Server {
.instrument(debug_span!("promise")),
)
}

/// Handle a create namespaces request.
///
/// Ensures that the shared pause instance exists and replies with the path of the directory
/// which holds its namespaces.
fn create_namespaces(
    &mut self,
    params: conmon::CreateNamespacesParams,
    mut results: conmon::CreateNamespacesResults,
) -> Promise<(), capnp::Error> {
    debug!("Got a create namespaces request");
    let request = pry!(pry!(params.get()).get_request());

    // Tag everything logged while handling this request with a fresh UUID.
    let request_span = debug_span!(
        "create_namespaces",
        uuid = Uuid::new_v4().to_string().as_str()
    );
    let _guard = request_span.enter();
    pry_err!(Telemetry::set_parent_context(pry!(request.get_metadata())));

    let namespaces_path = pry_err!(Pause::shared()).path().display().to_string();

    let mut reply = results.get().init_response();
    reply.set_path(&namespaces_path);

    Promise::ok(())
}
}
10 changes: 10 additions & 0 deletions conmon-rs/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
init::{DefaultInit, Init},
journal::Journal,
listener::{DefaultListener, Listener},
pause::Pause,
telemetry::Telemetry,
version::Version,
};
Expand Down Expand Up @@ -62,6 +63,11 @@ impl Server {
process::exit(0);
}

if let Some(path) = server.config().pause() {
Pause::run(path).context("run pause")?;
process::exit(0);
}

server.config().validate().context("validate config")?;

Self::init().context("init self")?;
Expand Down Expand Up @@ -207,6 +213,10 @@ impl Server {
}
};

if let Some(pause) = Pause::maybe_shared() {
pause.stop();
}

debug!("Starting grandchildren cleanup task");
reaper
.kill_grandchildren(handled_sig)
Expand Down
Loading

0 comments on commit cad12b1

Please sign in to comment.