From 87fb6c7286771eec415c092f5574f910a0e30af9 Mon Sep 17 00:00:00 2001 From: Ling Wang Date: Mon, 21 Oct 2024 19:25:33 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat:=20Add=20IPC=20between=20Pytho?= =?UTF-8?q?n=20and=20main=20process!?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 26 +++ Cargo.toml | 1 + src/ui/code_editor.rs | 85 ++++++- src/ui/ipc.rs | 155 +++++++++++++ src/ui/main.rs | 55 +++-- src/ui/mod.rs | 30 ++- src/ui/pyenv.rs | 17 +- src/ui/terminal.rs | 224 +++++++++++++----- src/ui/ui_cli_exec.rs | 525 ++++++++++++++++++++++++++++++++++++++++++ src/ui/util.rs | 34 +++ 10 files changed, 1059 insertions(+), 93 deletions(-) create mode 100644 src/ui/ipc.rs create mode 100644 src/ui/ui_cli_exec.rs create mode 100644 src/ui/util.rs diff --git a/Cargo.lock b/Cargo.lock index 07dfaa6..f39a9d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1134,6 +1134,12 @@ dependencies = [ "libloading 0.8.5", ] +[[package]] +name = "doctest-file" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" + [[package]] name = "document-features" version = "0.2.10" @@ -2036,6 +2042,19 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "interprocess" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2f4e4a06d42fab3e85ab1b419ad32b09eab58b901d40c57935ff92db3287a13" +dependencies = [ + "doctest-file", + "libc", + "recvmsg", + "widestring", + "windows-sys 0.52.0", +] + [[package]] name = "inventory" version = "0.3.15" @@ -3444,6 +3463,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "recvmsg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" + [[package]] name = "redox_syscall" version = "0.2.16" @@ -4136,6 +4161,7 @@ dependencies = [ "egui_extras", "enigo", "image", + "interprocess", "inventory", "nix 0.29.0", "portable-pty", diff --git a/Cargo.toml b/Cargo.toml index aea1970..ad27bac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ inventory = "0.3.15" eframe = "0.28.1" egui_extras = "0.28.1" termwiz = "0.22.0" +interprocess = "2.2.1" [toolchain] channel = "nightly" diff --git a/src/ui/code_editor.rs b/src/ui/code_editor.rs index 5be5400..df86c36 100644 --- a/src/ui/code_editor.rs +++ b/src/ui/code_editor.rs @@ -1,29 +1,68 @@ -use std::{fs::File, io::Write}; +use std::{ + fs::File, + io::Write, + sync::mpsc::{channel, Sender}, + thread::{spawn, JoinHandle}, +}; use eframe::egui::{Context, Id, ScrollArea, TextEdit, TextStyle, Ui, Window}; use egui_extras::syntax_highlighting::{highlight, CodeTheme}; +use interprocess::local_socket::Stream; -use crate::{err, impl_sub_window}; +use crate::{err, impl_sub_window, info}; -use super::{main::SubWindow, pyenv::PyEnv}; +use super::{main::SubWindow, pyenv::PyEnv, util::get_main_virt}; + +enum PyEnyMsg { + Code(String), + End, +} pub struct CodeEditor { code: String, save_to: String, - pyenv: PyEnv, + handler: Option>, + tx: Option>, } impl Default for CodeEditor { fn default() -> Self { - Self { + let mut res = Self { code: "\ from tester import * print('Hello, world!') -".to_string(), +s=Shell() +e=ui.UiExec(s, 1) +" + .to_string(), save_to: "".to_string(), - pyenv: PyEnv::default(), - } + handler: None, + tx: None, + }; + + let (tx, rx) = channel(); + + let handler = spawn(move || { + let mut pyenv = PyEnv::build(&get_main_virt()); + loop { + match rx.recv() { + Ok(PyEnyMsg::Code(code)) => { + pyenv.run_code(&code); + } + Ok(PyEnyMsg::End) => { + break; + } + Err(e) => { + err!("CodeEditor IPC error: {}", e); + } + } + } + }); + + res.tx = Some(tx); + res.handler = Some(handler); + res } } @@ -49,7 +88,15 @@ impl CodeEditor { }); } fn run_code(&mut self) { - self.pyenv.run_code(&self.code); + match &self.tx { + Some(tx) => { + let code = self.code.clone(); + let _ = tx.send(PyEnyMsg::Code(code)); + } + None => { + err!("CodeEditor IPC error: tx is None"); + } + } } fn save_code(&mut self, ui: &mut Ui) { if ui.button("Write to file").clicked() { @@ -98,13 +145,31 @@ impl CodeEditor { } } +impl Drop for CodeEditor { + fn drop(&mut self) { + match &self.tx { + Some(tx) => { + let _ = tx.send(PyEnyMsg::End); + } + None => {} + } + let _ = self.handler.take(); + } +} + impl SubWindow for CodeEditor { fn show(&mut self, ctx: &Context, title: &str, id: &Id, open: &mut bool) { - let window = Window::new(title).id(id.to_owned()).open(open).resizable([true, true]); + let window = Window::new(title) + .id(id.to_owned()) + .open(open) + .resizable([true, true]); window.show(ctx, |ui| { self.show(ui); }); } + fn on_ipc(&mut self, msg: &str, _conn: &mut Stream) { + info!("CodeEditor received IPC message: {}", msg); + } } impl_sub_window!(CodeEditor, "CodeEditor"); diff --git a/src/ui/ipc.rs b/src/ui/ipc.rs new file mode 100644 index 0000000..75d1785 --- /dev/null +++ b/src/ui/ipc.rs @@ -0,0 +1,155 @@ +//! IPC between windows and sub-windows. +//! +//! As the program may run in multiple processes, or in same process but different address space, +//! we need a way to communicate between windows and sub-windows. + +use std::{ + error::Error, + io::{BufRead, BufReader, ErrorKind, Write}, +}; + +use interprocess::local_socket::{ + prelude::*, traits::ListenerExt, GenericNamespaced, Listener, ListenerNonblockingMode, + ListenerOptions, Stream, ToNsName, +}; +use serde::{Deserialize, Serialize}; + +use crate::{err, log, ui::util::get_sub_virt}; + +use super::{main::MyApp, util::get_main_virt}; + +#[derive(Serialize, Deserialize)] +pub struct WindowIpcMessage { + pub window_id: u64, + pub message: String, +} + +pub fn init_ipc() -> Result> { + let sock_name = get_sock_name(get_main_virt().to_owned(), None); + let sock_name = sock_name.to_ns_name::()?; + + let opts = ListenerOptions::new() + .name(sock_name) + .nonblocking(ListenerNonblockingMode::Both); + + let listener = opts.create_sync()?; + Ok(listener) +} + +impl MyApp { + pub(super) fn handle_ipc(&mut self) { + for m in self.listener.incoming() { + match m { + Err(e) if e.kind() == ErrorKind::WouldBlock => { + break; + } + Err(e) => { + err!("IPC error: {}", e); + continue; + } + Ok(mut stream) => { + log!("IPC message received"); + let mut reader = BufReader::new(&mut stream); + let mut buf = String::new(); + let _ = reader.read_line(&mut buf); + log!("Received IPC message: {}", buf); + // let msg: WindowIpcMessage = match from_reader(&mut reader) { + let msg: WindowIpcMessage = match serde_json::from_str(&buf) { + Ok(m) => m, + Err(e) => { + err!("IPC message decode error: {}", e); + continue; + } + }; + log!( + "Received IPC message from window {}: {}", + msg.window_id, + msg.message + ); + for w in self.sub_windows.iter_mut() { + if w.idx == msg.window_id { + w.window.on_ipc(&msg.message, &mut stream); + } + } + } + } + } + } +} + +pub fn get_sock_name(base_name: String, window_id: Option) -> String { + log!( + "Try get sock name with base name: {}, window id: {:?}", + base_name, + window_id + ); + if let Some(id) = window_id { + format!("{}_{}.sock", base_name, id) + } else { + base_name + ".sock" + } +} + +pub fn parse_sock_id(sock_name: &str) -> Option { + log!("Try parse sock id from name: {}", sock_name); + let parts = sock_name.split('_'); + let mut id = parts.rev().next()?; + if id.ends_with(".sock") { + id = &id[..id.len() - 5]; + } + log!("Parsed sock id: {}", id); + id.parse::().ok() +} + +pub fn sub_send_msg(msg: WindowIpcMessage) { + log!("IPC send message to main window, no response"); + let name = get_sock_name(get_sub_virt().to_owned(), None); + log!("Try connected to main window {}", name); + let name = name.to_ns_name::().unwrap(); + let mut conn = Stream::connect(name).unwrap(); + log!( + "Send message to main window {}, {}", + msg.window_id, + msg.message + ); + serde_json::to_writer(&mut conn, &msg).unwrap(); + conn.write(b"\n").unwrap(); +} + +pub fn sub_send_msg_wait_msg(msg: WindowIpcMessage) -> Result> { + log!("IPC send message to main window, with wait for response"); + let name = get_sock_name(get_sub_virt().to_owned(), None); + log!("Try connected to main window {}", name); + let name = name.to_ns_name::()?; + let mut conn = Stream::connect(name)?; + log!( + "Send message to main window {}, {}", + msg.window_id, + msg.message + ); + serde_json::to_writer(&mut conn, &msg)?; + conn.write(b"\n")?; + log!("Wait for response from main window"); + let mut reader = BufReader::new(&mut conn); + let mut buf = String::new(); + reader.read_line(&mut buf)?; + let msg: WindowIpcMessage = serde_json::from_str(&buf)?; + log!( + "Received message from main window {}, {}", + msg.window_id, + msg.message + ); + Ok(msg) +} + +pub fn main_send_msg(msg: WindowIpcMessage, conn: &mut Stream) { + log!("IPC send message to sub window, no response"); + log!( + "Send message to sub window {}, {}", + msg.window_id, + msg.message + ); + conn.write(serde_json::to_string(&msg).unwrap().as_bytes()) + .unwrap(); + conn.write(b"\n").unwrap(); +} diff --git a/src/ui/main.rs b/src/ui/main.rs index d07e1a6..5d0326e 100644 --- a/src/ui/main.rs +++ b/src/ui/main.rs @@ -1,14 +1,17 @@ //! Main UI render for the APP -use std::error::Error; +use std::{error::Error, thread::sleep, time::Duration}; use eframe::{ egui::{Context, Id, SidePanel, Ui, ViewportBuilder}, run_native, App, Frame, NativeOptions, }; +use interprocess::local_socket::{Listener, Stream}; use crate::{info, util::anybase::AnyBase}; +use super::ipc::init_ipc; + /// Main UI struct /// /// NOTICE! NOTICE! This will block the main thread. If you have any other tasks to do, please run them in a separate thread. @@ -35,17 +38,19 @@ impl AppUi { } } -struct SubWindowHolder { - window: Box, - id: Id, - title: String, - open: bool, +pub struct SubWindowHolder { + pub window: Box, + pub id: Id, + pub idx: u64, + pub title: String, + pub open: bool, } -struct MyApp { +pub struct MyApp { sub_window_creator: Vec>, // We ensure that the sub windows only work in the main thread - sub_windows: Vec, + pub(super) sub_windows: Vec, sub_window_idx: usize, + pub(super) listener: Listener, } impl Default for MyApp { @@ -60,6 +65,7 @@ impl Default for MyApp { sub_window_creator, sub_windows: Vec::new(), sub_window_idx: 0, + listener: init_ipc().unwrap(), } } } @@ -71,21 +77,29 @@ impl MyApp { for creator in &self.sub_window_creator { let name = creator.name(); if ui.button(name).clicked() { - let title = format!("{}: {}", name, self.sub_window_idx); - let id = Id::new(self.sub_window_idx); - info!("Try create sub window: {}", title); + let idx = self.sub_window_idx as u64; self.sub_window_idx += 1; - self.sub_windows.push(SubWindowHolder { + let title = format!("{}: {}", name, idx); + let id = Id::new(idx); + info!("Try create sub window: {}", title); + sleep(Duration::from_millis(5)); + let sub_windows = &mut self.sub_windows; + sub_windows.push(SubWindowHolder { window: creator.open(), id, + idx, title, open: true, }); } } - self.sub_windows.retain(|w| w.open); - for w in &mut self.sub_windows { - w.window.show(ctx, &w.title, &w.id, &mut w.open); + { + sleep(Duration::from_millis(5)); + let sub_windows = &mut self.sub_windows; + sub_windows.retain(|w| w.open); + for w in sub_windows.iter_mut() { + w.window.show(ctx, &w.title, &w.id, &mut w.open); + } } }); } @@ -94,6 +108,7 @@ impl MyApp { impl App for MyApp { fn update(&mut self, ctx: &Context, frame: &mut Frame) { let _ = frame; + self.handle_ipc(); SidePanel::right("SubWindow Panel") .default_width(200.0) .show(ctx, |ui| { @@ -111,6 +126,10 @@ pub trait SubWindow: AnyBase { /// Show the window, this will be called every frame. Your window is identified by the `id` parameter. /// However, that doesn't mean you should change the title, as this contains the window number, useful for the user. fn show(&mut self, ctx: &Context, title: &str, id: &Id, open: &mut bool); + + /// For IPC, this will be called when the IPC message is received. + /// The message is a string, you can use it as you like. + fn on_ipc(&mut self, msg: &str, stream: &mut Stream); } #[doc(hidden)] @@ -120,15 +139,15 @@ pub trait SubWindowCreator { } /// Snippet to register a sub window -/// +/// /// # Arguments /// $name: The struct name of the sub window /// $window_name: The name of the window, will become the title of the window -/// +/// /// # Example /// `impl_sub_window!(TestUiStruct, "TestUiName");` /// where TestUiStruct implements SubWindow trait and Default trait -/// +/// /// # Notice /// If you found rust-analyzer gives "invalid metavariable expression", this is a nightly feature, you can ignore it. It will work. /// The problem is on `${concat()}` macro. Just suppress it. diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 56bf727..82ca292 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -1,12 +1,34 @@ //! Parts to handle the UI render part -//! +//! //! If only use CLI feats, this part can be ignored; //! however, GUI part may need this part show what's going on. //! Or, use to create needle for GUI part. -//! +//! -pub mod main; +// pub mod cli_hooker; pub mod code_editor; +pub mod main; pub mod pyenv; pub mod terminal; -pub mod cli_hooker; \ No newline at end of file +pub mod ui_cli_exec; +pub mod util; +pub mod ipc; + +mod test_window; + +use pyo3::{ + types::{PyModule, PyModuleMethods}, + wrap_pyfunction, Bound, PyResult, +}; +use ui_cli_exec::UiExec; +use util::__init_sub_virt__; + +pub fn register_ui(parent_module: &Bound<'_, PyModule>) -> PyResult<()> { + let m = PyModule::new_bound(parent_module.py(), "ui")?; + m.add_class::()?; + + m.add_function(wrap_pyfunction!(__init_sub_virt__, &m)?)?; + + parent_module.add_submodule(&m)?; + Ok(()) +} diff --git a/src/ui/pyenv.rs b/src/ui/pyenv.rs index 76545af..fd52fba 100644 --- a/src/ui/pyenv.rs +++ b/src/ui/pyenv.rs @@ -3,18 +3,19 @@ use std::sync::LazyLock; use pyo3::{ ffi::{c_str, PyImport_AddModule, PyModule_GetDict}, prepare_freethreaded_python, - types::{PyDict, PyDictMethods}, Py, Python, + types::{PyDict, PyDictMethods}, + Py, Python, }; -use crate::err; +use crate::{err, info}; pub struct PyEnv { globals: Py, locals: Py, } -impl Default for PyEnv { - fn default() -> Self { +impl PyEnv { + pub fn build(virt_info: &str) -> Self { static GLOBALS: LazyLock> = LazyLock::new(|| { prepare_freethreaded_python(); Python::with_gil(|py| unsafe { @@ -29,21 +30,25 @@ impl Default for PyEnv { prepare_freethreaded_python(); Python::with_gil(|py| { let globals = GLOBALS.clone_ref(py); - globals.bind(py).set_item("__virt__", 1).unwrap(); // Force to copy globals dict, otherwise drop one PyEnv will affect others + globals.bind(py).set_item("__virt__", virt_info).unwrap(); // Force to copy globals dict, otherwise drop one PyEnv will affect others let locals = unsafe { let mptr = globals.as_ptr(); Py::from_owned_ptr(py, mptr) }; - Self { globals, locals } + let mut res = Self { globals, locals }; + res.run_code("import tester\ntester.ui.__init_sub_virt__(__virt__)\n"); + res }) } } impl PyEnv { pub fn run_code(&mut self, code: &str) { + // Python mat block the main thread... Use async outside plz... Python::with_gil(|py| { let globals = self.globals.bind(py); let locals = self.locals.bind(py); + info!("Run code: ```\n{}\n```", code); let e = py.run_bound(code, Some(globals), Some(locals)); if let Err(e) = e { err!("Run code error: {}", e); diff --git a/src/ui/terminal.rs b/src/ui/terminal.rs index ac9ff03..6020703 100644 --- a/src/ui/terminal.rs +++ b/src/ui/terminal.rs @@ -1,11 +1,9 @@ use std::{ cmp::{max, min}, error::Error, + io::{BufRead, BufReader, ErrorKind}, ops::Range, - sync::{ - mpsc::{self, Receiver, Sender}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, thread::{sleep, spawn, JoinHandle}, time::Duration, }; @@ -13,21 +11,19 @@ use std::{ use eframe::egui::{ scroll_area::ScrollBarVisibility, Context, FontId, Id, RichText, ScrollArea, Ui, Window, }; +use interprocess::local_socket::{ + traits::ListenerExt, GenericNamespaced, Listener, ListenerNonblockingMode, ListenerOptions, + Stream, ToNsName, +}; -use crate::{consts::DURATION, impl_sub_window}; - -use super::main::SubWindow; +use crate::{consts::DURATION, err, impl_sub_window, info, log, ui::ipc::main_send_msg}; -pub enum TerminalMessage { - Data(Vec), - Close, -} +use super::{ipc::WindowIpcMessage, main::SubWindow, ui_cli_exec::UiCliIpcMsg}; pub struct Terminal { size: (u32, u32), buf: Arc>>, - send: Arc>>>, // Only Close will be sent, indicating that the terminal is closed - recv: Arc>>>, + listener: Arc>>, handle: Option>, stop: Arc>, } @@ -40,15 +36,13 @@ impl Default for Terminal { let mut res = Terminal { size: (24, 80), buf, - send: Arc::new(Mutex::new(None)), - recv: Arc::new(Mutex::new(None)), + listener: Arc::new(Mutex::new(None)), handle: None, stop: Arc::new(Mutex::new(false)), }; let buf = res.buf.clone(); - let send = res.send.clone(); - let recv = res.recv.clone(); + let listener = res.listener.clone(); let stop = res.stop.clone(); let handler = spawn(move || loop { @@ -59,28 +53,66 @@ impl Default for Terminal { break; } } - let mut recv_o = recv.lock().unwrap(); - if recv_o.is_none() { - continue; - } - let recv = recv_o.as_mut().unwrap(); - match recv.try_recv() { - Ok(TerminalMessage::Data(data)) => { - let mut buf = buf.lock().unwrap(); - buf.extend(data); + { + let mut _listener = listener.lock().unwrap(); + if _listener.is_none() { + continue; } - Ok(TerminalMessage::Close) => { - let mut send_o = send.lock().unwrap(); - let send = send_o.as_ref().unwrap(); - send.send(TerminalMessage::Close).unwrap(); - recv_o.take(); - send_o.take(); - let mut buf = buf.lock().unwrap(); - buf.clear(); - buf.extend(b"Hello, world!\n"); + let listener = _listener.as_mut().unwrap(); + let mut end = false; + for m in listener.incoming() { + match m { + Err(e) if e.kind() == ErrorKind::WouldBlock => { + break; + } + Err(e) => { + err!("IPC error: {}", e); + continue; + } + Ok(m) => { + let mut reader = BufReader::new(m); + let mut b = String::new(); + match reader.read_line(&mut b) { + // Err(e) if e.kind() == ErrorKind::WouldBlock => { + // break; + // } + Err(e) => { + err!("IPC Read error: {}", e); + continue; + } + Ok(_) => {} + }; + info!("Terminal got message: {:?}", b); + let msg: UiCliIpcMsg = match serde_json::from_str(&b) { + Err(e) => { + err!("IPC message decode error: {}", e); + continue; + } + Ok(m) => m, + }; + match msg { + UiCliIpcMsg::BUILD(_) => { + err!("Already hooked, unexpected message"); + unreachable!(); + } + UiCliIpcMsg::REBUILD(_) => { + err!("Unexpected message, you should not send REBUILD if not broken"); + unreachable!(); + } + UiCliIpcMsg::CONSOLE(data) => { + let mut buf = buf.lock().unwrap(); + buf.extend(data); + } + UiCliIpcMsg::EXIT => { + end = true; + break; + } + } + } + } } - Err(_) => { - continue; + if end { + _listener.take(); } } }); @@ -95,27 +127,38 @@ impl Drop for Terminal { fn drop(&mut self) { let mut stop = self.stop.lock().unwrap(); *stop = true; - let send = self.send.lock().unwrap(); - if let Some(send) = send.as_ref() { - send.send(TerminalMessage::Close).unwrap(); - } } } impl Terminal { - pub fn try_hook( - &mut self, - recv: Receiver, - ) -> Result, Box> { - let mut recv_o = self.recv.lock().unwrap(); - if recv_o.is_some() { + pub fn try_hook(&mut self, msg: WindowIpcMessage) -> Result<(), Box> { + let listener = self.listener.clone(); + let mut listener = listener.lock().unwrap(); + if listener.is_some() { return Err("Terminal already hooked".into()); } - *recv_o = Some(recv); - let (rsend, rrecv) = mpsc::channel(); - let mut send = self.send.lock().unwrap(); - *send = Some(rsend); - Ok(rrecv) + let msg = msg.message; + let msg = serde_json::from_str::(&msg)?; + let sock_name = match msg { + UiCliIpcMsg::BUILD(sock_name) => sock_name, + _ => return Err("Invalid message".into()), + }; + let sock_name = sock_name.to_ns_name::()?; + let opts = ListenerOptions::new() + .name(sock_name) + .nonblocking(ListenerNonblockingMode::Both); + let lst = opts.create_sync()?; + *listener = Some(lst); + Ok(()) + } + pub fn un_hook(&mut self) -> Result<(), Box> { + let listener = self.listener.clone(); + let mut listener = listener.lock().unwrap(); + if listener.is_none() { + return Ok(()); + } + let _listener = listener.take().unwrap(); + Ok(()) } } @@ -140,8 +183,6 @@ impl Terminal { } fn render_row(&mut self, ui: &mut Ui, rows: Range) { self.gc_bufs(); - // ui.label("01234567890123456789012345678901234567890123456789012345678901234567890123456789"); - // Use this to make the line above the same length as the terminal ui.label(" "); let buf = self.buf.clone(); let buf = buf.lock().unwrap(); @@ -152,7 +193,7 @@ impl Terminal { .filter(|&(_, &c)| c == b'\n') .map(|(i, _)| i), ); - if !buf[buf.len() - 1] == b'\n' { + if !buf.is_empty() && !(buf[buf.len() - 1] == b'\n') { data.push(buf.len()); } let begin_line = min(data.len() - 1, rows.start); @@ -205,6 +246,79 @@ impl SubWindow for Terminal { self.show(ui); }); } + fn on_ipc(&mut self, msg: &str, _conn: &mut Stream) { + log!("Terminal got message: {}", msg); + let msg = serde_json::from_str::(msg); + match msg { + Ok(msg) => match msg { + UiCliIpcMsg::BUILD(s_name) => { + let sock_name = s_name.clone().to_ns_name::().unwrap(); + let opts = ListenerOptions::new() + .name(sock_name) + .nonblocking(ListenerNonblockingMode::Both); + let listener = opts.create_sync().unwrap(); + let self_listener = self.listener.clone(); + let mut self_listener = self_listener.lock().unwrap(); + if self_listener.is_some() { + err!("Already hooked, unexpected message"); + main_send_msg( + WindowIpcMessage { + window_id: 0, + message: serde_json::to_string(&UiCliIpcMsg::EXIT).unwrap(), + }, + _conn, + ); + return; + } + *self_listener = Some(listener); + info!("Terminal hooked with sock {}", s_name); + { + let mut buf = self.buf.lock().unwrap(); + buf.clear(); + } + main_send_msg( + WindowIpcMessage { + window_id: 0, + message: serde_json::to_string(&UiCliIpcMsg::BUILD(s_name)).unwrap(), + }, + _conn, + ); + } + UiCliIpcMsg::REBUILD(s_name) => { + let sock_name = s_name.clone().to_ns_name::().unwrap(); + let opts = ListenerOptions::new() + .name(sock_name) + .nonblocking(ListenerNonblockingMode::Both); + let listener = opts.create_sync().unwrap(); + let self_listener = self.listener.clone(); + let mut self_listener = self_listener.lock().unwrap(); + *self_listener = Some(listener); + log!("Terminal rebuild with sock {}", s_name); + main_send_msg( + WindowIpcMessage { + window_id: 0, + message: serde_json::to_string(&UiCliIpcMsg::REBUILD(s_name)).unwrap(), + }, + _conn, + ); + } + UiCliIpcMsg::CONSOLE(msg) => { + let mut buf = self.buf.lock().unwrap(); + buf.extend(msg); + // The other way has some issue now... Use the main ipc as a workaround + // err!("Unexpected message, you should not send CONSOLE message at handshake"); + // unreachable!(); + } + UiCliIpcMsg::EXIT => { + err!("Unexpected message, you should not send EXIT message at handshake"); + unreachable!(); + } + }, + Err(e) => { + err!("Terminal IPC message decode error: {}", e); + } + } + } } impl_sub_window!(Terminal, "Terminal"); diff --git a/src/ui/ui_cli_exec.rs b/src/ui/ui_cli_exec.rs new file mode 100644 index 0000000..28a3576 --- /dev/null +++ b/src/ui/ui_cli_exec.rs @@ -0,0 +1,525 @@ +use std::{ + error::Error, + io::Write, + sync::Arc, + thread::{sleep, spawn, JoinHandle}, + time::{Duration, Instant}, +}; + +use eframe::egui::mutex::Mutex; +use interprocess::local_socket::{prelude::*, GenericNamespaced, Stream, ToNsName}; +use pyo3::{exceptions::PyRuntimeError, pyclass, pymethods, PyRefMut, PyResult}; +use serde::{Deserialize, Serialize}; + +use crate::{ + cli::tty::{DynTty, Tty, WrapperTty}, + consts::DURATION, + err, + exec::cli_api::{CliTestApi, SudoCliTestApi}, + impl_any, info, log, + pythonapi::shell_like::{handle_wrap, py_tty_inner, PyTty, PyTtyInner}, + ui::{ipc::parse_sock_id, util::get_sub_virt}, + util::{anybase::heap_raw, util::rand_string}, +}; + +use super::ipc::{get_sock_name, sub_send_msg, sub_send_msg_wait_msg, WindowIpcMessage}; + +pub struct UiCliTester { + inner: Arc>>, + stream: Arc>>, + buf: Arc>>, + handle: Option>, + exit: Arc>, +} + +#[derive(Serialize, Deserialize)] +pub enum UiCliIpcMsg { + BUILD(String), // build ipc stream with given socket + REBUILD(String), // rebuild ipc stream with given socket, so why pipe will broken? + CONSOLE(Vec), // transfer a string + EXIT, // end ipc +} + +fn send_once(sock_name: &str, msg: &UiCliIpcMsg) -> Result<(), Box> { + log!("Send to term: {}", serde_json::to_string(msg)?); + let sock_name = sock_name.to_ns_name::()?; + let mut conn = Stream::connect(sock_name)?; + serde_json::to_writer(&conn, msg)?; + conn.write(b"\n")?; + Ok(()) +} + +pub fn rebuild_ipc(ori_name: String) -> Result> { + let window_id = parse_sock_id(&ori_name); + let window_id = match window_id { + Some(x) => x, + None => return Err("Invalid window id".into()), + }; + let sock = get_sock_name(get_sub_virt() + &rand_string(5) + "cliui", Some(window_id)); + let handshake = sub_send_msg_wait_msg(WindowIpcMessage { + window_id, + message: serde_json::to_string(&UiCliIpcMsg::REBUILD(sock))?, + })?; + let handshake: UiCliIpcMsg = serde_json::from_str(&handshake.message)?; + let sock = match handshake { + UiCliIpcMsg::REBUILD(sock) => sock, + _ => return Err("Invalid handshake".into()), + }; + log!("IPC rebuilt: {:?}", sock); + Ok(sock) +} + +impl UiCliTester { + pub fn begin_ipc(&mut self, window_id: u64, sock: String) -> Result<(), Box> { + let self_stream = self.stream.clone(); + let mut self_stream = self_stream.lock(); + if self_stream.is_some() { + return Err("Already has IPC".into()); + } + let handshake = sub_send_msg_wait_msg(WindowIpcMessage { + window_id, + message: serde_json::to_string(&UiCliIpcMsg::BUILD(sock))?, + })?; + let handshake: UiCliIpcMsg = serde_json::from_str(&handshake.message)?; + let sock = match handshake { + UiCliIpcMsg::BUILD(sock) => sock, + _ => return Err("Invalid handshake".into()), + }; + info!("IPC connected: {:?}", sock); + *self_stream = Some(sock); + Ok(()) + } + pub fn end_ipc(&mut self) { + let self_stream = self.stream.clone(); + let mut self_stream = self_stream.lock(); + let stream = self_stream.take(); + let stream = match stream { + Some(x) => x, + None => return, + }; + send_once(&stream, &UiCliIpcMsg::EXIT).unwrap(); + } + pub fn build(inner: DynTty, term_id: u64) -> Result> { + let mut res = Self { + inner: Arc::new(Mutex::new(Some(inner))), + stream: Arc::new(Mutex::new(None)), + buf: Arc::new(Mutex::new(Vec::new())), + handle: None, + exit: Arc::new(Mutex::new(false)), + }; + + let sock_name = get_sub_virt() + &rand_string(5) + "cliui"; + let sock_name = get_sock_name(sock_name, Some(term_id)); + res.begin_ipc(term_id, sock_name)?; + + let inner = res.inner.clone(); + let stream = res.stream.clone(); + let buf = res.buf.clone(); + let exit = res.exit.clone(); + let handle = spawn(move || loop { + sleep(Duration::from_millis(DURATION)); + { + let exit = exit.lock(); + if *exit { + break; + } + } + let data; + { + let mut inner = inner.lock(); + if inner.is_none() { + continue; + } + let inner = inner.as_mut().unwrap(); + let d = inner.read(); + if let Err(e) = d { + err!("read error: {}", e); + break; + } + data = d.unwrap(); + let mut buf = buf.lock(); + buf.extend(data.clone()); + } + { + let mut _stream = stream.lock(); + if _stream.is_none() { + continue; + } + if data.is_empty() { + continue; + } + // let mut retry = 0; + sub_send_msg(WindowIpcMessage { + window_id: term_id, + message: serde_json::to_string(&UiCliIpcMsg::CONSOLE(data.clone())).unwrap(), + }); + // loop { + // sleep(Duration::from_millis(DURATION)); + // let stream = _stream.as_mut().unwrap(); + // let e = match send_once(&stream, &UiCliIpcMsg::CONSOLE(data.clone())) { + // Ok(_) => break, + // Err(e) => e, + // }; + // err!("IPC Send error: {}", e); + // if retry > 3 { + // break; + // } + // warn!("IPC broken, try to rebuild. Times: {}", retry); + // let ori_name = _stream.take(); + // let ori_name = match ori_name { + // Some(x) => x, + // None => { + // err!("No IPC"); + // break; + // } + // }; + // *_stream = Some(rebuild_ipc(ori_name).unwrap()); + // retry += 1; + // continue; + // } + } + }); + + res.handle = Some(handle); + + Ok(res) + } +} + +impl UiCliTester { + fn run_command(&mut self, command: &String) -> Result<(), Box> { + info!("Write to shell: {}", command); + sleep(Duration::from_millis(DURATION)); + let inner = self.inner.clone(); + let mut inner = inner.lock(); + let inner = inner.as_mut().unwrap(); + inner.write(command.as_bytes()) + } + fn __exit(&mut self) { + self.end_ipc(); + let mut exit = self.exit.lock(); + *exit = true; + } +} +// impl Drop for UiCliTester { +// fn drop(&mut self) { +// self.__exit(); +// } +// } +impl_any!(UiCliTester); +impl Tty for UiCliTester { + fn read(&mut self) -> Result, Box> { + let mut buf = self.buf.lock(); + let res = buf.clone(); + buf.clear(); + Ok(res) + } + fn read_line(&mut self) -> Result, Box> { + let mut buf = self.buf.lock(); + let res = buf.clone(); + buf.clear(); + Ok(res) + } + fn write(&mut self, data: &[u8]) -> Result<(), Box> { + let inner = self.inner.clone(); + let mut inner = inner.lock(); + let inner = inner.as_mut().unwrap(); + inner.write(data) + } +} +impl WrapperTty for UiCliTester { + fn exit(mut self) -> DynTty { + self.__exit(); + self.inner.lock().take().unwrap() + } + + fn inner_ref(&self) -> &DynTty { + // &self.inner + panic!("You should not call this method"); + } + + fn inner_mut(&mut self) -> &mut DynTty { + // &mut self.inner + panic!("You should not call this method"); + } +} + +impl UiCliTester { + fn filter_assert_echo(&self, expected: &str, buf: &mut Vec) -> Result<(), Box> { + let expected = "echo ".to_owned() + expected; + let expected = expected.as_bytes(); + for (pos, window) in buf.windows(expected.len()).enumerate() { + if window == expected { + let i = pos + expected.len(); + buf.drain(0..=i); + break; + } + } + Ok(()) + } + + fn kmp_next(&self, target: &Vec) -> Vec { + let mut next = vec![0usize; target.len()]; + let mut i = 1; + let mut j = 0; + while i < target.len() - 1 { + if target[i] == target[j] { + next[i] = j + 1; + i += 1; + j += 1; + } else { + if j == 0 { + next[i] = 0; + i += 1; + } else { + j = next[j - 1] as usize; + } + } + } + next + } + + fn kmp_search(&self, content: &Vec, target: &Vec) -> Option { + let next = self.kmp_next(target); + let mut i = 0; + let mut j = 0; + let mut res = None; + while i < content.len() && j < target.len() { + if content[i] == target[j] { + if res.is_none() { + res = Some(i); + } + i += 1; + j += 1; + if j >= target.len() { + break; + } + } else { + if j == 0 { + i += 1; + } else { + j = next[j - 1]; + } + res = None; + } + } + res + } + + fn do_wait_serial( + &mut self, + expected: &str, + timeout: u32, + filter_echo_back: Option<&str>, + ) -> Result> { + let begin = Instant::now(); + info!("Waiting for string {{{}}}", expected); + loop { + sleep(Duration::from_millis(DURATION)); + let buf = self.buf.lock(); + let mut buf = buf.clone(); + if let Some(filter) = filter_echo_back { + self.filter_assert_echo(filter, &mut buf)?; + } + // The reason we compare raw u8 is... What if the data is corrupted? + let target = expected.as_bytes(); + if let Some(pos) = self.kmp_search(&buf, &target.to_vec()) { + info!("Matched string {{{}}}", expected); + let res = buf.split_off(pos + target.len()); + let res = String::from_utf8(res)?; + buf.drain(0..pos + target.len()); + return Ok(res); + } + if begin.elapsed().as_secs() > timeout as u64 { + err!( + "Timeout! Expected: {}, Actual: {}", + expected, + String::from_utf8(buf.clone()).unwrap() + ); + return Err(Box::::from("Timeout")); + } + } + } +} +impl CliTestApi for UiCliTester { + fn wait_serial(&mut self, expected: &str, timeout: u32) -> Result> { + self.do_wait_serial(expected, timeout, None) + } + fn script_run(&mut self, script: &str, timeout: u32) -> Result> { + let mut cmd = script.to_owned(); + let echo_content_rand = rand_string(8); + + cmd += " && echo "; + cmd += &echo_content_rand; + cmd += " \n"; + + self.run_command(&cmd)?; + + self.do_wait_serial(&echo_content_rand, timeout, Some(&echo_content_rand)) + } + fn background_script_run(&mut self, script: &str) -> Result<(), Box> { + let mut cmd = script.to_owned(); + cmd += " &\n"; + self.run_command(&cmd) + } + fn writeln(&mut self, script: &str) -> Result<(), Box> { + let mut cmd = script.to_owned(); + cmd += "\n"; + self.run_command(&cmd) + } +} +impl SudoCliTestApi for UiCliTester { + fn script_sudo( + &mut self, + script: &str, + timeout: u32, + ) -> Result> { + let mut cmd = String::from("sudo "); + cmd += script; + cmd += " "; + self.script_run(&cmd, timeout) + } +} + +pub fn handle_uiclitester(inner: &mut Option, term_id: u64) -> PyResult<()> { + if inner.is_none() { + return Err(PyRuntimeError::new_err( + "You must define at least one valid object", + )); + } + let mut be_wrapped = inner.take().unwrap(); + let tty = be_wrapped.safe_take()?; + let tty = Box::into_inner(tty); + let res = + UiCliTester::build(tty, term_id).map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + let res = Box::new(res); + *inner = Some(py_tty_inner(heap_raw(res))); + return Ok(()); +} + +#[pyclass(extends=PyTty, subclass)] +pub struct UiExec {} + +#[pymethods] +impl UiExec { + #[new] + #[pyo3(signature = (be_wrapped, term_id))] + fn py_new(be_wrapped: &mut PyTty, term_id: u64) -> PyResult<(Self, PyTty)> { + let mut inner = None; + + handle_wrap(&mut inner, Some(be_wrapped))?; + handle_uiclitester(&mut inner, term_id)?; + + Ok((UiExec {}, PyTty::build(inner.unwrap()))) + } + #[pyo3(signature = (script, timeout=None))] + fn script_run( + mut self_: PyRefMut<'_, Self>, + script: &str, + timeout: Option, + ) -> PyResult { + let self_ = self_.as_mut(); + let inner = self_.inner.get_mut()?; + let inner = inner.as_any_mut(); + + let timeout = timeout.unwrap_or(30); + + if inner.downcast_ref::().is_some() { + let inner = inner.downcast_mut::().unwrap(); + let res = inner + .script_run(script, timeout) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + Ok(res) + } else { + Err(PyRuntimeError::new_err( + "Can't find the right object to run the script", + )) + } + } + + fn background_script_run(mut self_: PyRefMut<'_, Self>, script: &str) -> PyResult<()> { + let self_ = self_.as_mut(); + let inner = self_.inner.get_mut()?; + let inner = inner.as_any_mut(); + + if inner.downcast_ref::().is_some() { + let inner = inner.downcast_mut::().unwrap(); + inner + .background_script_run(script) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + } else { + return Err(PyRuntimeError::new_err( + "Can't find the right object to run the script", + )); + } + Ok(()) + } + + fn writeln(mut self_: PyRefMut<'_, Self>, script: &str) -> PyResult<()> { + let self_ = self_.as_mut(); + let inner = self_.inner.get_mut()?; + let inner = inner.as_any_mut(); + + if inner.downcast_ref::().is_some() { + let inner = inner.downcast_mut::().unwrap(); + inner + .writeln(script) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + } else { + return Err(PyRuntimeError::new_err( + "Can't find the right object to run the script", + )); + } + Ok(()) + } + + #[pyo3(signature = (expected, timeout=None))] + fn wait_serial( + mut self_: PyRefMut<'_, Self>, + expected: &str, + timeout: Option, + ) -> PyResult { + let self_ = self_.as_mut(); + let inner = self_.inner.get_mut()?; + let inner = inner.as_any_mut(); + + let timeout = timeout.unwrap_or(30); + + if inner.downcast_ref::().is_some() { + let inner = inner.downcast_mut::().unwrap(); + let res = inner + .wait_serial(expected, timeout) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + Ok(res) + } else { + Err(PyRuntimeError::new_err( + "Can't find the right object to run the script", + )) + } + } + + #[pyo3(signature = (script, timeout=None))] + fn script_sudo( + mut self_: PyRefMut<'_, Self>, + script: &str, + timeout: Option, + ) -> PyResult { + let self_ = self_.as_mut(); + let inner = self_.inner.get_mut()?; + let inner = inner.as_any_mut(); + + let timeout = timeout.unwrap_or(30); + + if inner.downcast_ref::().is_some() { + let inner = inner.downcast_mut::().unwrap(); + let res = inner + .script_sudo(script, timeout) + .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; + Ok(res) + } else { + Err(PyRuntimeError::new_err( + "Can't find the right object to run the script", + )) + } + } +} diff --git a/src/ui/util.rs b/src/ui/util.rs new file mode 100644 index 0000000..ffc8588 --- /dev/null +++ b/src/ui/util.rs @@ -0,0 +1,34 @@ +use std::sync::LazyLock; + +use pyo3::pyfunction; + +use crate::{info, util::util::rand_string}; + +/// To generate a base sock name inside main gui app. +#[doc(hidden)] +pub fn get_main_virt() -> &'static str { + static VIRT: LazyLock = LazyLock::new(|| { + let res = String::from("ter") + &rand_string(4); + info!("A python sub exec created with {}", res); + res + }); + VIRT.as_str() +} + +// This will only be write once on init time, so no lock is needed. +static mut SUB_VIRT: String = String::new(); + +#[doc(hidden)] +#[pyfunction] +pub fn __init_sub_virt__(s: &str) { + unsafe { + SUB_VIRT = String::from(s); + } +} + +pub fn get_sub_virt() -> String { + unsafe { + #[allow(static_mut_refs)] + return SUB_VIRT.clone(); + } +}