diff --git a/Cargo.lock b/Cargo.lock index 5212716..86d105c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -785,6 +785,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1054,9 +1064,9 @@ checksum = "20145670ba436b55d91fc92d25e71160fbfbdd57831631c8d7d36377a476f1cb" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64", "bytes", @@ -1072,13 +1082,16 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-native-tls", @@ -1101,6 +1114,7 @@ dependencies = [ "clap", "futures-util", "openssl", + "reqwest", "rust_socketio", "serde", "serde_json", @@ -1108,6 +1122,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "walkdir", ] [[package]] @@ -1176,12 +1191,30 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.23" @@ -1346,6 +1379,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "system-configuration" version = "0.5.1" @@ -1627,6 +1666,15 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.14" @@ -1689,6 +1737,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71d857dc86794ca4c280d616f7da00d2dbfd8cd788846559a6813e6aa4b54ee" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -1772,9 +1830,9 @@ checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "wasm-streams" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" dependencies = [ "futures-util", "js-sys", @@ -1809,6 +1867,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 3099026..ee6e867 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,5 +23,7 @@ tracing-subscriber = "0.3.18" openssl = { version = "0.10", features = ["vendored"] } termion = "3.0.0" +walkdir = "2.4.0" +reqwest = { version = "0.11.24", features = ["multipart"] } diff --git a/src/agent.rs b/src/agent.rs index 07505fd..94a42d2 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -1,15 +1,20 @@ -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Agent { pub api_key: String, + pub robot_server_url: String, } pub struct AgentBuilder { api_key: Option, + robot_server_url: Option, } impl Default for AgentBuilder { fn default() -> Self { - Self { api_key: None } + Self { + api_key: None, + robot_server_url: None, + } } } @@ -18,10 +23,14 @@ impl AgentBuilder { self.api_key = Some(api_key); self } - + pub fn robot_server_url(mut self, robot_server_url: String) -> Self { + self.robot_server_url = Some(robot_server_url); + self + } pub fn build(self) -> Agent { Agent { api_key: self.api_key.unwrap(), + robot_server_url: self.robot_server_url.unwrap(), } } } diff --git a/src/commands/docker.rs b/src/commands/docker.rs index 2997f50..93dab70 100644 --- a/src/commands/docker.rs +++ b/src/commands/docker.rs @@ -14,16 +14,21 @@ use tracing::{error, info}; use serde::{Deserialize, Serialize}; +use crate::agent; use crate::{ commands::{RobotJob, RobotJobResult}, store::{JobManager, Jobs}, + utils::files::{ + create_job_data_dir, get_files_in_directory_recursively, get_job_data_path, + get_merklebot_data_path, upload_content, + }, }; -pub async fn execute_launch(socket: Client, robot_job: RobotJob, jobs: Jobs) { +pub async fn execute_launch(socket: Client, robot_job: RobotJob, agent: agent::Agent, jobs: Jobs) { let args = serde_json::from_str::(&robot_job.args).unwrap(); info!("launching docker job {:?}", args); let docker_launch = DockerLaunch { args }; - let robot_job_result = match docker_launch.execute(robot_job.clone(), jobs).await { + let robot_job_result = match docker_launch.execute(robot_job.clone(), agent, jobs).await { Ok(result) => { info!("job successfully executed"); result @@ -48,6 +53,7 @@ pub struct DockerLaunchArgs { pub container_name: String, pub custom_cmd: Option, pub save_logs: Option, + pub store_data: Option, pub network_mode: String, pub ports: Vec, pub volumes: Vec, @@ -68,6 +74,7 @@ impl DockerLaunch { pub async fn execute( &self, robot_job: RobotJob, + agent: agent::Agent, jobs: Jobs, ) -> Result { info!("launching docker with image {}", self.args.image); @@ -92,6 +99,24 @@ impl DockerLaunch { volumes.push(format!("{}:{}", volume_pair.key, volume_pair.value)) } + match self.args.store_data { + Some(true) => { + // 1. create folder for the job + let create_job_dir_res = create_job_data_dir(&robot_job.id); + match create_job_dir_res { + Ok(path) => { + info!("Sharing dir {}", path); + // 2. Share folder as volume + volumes.push(format!("{}:{}", path, "/merklebot/job_data/")); + } + _ => { + error!("Couldn't create shared dir for job {}", robot_job.id); + } + } + } + _ => {} + } + let mut config = bollard::container::Config::<&str> { image: Some(&self.args.image), env: Some(self.args.env.iter().map(|s| s as &str).collect()), @@ -222,11 +247,38 @@ impl DockerLaunch { .await?; let robot_job_result = RobotJobResult { - job_id: robot_job.id, + job_id: robot_job.id.clone(), status: String::from("done"), logs: concatenated_logs, }; + let job_data_path = get_job_data_path(&robot_job.id); + match &self.args.store_data { + Some(true) => { + match get_files_in_directory_recursively(&job_data_path) { + //TODO: change to path + Ok(paths) => { + info!("{:?}", paths); + for path in paths { + let path_str = path.as_path().display().to_string(); + let key = path_str.replace(&get_merklebot_data_path(), ""); + upload_content( + agent.robot_server_url.clone(), + path, + key, + robot_job.id.clone(), + agent.api_key.clone(), + ) + .await; + } + } + _ => { + error!("Can't get resulting paths"); + } + } + } + _ => {} + } Ok(robot_job_result) } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 62f866d..3de6bec 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::info; +use crate::agent; use crate::store; mod docker; @@ -20,9 +21,9 @@ pub struct StartTunnelReq { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct MessageToRobot{ +pub struct MessageToRobot { pub job_id: String, - pub content: String + pub content: String, } #[derive(Clone, Serialize, Deserialize)] @@ -42,12 +43,17 @@ pub struct RobotJob { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RobotStartTunnelResponse{ +pub struct RobotStartTunnelResponse { is_ok: bool, - error: Option + error: Option, } -pub async fn launch_new_job(payload: Payload, socket: Client, jobs: store::Jobs) { +pub async fn launch_new_job( + payload: Payload, + socket: Client, + agent: agent::Agent, + jobs: store::Jobs, +) { match payload { Payload::String(str) => { let robot_job: RobotJob = serde_json::from_str(&str).unwrap(); @@ -63,7 +69,12 @@ pub async fn launch_new_job(payload: Payload, socket: Client, jobs: store::Jobs) robot_job.status.clone(), ); let shared_jobs = Arc::clone(&jobs); - tokio::spawn(docker::execute_launch(socket, robot_job, shared_jobs)); + tokio::spawn(docker::execute_launch( + socket, + robot_job, + agent, + shared_jobs, + )); } _ => {} } @@ -91,9 +102,9 @@ pub async fn start_tunnel_messanger( } pub async fn start_tunnel(payload: Payload, socket: Client, jobs: store::Jobs) { - let mut ack_result = RobotStartTunnelResponse{ + let mut ack_result = RobotStartTunnelResponse { is_ok: false, - error: None + error: None, }; match payload { Payload::String(str) => { @@ -130,7 +141,6 @@ pub async fn start_tunnel(payload: Payload, socket: Client, jobs: store::Jobs) { }; } - pub async fn message_to_robot(payload: Payload, socket: Client, jobs: store::Jobs) { match payload { Payload::String(str) => { @@ -141,7 +151,9 @@ pub async fn message_to_robot(payload: Payload, socket: Client, jobs: store::Job match job_manager.get_job_or_none(&message.job_id) { Some(job) => { - if let Some(channel) = job_manager.get_channel_to_job_tx_by_job_id(&message.job_id){ + if let Some(channel) = + job_manager.get_channel_to_job_tx_by_job_id(&message.job_id) + { channel.send(message.content).unwrap(); }; } diff --git a/src/main.rs b/src/main.rs index 48aad00..81eea18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,14 +28,18 @@ mod cli; mod commands; mod develop; mod store; +mod utils; async fn main_normal(args: Args) -> Result<(), Box> { - let agent: Agent = AgentBuilder::default().api_key(args.api_key).build(); + let agent: Agent = AgentBuilder::default() + .api_key(args.api_key) + .robot_server_url(args.robot_server_url.clone()) + .build(); info!("Starting agent: {:?}", agent); let jobs: store::Jobs = Arc::new(Mutex::new(store::JobManager::default())); let mut socket = ClientBuilder::new(args.robot_server_url) - .auth(json!({"api_key": agent.api_key, "session_type": "ROBOT"})) + .auth(json!({"api_key": agent.api_key.clone(), "session_type": "ROBOT"})) .on("error", |err, _| { async move { eprintln!("Error: {:#?}", err) }.boxed() }); @@ -43,7 +47,9 @@ async fn main_normal(args: Args) -> Result<(), Box> { let shared_jobs: Arc> = Arc::clone(&jobs); socket = socket.on("new_job", move |payload: Payload, socket: Client| { let shared_jobs = Arc::clone(&shared_jobs); - async move { commands::launch_new_job(payload, socket, shared_jobs).await }.boxed() + let agent = agent.clone(); + async move { commands::launch_new_job(payload, socket, agent, shared_jobs).await } + .boxed() }) } @@ -57,11 +63,15 @@ async fn main_normal(args: Args) -> Result<(), Box> { } { let shared_jobs: Arc> = Arc::clone(&jobs); - socket = socket.on("message_to_robot", move |payload: Payload, socket: Client| { - info!("Start tunnel request"); - let shared_jobs = Arc::clone(&shared_jobs); - async move { commands::message_to_robot(payload, socket, shared_jobs).await }.boxed() - }) + socket = socket.on( + "message_to_robot", + move |payload: Payload, socket: Client| { + info!("Start tunnel request"); + let shared_jobs = Arc::clone(&shared_jobs); + async move { commands::message_to_robot(payload, socket, shared_jobs).await } + .boxed() + }, + ) } let socket = socket.connect().await.expect("Connection failed"); diff --git a/src/store.rs b/src/store.rs index 0f9c4b7..9633182 100644 --- a/src/store.rs +++ b/src/store.rs @@ -76,11 +76,12 @@ impl JobManager { None => None, } } - pub fn get_channel_to_job_tx_by_job_id(&self, job_id: &String) -> Option>{ + pub fn get_channel_to_job_tx_by_job_id( + &self, + job_id: &String, + ) -> Option> { match self.data.get(job_id) { - Some(job) => { - Some(job.channel_to_job_tx.clone()) - }, + Some(job) => Some(job.channel_to_job_tx.clone()), None => None, } } diff --git a/src/utils/files.rs b/src/utils/files.rs new file mode 100644 index 0000000..2488dc8 --- /dev/null +++ b/src/utils/files.rs @@ -0,0 +1,81 @@ +use reqwest; +use std::env; +use std::fs; +use std::path::{Path, PathBuf}; +use tracing::error; +use tracing::info; +use walkdir; + +fn get_home_dir() -> String { + match env::var_os("HOME") { + Some(val) => val.to_str().unwrap().to_string(), + None => return String::from(""), + } +} + +pub fn get_merklebot_data_path() -> String { + let path = get_home_dir() + "/.merklebot/"; + path +} + +pub fn get_job_data_path(job_id: &str) -> String { + let path = get_merklebot_data_path() + "job-" + job_id + "/"; + path +} + +pub fn create_job_data_dir(job_id: &str) -> std::io::Result { + let path = get_job_data_path(job_id); + fs::create_dir_all(path.clone())?; + Ok(path) +} + +pub fn get_files_in_directory_recursively(path: &str) -> Result, walkdir::Error> { + let mut result_files: Vec = Vec::new(); + for entry in walkdir::WalkDir::new(path) { + match entry { + Ok(entry) => match (entry.metadata()) { + Ok(md) => { + if md.is_file() { + result_files.push(entry.path().to_path_buf()); + } + } + _ => {} + }, + _ => {} + } + } + + Ok(result_files) +} + +pub async fn upload_content( + robot_server_url: String, + path: PathBuf, + key: String, + job_id: String, + api_key: String, +) { + let splitted_path: Vec<&str> = key.split("/").collect(); + let filename = splitted_path.last().unwrap().to_owned(); + info!("splitted path: {:?}", splitted_path); + let client = reqwest::Client::new(); + let url = reqwest::Url::parse(&(robot_server_url + "/upload_content")).unwrap(); + info!("{:?}", url); + let file_fs = fs::read(path).unwrap(); + let part = reqwest::multipart::Part::bytes(file_fs).file_name(filename.to_owned()); + let form = reqwest::multipart::Form::new() + .text("key", key) + .text("job_id", job_id) + .text("api_key", api_key) + .part("file", part); + + match client.post(url).multipart(form).send().await { + Ok(res) => { + let res_txt = res.text().await.unwrap_or("no message".to_string()); + info!("content upload res: {:?}", res_txt); + } + Err(_) => { + error!("Error while uploading content"); + } + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..d3ab969 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod files;