From 0a6af9e72cc7ea7cd95e8fda1c81dbf0890d072a Mon Sep 17 00:00:00 2001 From: Bas Zalmstra Date: Mon, 11 Jan 2021 23:19:19 +0100 Subject: [PATCH] feat(language server): removed async code and switched to lsp_server --- crates/mun_language_server/Cargo.toml | 6 +- crates/mun_language_server/src/cancelation.rs | 7 + crates/mun_language_server/src/conversion.rs | 4 +- crates/mun_language_server/src/dispatcher.rs | 164 ++++++ crates/mun_language_server/src/lib.rs | 20 +- crates/mun_language_server/src/main_loop.rs | 497 ++++++++---------- crates/mun_language_server/src/protocol.rs | 8 - .../src/protocol/connection.rs | 106 ---- .../mun_language_server/src/protocol/error.rs | 14 - .../src/protocol/message.rs | 290 ---------- .../mun_language_server/src/protocol/stdio.rs | 36 -- crates/mun_language_server/src/workspace.rs | 3 +- crates/mun_language_server/tests/support.rs | 65 +-- crates/mun_vfs/src/monitor.rs | 18 +- 14 files changed, 460 insertions(+), 778 deletions(-) create mode 100644 crates/mun_language_server/src/dispatcher.rs delete mode 100644 crates/mun_language_server/src/protocol.rs delete mode 100644 crates/mun_language_server/src/protocol/connection.rs delete mode 100644 crates/mun_language_server/src/protocol/error.rs delete mode 100644 crates/mun_language_server/src/protocol/message.rs delete mode 100644 crates/mun_language_server/src/protocol/stdio.rs diff --git a/crates/mun_language_server/Cargo.toml b/crates/mun_language_server/Cargo.toml index 0fb4fe8f3..a3e332def 100644 --- a/crates/mun_language_server/Cargo.toml +++ b/crates/mun_language_server/Cargo.toml @@ -16,13 +16,12 @@ categories = ["game-development", "mun"] [dependencies] rustc-hash="1.1.0" -lsp-types = "0.74" +lsp-types = "0.86.0" +lsp-server = "0.5.0" log = "0.4" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -async-std = "1.6" -futures = "0.3" anyhow = "1.0" thiserror = "1.0" salsa = "0.15.0" @@ -35,6 +34,7 @@ mun_target = { version = "=0.2.0", path = "../mun_target" } mun_syntax = { version = "=0.2.0", path = "../mun_syntax" } mun_diagnostics = { version = "=0.1.0", path = "../mun_diagnostics" } crossbeam-channel = "0.5.0" +parking_lot="0.11.1" paths = {path="../mun_paths", package="mun_paths"} [dev-dependencies] diff --git a/crates/mun_language_server/src/cancelation.rs b/crates/mun_language_server/src/cancelation.rs index c343120b9..0eff65b0e 100644 --- a/crates/mun_language_server/src/cancelation.rs +++ b/crates/mun_language_server/src/cancelation.rs @@ -1,3 +1,5 @@ +use std::error::Error; + /// An error signifying a cancelled operation. pub struct Canceled { // This is here so that you cannot construct a Canceled @@ -29,3 +31,8 @@ impl std::fmt::Debug for Canceled { } impl std::error::Error for Canceled {} + +/// Returns true if the specified error is of type [`Canceled`] +pub(crate) fn is_canceled(e: &(dyn Error + 'static)) -> bool { + e.downcast_ref::().is_some() +} diff --git a/crates/mun_language_server/src/conversion.rs b/crates/mun_language_server/src/conversion.rs index b51f57289..439fea79f 100644 --- a/crates/mun_language_server/src/conversion.rs +++ b/crates/mun_language_server/src/conversion.rs @@ -63,8 +63,8 @@ pub fn convert_unit( ) -> lsp_types::Position { let line_col = line_index.line_col(range); lsp_types::Position { - line: line_col.line.into(), - character: line_col.col.into(), + line: line_col.line, + character: line_col.col, } } diff --git a/crates/mun_language_server/src/dispatcher.rs b/crates/mun_language_server/src/dispatcher.rs new file mode 100644 index 000000000..5365c9843 --- /dev/null +++ b/crates/mun_language_server/src/dispatcher.rs @@ -0,0 +1,164 @@ +use crate::cancelation::is_canceled; +use crate::from_json; +use crate::main_loop::LanguageServerState; +use anyhow::Result; +use serde::de::DeserializeOwned; +use serde::Serialize; + +/// A helper struct to ergonomically dispatch LSP requests to functions. +pub(crate) struct RequestDispatcher<'a> { + state: &'a mut LanguageServerState, + request: Option, +} + +impl<'a> RequestDispatcher<'a> { + /// Constructs a new dispatcher for the specified request + pub fn new(state: &'a mut LanguageServerState, request: lsp_server::Request) -> Self { + RequestDispatcher { + state, + request: Some(request), + } + } + + /// Try to dispatch the event as the given Request type. + pub fn on( + &mut self, + f: fn(&mut LanguageServerState, R::Params) -> Result, + ) -> Result<&mut Self> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + R::Result: Serialize + 'static, + { + let (id, params) = match self.parse::() { + Some(it) => it, + None => return Ok(self), + }; + + let result = f(self.state, params); + let response = result_to_response::(id, result); + self.state.respond(response); + Ok(self) + } + + /// Tries to parse the request as the specified type. If the request is of the specified type, + /// the request is transferred and any subsequent call to this method will return None. If an + /// error is encountered during parsing of the request parameters an error is send to the + /// client. + fn parse(&mut self) -> Option<(lsp_server::RequestId, R::Params)> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + { + let req = match &self.request { + Some(req) if req.method == R::METHOD => self.request.take().unwrap(), + _ => return None, + }; + + match from_json(R::METHOD, req.params) { + Ok(params) => Some((req.id, params)), + Err(err) => { + let response = lsp_server::Response::new_err( + req.id, + lsp_server::ErrorCode::InvalidParams as i32, + err.to_string(), + ); + self.state.respond(response); + None + } + } + } + + /// Wraps-up the dispatcher. If the the request was not handled, report back that this is an + /// unknown request. + pub fn finish(&mut self) { + if let Some(req) = self.request.take() { + log::error!("unknown request: {:?}", req); + let response = lsp_server::Response::new_err( + req.id, + lsp_server::ErrorCode::MethodNotFound as i32, + "unknown request".to_string(), + ); + self.state.respond(response); + } + } +} + +pub(crate) struct NotificationDispatcher<'a> { + state: &'a mut LanguageServerState, + notification: Option, +} + +impl<'a> NotificationDispatcher<'a> { + /// Constructs a new dispatcher for the specified request + pub fn new(state: &'a mut LanguageServerState, notification: lsp_server::Notification) -> Self { + NotificationDispatcher { + state, + notification: Some(notification), + } + } + + /// Try to dispatch the event as the given Notification type. + pub fn on( + &mut self, + f: fn(&mut LanguageServerState, N::Params) -> Result<()>, + ) -> Result<&mut Self> + where + N: lsp_types::notification::Notification + 'static, + N::Params: DeserializeOwned + Send + 'static, + { + let notification = match self.notification.take() { + Some(it) => it, + None => return Ok(self), + }; + let params = match notification.extract::(N::METHOD) { + Ok(it) => it, + Err(notification) => { + self.notification = Some(notification); + return Ok(self); + } + }; + f(self.state, params)?; + Ok(self) + } + + /// Wraps-up the dispatcher. If the notification was not handled, log an error. + pub fn finish(&mut self) { + if let Some(notification) = &self.notification { + if !notification.method.starts_with("$/") { + log::error!("unhandled notification: {:?}", notification); + } + } + } +} + +/// Converts the specified results of an LSP request into an LSP response handling any errors that +/// may have occurred. +fn result_to_response( + id: lsp_server::RequestId, + result: Result, +) -> lsp_server::Response +where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + R::Result: Serialize + 'static, +{ + match result { + Ok(resp) => lsp_server::Response::new_ok(id, &resp), + Err(e) => { + if is_canceled(&*e) { + lsp_server::Response::new_err( + id, + lsp_server::ErrorCode::ContentModified as i32, + "content modified".to_string(), + ) + } else { + lsp_server::Response::new_err( + id, + lsp_server::ErrorCode::InternalError as i32, + e.to_string(), + ) + } + } + } +} diff --git a/crates/mun_language_server/src/lib.rs b/crates/mun_language_server/src/lib.rs index f3819d951..9b3e9697b 100644 --- a/crates/mun_language_server/src/lib.rs +++ b/crates/mun_language_server/src/lib.rs @@ -6,9 +6,9 @@ mod config; mod conversion; mod db; mod diagnostics; +mod dispatcher; mod main_loop; mod project_manifest; -pub mod protocol; mod workspace; pub use config::Config; @@ -33,14 +33,14 @@ pub fn to_json(value: T) -> Result { } /// Main entry point for the language server -pub async fn run_server_async() -> Result<()> { +pub fn run_server() -> Result<()> { log::info!("language server started"); // Setup IO connections - let mut connection = protocol::Connection::stdio(); + let (connection, io_threads) = lsp_server::Connection::stdio(); // Wait for a client to connect - let (initialize_id, initialize_params) = connection.initialize_start().await?; + let (initialize_id, initialize_params) = connection.initialize_start()?; let initialize_params = from_json::("InitializeParams", initialize_params)?; @@ -57,9 +57,7 @@ pub async fn run_server_async() -> Result<()> { let initialize_result = serde_json::to_value(initialize_result).unwrap(); - connection - .initialize_finish(initialize_id, initialize_result) - .await?; + connection.initialize_finish(initialize_id, initialize_result)?; if let Some(client_info) = initialize_params.client_info { log::info!( @@ -122,12 +120,8 @@ pub async fn run_server_async() -> Result<()> { config }; - main_loop(connection, config).await?; + main_loop(connection, config)?; + io_threads.join()?; Ok(()) } - -/// Main entry point for the language server -pub fn run_server() -> Result<()> { - async_std::task::block_on(run_server_async()) -} diff --git a/crates/mun_language_server/src/main_loop.rs b/crates/mun_language_server/src/main_loop.rs index 96022fe6f..c1a343c36 100644 --- a/crates/mun_language_server/src/main_loop.rs +++ b/crates/mun_language_server/src/main_loop.rs @@ -1,41 +1,45 @@ +use crate::dispatcher::{NotificationDispatcher, RequestDispatcher}; use crate::{ analysis::{Analysis, AnalysisSnapshot, Cancelable}, change::AnalysisChange, config::Config, conversion::{convert_range, convert_uri, url_from_path_with_drive_lowercasing}, - protocol::{Connection, Message, Notification, Request, RequestId}, - Result, -}; -use async_std::sync::RwLock; -use futures::{ - channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, - SinkExt, StreamExt, + to_json, Result, }; +use crossbeam_channel::{select, unbounded, Receiver, Sender}; +use lsp_server::{Connection, ReqQueue}; +use lsp_types::notification::Notification; use lsp_types::{notification::PublishDiagnostics, PublishDiagnosticsParams, Url}; +use parking_lot::RwLock; use paths::AbsPathBuf; use rustc_hash::FxHashSet; -use serde::{de::DeserializeOwned, Serialize}; -use std::{cell::RefCell, collections::HashSet, ops::Deref, sync::Arc}; +use std::time::Instant; +use std::{ops::Deref, sync::Arc}; use vfs::VirtualFileSystem; /// A `Task` is something that is send from async tasks to the entry point for processing. This /// enables synchronizing resources like the connection with the client. #[derive(Debug)] -enum Task { - Notify(Notification), +pub(crate) enum Task { + Notify(lsp_server::Notification), } #[derive(Debug)] -enum Event { - Msg(Message), +pub(crate) enum Event { Vfs(vfs::MonitorMessage), Task(Task), + Lsp(lsp_server::Message), } +pub(crate) type RequestHandler = fn(&mut LanguageServerState, lsp_server::Response); + /// State for the language server pub(crate) struct LanguageServerState { - /// The connection with the client - pub connection: ConnectionState, + /// Channel to send language server messages to the client + sender: Sender, + + /// The request queue keeps track of all incoming and outgoing requests. + request_queue: lsp_server::ReqQueue<(String, Instant), RequestHandler>, /// The configuration passed by the client pub config: Config, @@ -43,6 +47,12 @@ pub(crate) struct LanguageServerState { /// Thread pool for async execution pub thread_pool: rayon::ThreadPool, + /// Channel to send tasks to from background operations + pub task_sender: Sender, + + /// Channel to receive tasks on from background operations + pub task_receiver: Receiver, + /// The virtual filesystem that holds all the file contents pub vfs: Arc>, @@ -50,7 +60,7 @@ pub(crate) struct LanguageServerState { pub vfs_monitor: Box, /// The receiver of vfs monitor messages - pub vfs_monitor_receiver: UnboundedReceiver, + pub vfs_monitor_receiver: Receiver, /// Documents that are currently kept in memory from the client pub open_docs: FxHashSet, @@ -60,6 +70,9 @@ pub(crate) struct LanguageServerState { /// All the packages known to the server pub packages: Arc>, + + /// True if the client requested that we shut down + pub shutdown_requested: bool, } /// A snapshot of the state of the language server @@ -74,42 +87,13 @@ pub(crate) struct LanguageServerSnapshot { pub packages: Arc>, } -/// State maintained for the connection. This includes everything that is required to be able to -/// properly communicate with the client but has nothing to do with any Mun related state. -pub(crate) struct ConnectionState { - pub(crate) connection: Connection, - - next_request_id: u64, - pending_responses: HashSet, -} - -impl ConnectionState { - /// Constructs a new `ConnectionState` - fn new(connection: Connection) -> Self { - Self { - connection, - next_request_id: 0, - pending_responses: Default::default(), - } - } - - /// Constructs a new request ID and stores that we are still awaiting a response. - fn next_request_id(&mut self) -> RequestId { - self.next_request_id += 1; - let res: RequestId = self.next_request_id.into(); - let inserted = self.pending_responses.insert(res.clone()); - debug_assert!(inserted); - res - } -} - impl LanguageServerState { - pub fn new(connection: Connection, config: Config) -> Self { + pub fn new(sender: Sender, config: Config) -> Self { // Construct the virtual filesystem monitor let (vfs_monitor_sender, vfs_monitor_receiver) = unbounded::(); - let vfs_monitor_sender = RefCell::new(vfs_monitor_sender); let vfs_monitor: vfs::NotifyMonitor = vfs::Monitor::new(Box::new(move |msg| { - async_std::task::block_on(vfs_monitor_sender.borrow_mut().send(msg)) + vfs_monitor_sender + .send(msg) .expect("error sending vfs monitor message to foreground") })); let vfs_monitor = Box::new(vfs_monitor) as Box; @@ -121,135 +105,133 @@ impl LanguageServerState { .build() .expect("unable to spin up thread pool"); - // Apply the initial changes + // Construct a task channel + let (task_sender, task_receiver) = unbounded(); + + // Construct the state that will hold all the analysis and apply the initial state + let mut analysis = Analysis::new(); let mut change = AnalysisChange::new(); change.set_packages(Default::default()); change.set_roots(Default::default()); - - // Construct the state that will hold all the analysis - let mut analysis = Analysis::new(); analysis.apply_change(change); LanguageServerState { - connection: ConnectionState::new(connection), + sender, + request_queue: ReqQueue::default(), config, vfs: Arc::new(RwLock::new(Default::default())), vfs_monitor, vfs_monitor_receiver, open_docs: FxHashSet::default(), thread_pool, + task_sender, + task_receiver, analysis, packages: Arc::new(Vec::new()), + shutdown_requested: false, + } + } + + /// Blocks until a new event is received from on of the many channels the language server + /// listens to. Returns the first event that is received. + fn next_event(&self, receiver: &Receiver) -> Option { + select! { + recv(receiver) -> msg => msg.ok().map(Event::Lsp), + recv(self.vfs_monitor_receiver) -> task => Some(Event::Vfs(task.unwrap())), + recv(self.task_receiver) -> task => Some(Event::Task(task.unwrap())) } } /// Runs the language server to completion - pub async fn run(mut self) -> Result<()> { + pub fn run(mut self, receiver: Receiver) -> Result<()> { // Start by updating the current workspace self.fetch_workspaces(); - // Process events as the pass - let (task_sender, mut task_receiver) = futures::channel::mpsc::unbounded::(); - loop { - // Determine what to do next. This selects from different channels, the first message to - // arrive is returned. If an error occurs on one of the channel the main loop is shutdown - // with an error. - let event = futures::select! { - msg = self.connection.connection.receiver.next() => match msg { - Some(msg) => Event::Msg(msg), - None => return Err(anyhow::anyhow!("client exited without shutdown")), - }, - msg = self.vfs_monitor_receiver.next() => match msg { - Some(msg) => Event::Vfs(msg), - None => return Err(anyhow::anyhow!("client exited without shutdown")), - }, - task = task_receiver.next() => Event::Task(task.unwrap()), - }; - - // Handle the event - match handle_event(event, &task_sender, &mut self).await? { - LoopState::Continue => {} - LoopState::Shutdown => { - break; + while let Some(event) = self.next_event(&receiver) { + if let Event::Lsp(lsp_server::Message::Notification(notification)) = &event { + if notification.method == lsp_types::notification::Exit::METHOD { + return Ok(()); } } + self.handle_event(event)?; } Ok(()) } -} -/// Runs the main loop of the language server. This will receive requests and handle them. -pub async fn main_loop(connection: Connection, config: Config) -> Result<()> { - log::info!("initial config: {:#?}", config); - LanguageServerState::new(connection, config).run().await -} + /// Handles an event from one of the many sources that the language server subscribes to. + fn handle_event(&mut self, event: Event) -> Result<()> { + let start_time = Instant::now(); + log::info!("handling event: {:?}", event); + + // Process the incoming event + match event { + Event::Task(task) => handle_task(task, self)?, + Event::Lsp(msg) => match msg { + lsp_server::Message::Request(req) => self.on_request(req, start_time)?, + lsp_server::Message::Response(resp) => self.complete_request(resp), + lsp_server::Message::Notification(not) => self.on_notification(not)?, + }, + Event::Vfs(task) => handle_vfs_task(task, self)?, + }; -/// A `LoopState` enumerator determines the state of the main loop -enum LoopState { - Continue, - Shutdown, -} + // Process any changes to the vfs + let state_changed = self.process_vfs_changes(); + if state_changed { + let snapshot = self.snapshot(); + let task_sender = self.task_sender.clone(); + // Spawn the diagnostics in the threadpool + self.thread_pool.spawn(move || { + handle_diagnostics(snapshot, task_sender).unwrap(); + }); + } -/// Handles a received request -async fn handle_request(request: Request, state: &mut LanguageServerState) -> Result { - if state - .connection - .connection - .handle_shutdown(&request) - .await? - { - return Ok(LoopState::Shutdown); - }; - Ok(LoopState::Continue) -} + Ok(()) + } -/// Handles a received notification -async fn on_notification( - notification: Notification, - state: &mut LanguageServerState, -) -> Result { - let notification = - // When a a text document is opened - match cast_notification::(notification) { - Ok(params) => { - if let Ok(path) = convert_uri(¶ms.text_document.uri) { - state.open_docs.insert(path.clone()); - state.vfs.write().await.set_file_contents(&path, Some(params.text_document.text.into_bytes())); - } - return Ok(LoopState::Continue); - } - Err(not) => not, - }; + /// Handles a language server protocol request + fn on_request( + &mut self, + request: lsp_server::Request, + request_received: Instant, + ) -> Result<()> { + self.register_request(&request, request_received); + + // If a shutdown was requested earlier, immediately respond with an error + if self.shutdown_requested { + self.respond(lsp_server::Response::new_err( + request.id, + lsp_server::ErrorCode::InvalidRequest as i32, + "shutdown was requested".to_owned(), + )); + return Ok(()); + } + + // Dispatch the event based on the type of event + RequestDispatcher::new(self, request) + .on::(|state, _request| { + state.shutdown_requested = true; + Ok(()) + })? + .finish(); + + Ok(()) + } - // When a text document is closed - let notification = - match cast_notification::(notification) { - Ok(params) => { + /// Handles a notification from the language server client + fn on_notification(&mut self, notification: lsp_server::Notification) -> Result<()> { + NotificationDispatcher::new(self, notification) + .on::(|state, params| { if let Ok(path) = convert_uri(¶ms.text_document.uri) { - state.open_docs.remove(&path); - state.vfs_monitor.reload(&path); + state.open_docs.insert(path.clone()); + state + .vfs + .write() + .set_file_contents(&path, Some(params.text_document.text.into_bytes())); } - let params = lsp_types::PublishDiagnosticsParams { - uri: params.text_document.uri, - diagnostics: Vec::new(), - version: None, - }; - let not = build_notification::(params); - state - .connection - .connection - .sender - .try_send(not.into()) - .unwrap(); - return Ok(LoopState::Continue); - } - Err(not) => not, - }; - - let notification = - match cast_notification::(notification) { - Ok(params) => { + Ok(()) + })? + .on::(|state, params| { let lsp_types::DidChangeTextDocumentParams { text_document, content_changes, @@ -259,68 +241,102 @@ async fn on_notification( state .vfs .write() - .await .set_file_contents(&path, Some(new_content.into_bytes())); } - return Ok(LoopState::Continue); - } - Err(not) => not, - }; - - let _notification = - match cast_notification::(notification) { - Ok(params) => { + Ok(()) + })? + .on::(|state, params| { + if let Ok(path) = convert_uri(¶ms.text_document.uri) { + state.open_docs.remove(&path); + state.vfs_monitor.reload(&path); + } + // Clear any diagnostics that we may have send + state.send_notification::( + lsp_types::PublishDiagnosticsParams { + uri: params.text_document.uri, + diagnostics: Vec::new(), + version: None, + }, + ); + Ok(()) + })? + .on::(|state, params| { for change in params.changes { if let Ok(path) = convert_uri(&change.uri) { state.vfs_monitor.reload(&path); } } - return Ok(LoopState::Continue); - } - Err(not) => not, - }; + Ok(()) + })? + .finish(); + Ok(()) + } - Ok(LoopState::Continue) -} + /// Registers a request with the server. We register all these request to make sure they all get + /// handled and so we can measure the time it takes for them to complete from the point of view + /// of the client. + fn register_request(&mut self, request: &lsp_server::Request, request_received: Instant) { + self.request_queue.incoming.register( + request.id.clone(), + (request.method.clone(), request_received), + ) + } + + /// Sends a request to the client and registers the request so that we can handle the response. + pub(crate) fn send_request( + &mut self, + params: R::Params, + handler: RequestHandler, + ) { + let request = self + .request_queue + .outgoing + .register(R::METHOD.to_string(), params, handler); + self.send(request.into()); + } + + /// Sends a notification to the client + pub(crate) fn send_notification( + &mut self, + params: N::Params, + ) { + let not = lsp_server::Notification::new(N::METHOD.to_string(), params); + self.send(not.into()); + } -/// Handles an incoming event. Returns a `LoopState` state which determines whether processing -/// should continue. -async fn handle_event( - event: Event, - task_sender: &UnboundedSender, - state: &mut LanguageServerState, -) -> Result { - log::info!("handling event: {:?}", event); - - // Process the incoming event - let loop_state = match event { - Event::Task(task) => handle_task(task, state).await?, - Event::Msg(msg) => handle_lsp_message(msg, state).await?, - Event::Vfs(task) => handle_vfs_task(task, state).await?, - }; - - // Process any changes to the vfs - let state_changed = state.process_vfs_changes().await; - dbg!(state_changed); - if state_changed { - let snapshot = state.snapshot(); - let task_sender = task_sender.clone(); - // Spawn the diagnostics in the threadpool - state.thread_pool.spawn(move || { - let _result = async_std::task::block_on(handle_diagnostics(snapshot, task_sender)); - }); + /// Handles a response to a request we made. The response gets forwarded to where we made the + /// request from. + fn complete_request(&mut self, response: lsp_server::Response) { + let handler = self.request_queue.outgoing.complete(response.id.clone()); + handler(self, response) + } + + /// Sends a response to a request to the client. This method logs the time it took us to reply + /// to a request from the client. + pub(crate) fn respond(&mut self, response: lsp_server::Response) { + if let Some((_method, start)) = self.request_queue.incoming.complete(response.id.clone()) { + let duration = start.elapsed(); + log::info!("handled req#{} in {:?}", response.id, duration); + self.send(response.into()); + } } - Ok(loop_state) + /// Sends a message to the client + fn send(&mut self, message: lsp_server::Message) { + self.sender + .send(message) + .expect("error sending lsp message to the outgoing channel") + } } -/// Send all diagnostics of all files -async fn handle_diagnostics( - state: LanguageServerSnapshot, - mut sender: UnboundedSender, -) -> Cancelable<()> { - dbg!(&state.packages); +/// Runs the main loop of the language server. This will receive requests and handle them. +pub fn main_loop(connection: Connection, config: Config) -> Result<()> { + log::info!("initial config: {:#?}", config); + LanguageServerState::new(connection.sender, config).run(connection.receiver) +} +/// Send all diagnostics of all files +fn handle_diagnostics(state: LanguageServerSnapshot, sender: Sender) -> Cancelable<()> { // Iterate over all files for (idx, _package) in state.packages.iter().enumerate() { let package_id = hir::PackageId(idx as u32); @@ -331,7 +347,7 @@ async fn handle_diagnostics( // Publish all diagnostics for file in files { let line_index = state.analysis.file_line_index(file)?; - let uri = state.file_id_to_uri(file).await.unwrap(); + let uri = state.file_id_to_uri(file).unwrap(); let diagnostics = state.analysis.diagnostics(file)?; let diagnostics = { @@ -341,6 +357,7 @@ async fn handle_diagnostics( range: convert_range(d.range, &line_index), severity: Some(lsp_types::DiagnosticSeverity::Error), code: None, + code_description: None, source: Some("mun".to_string()), message: d.message, related_information: { @@ -351,7 +368,6 @@ async fn handle_diagnostics( location: lsp_types::Location { uri: state .file_id_to_uri(annotation.range.file_id) - .await .unwrap(), range: convert_range( annotation.range.value, @@ -371,52 +387,45 @@ async fn handle_diagnostics( } }, tags: None, + data: None, }); } lsp_diagnostics }; sender - .send(Task::Notify(build_notification::( - PublishDiagnosticsParams { + .send(Task::Notify(lsp_server::Notification { + method: PublishDiagnostics::METHOD.to_owned(), + params: to_json(PublishDiagnosticsParams { uri, diagnostics, version: None, - }, - ))) - .await - .unwrap(); + }) + .unwrap(), + })) + .unwrap() } } Ok(()) } /// Handles a task send by another async task -async fn handle_task(task: Task, state: &mut LanguageServerState) -> Result { +fn handle_task(task: Task, state: &mut LanguageServerState) -> Result<()> { match task { Task::Notify(notification) => { - state - .connection - .connection - .sender - .send(notification.into()) - .await? + state.send(notification.into()); } } - - Ok(LoopState::Continue) + Ok(()) } /// Handles a change to the underlying virtual file system. -async fn handle_vfs_task( - mut task: vfs::MonitorMessage, - state: &mut LanguageServerState, -) -> Result { +fn handle_vfs_task(mut task: vfs::MonitorMessage, state: &mut LanguageServerState) -> Result<()> { loop { match task { vfs::MonitorMessage::Progress { .. } => {} vfs::MonitorMessage::Loaded { files } => { - let vfs = &mut *state.vfs.write().await; + let vfs = &mut *state.vfs.write(); for (path, contents) in files { vfs.set_file_contents(&path, contents); } @@ -424,58 +433,12 @@ async fn handle_vfs_task( } // Coalesce many VFS events into a single loop turn - task = match state.vfs_monitor_receiver.try_next() { - Ok(Some(task)) => task, + task = match state.vfs_monitor_receiver.try_recv() { + Ok(task) => task, _ => break, } } - Ok(LoopState::Continue) -} - -/// Handles an incoming message via the language server protocol. -async fn handle_lsp_message(msg: Message, state: &mut LanguageServerState) -> Result { - match msg { - Message::Request(req) => handle_request(req, state).await, - Message::Response(response) => { - let removed = state.connection.pending_responses.remove(&response.id); - if !removed { - log::error!("unexpected response: {:?}", response) - } - - Ok(LoopState::Continue) - } - Message::Notification(notification) => on_notification(notification, state).await, - } -} - -/// Constructs a new notification with the specified parameters. -fn build_notification(params: N::Params) -> Notification -where - N: lsp_types::notification::Notification, - N::Params: Serialize, -{ - Notification::new(N::METHOD.to_string(), params) -} - -/// Casts a notification to the specified type. -fn cast_notification(notification: Notification) -> std::result::Result -where - N: lsp_types::notification::Notification, - N::Params: DeserializeOwned, -{ - notification.try_extract(N::METHOD) -} - -impl LanguageServerState { - /// Sends a new request to the client - pub fn send_request(&mut self, params: R::Params) { - let request = Request::new( - self.connection.next_request_id(), - R::METHOD.to_string(), - params, - ); - async_std::task::block_on(self.connection.connection.sender.send(request.into())).unwrap(); - } + Ok(()) } impl LanguageServerState { @@ -491,10 +454,10 @@ impl LanguageServerState { /// Processes any and all changes that have been applied to the virtual filesystem. Generates /// an `AnalysisChange` and applies it if there are changes. True is returned if things changed, /// otherwise false. - pub async fn process_vfs_changes(&mut self) -> bool { + pub fn process_vfs_changes(&mut self) -> bool { // Get all the changes since the last time we processed let changed_files = { - let mut vfs = self.vfs.write().await; + let mut vfs = self.vfs.write(); vfs.take_changes() }; if changed_files.is_empty() { @@ -502,7 +465,7 @@ impl LanguageServerState { } // Construct an AnalysisChange to apply to the analysis - let vfs = self.vfs.read().await; + let vfs = self.vfs.read(); let mut analysis_change = AnalysisChange::new(); let mut has_created_or_deleted_entries = false; for file in changed_files { @@ -539,8 +502,8 @@ impl LanguageServerState { impl LanguageServerSnapshot { /// Converts the specified `hir::FileId` to a `Url` - pub async fn file_id_to_uri(&self, id: hir::FileId) -> Result { - let vfs = self.vfs.read().await; + pub fn file_id_to_uri(&self, id: hir::FileId) -> Result { + let vfs = self.vfs.read(); let path = vfs.file_path(vfs::FileId(id.0)); let url = url_from_path_with_drive_lowercasing(path)?; diff --git a/crates/mun_language_server/src/protocol.rs b/crates/mun_language_server/src/protocol.rs deleted file mode 100644 index f9a723314..000000000 --- a/crates/mun_language_server/src/protocol.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod connection; -mod error; -mod message; -mod stdio; - -pub use connection::Connection; -pub use error::ProtocolError; -pub use message::{Message, Notification, Request, RequestId, Response, ResponseError}; diff --git a/crates/mun_language_server/src/protocol/connection.rs b/crates/mun_language_server/src/protocol/connection.rs deleted file mode 100644 index 06490984d..000000000 --- a/crates/mun_language_server/src/protocol/connection.rs +++ /dev/null @@ -1,106 +0,0 @@ -use super::{Message, ProtocolError, Request, RequestId, Response}; -use async_std::future::{timeout, TimeoutError}; -use futures::channel::mpsc; -use futures::{SinkExt, StreamExt}; -use std::time::Duration; - -/// Represents a connection between a language server server and a language server client. -pub struct Connection { - pub sender: mpsc::Sender, - pub receiver: mpsc::Receiver, -} - -impl Connection { - /// Creates a connection that communicates over stdout and stdin. This enables inter-process - /// communication. - pub fn stdio() -> Connection { - let (sender, receiver) = super::stdio::stdio_transport(); - Connection { sender, receiver } - } - - /// Creates a pair of connected connections. This enables in-process communication, especially - /// useful for testing. - pub fn memory() -> (Connection, Connection) { - let (s1, r1) = mpsc::channel(0); - let (s2, r2) = mpsc::channel(0); - ( - Connection { - sender: s1, - receiver: r2, - }, - Connection { - sender: s2, - receiver: r1, - }, - ) - } - - /// Starts the initialization process by waiting for an initialize request from the client. - pub async fn initialize_start( - &mut self, - ) -> Result<(RequestId, serde_json::Value), ProtocolError> { - let req = match self.receiver.next().await { - Some(Message::Request(req)) => { - if req.is_initialize() { - req - } else { - return Err(ProtocolError::UnexpectedMessage { - expected: "initialize".to_owned(), - received: Some(Message::Request(req)), - }); - } - } - msg => { - return Err(ProtocolError::UnexpectedMessage { - expected: "initialize".to_owned(), - received: msg, - }) - } - }; - Ok((req.id, req.params)) - } - - /// Finishes the initialization process by sending an `InitializeResult` to the client - pub async fn initialize_finish( - &mut self, - initialize_id: RequestId, - initialize_result: serde_json::Value, - ) -> Result<(), ProtocolError> { - let resp = Response::new_ok(initialize_id, initialize_result); - self.sender.send(resp.into()).await.unwrap(); - match self.receiver.next().await { - Some(Message::Notification(n)) if n.is_initialized() => (), - m => { - return Err(ProtocolError::UnexpectedMessage { - expected: "initialized".to_owned(), - received: m, - }) - } - }; - Ok(()) - } - - /// If `req` is a `Shutdown`, responds to it and returns `true`, otherwise returns `false`. - pub async fn handle_shutdown(&mut self, req: &Request) -> Result { - if !req.is_shutdown() { - return Ok(false); - } - let resp = Response::new_ok(req.id.clone(), ()); - let _ = self.sender.send(resp.into()).await; - match timeout(Duration::from_secs(30), self.receiver.next()).await { - Ok(Some(Message::Notification(n))) if n.is_exit() => {} - Err(TimeoutError { .. }) => { - return Err(ProtocolError::Timeout { - waiting_for: "exit".to_owned(), - }) - } - Ok(m) => { - return Err(ProtocolError::UnexpectedMessage { - expected: "exit".to_owned(), - received: m, - }) - } - } - Ok(true) - } -} diff --git a/crates/mun_language_server/src/protocol/error.rs b/crates/mun_language_server/src/protocol/error.rs deleted file mode 100644 index 5d5104f78..000000000 --- a/crates/mun_language_server/src/protocol/error.rs +++ /dev/null @@ -1,14 +0,0 @@ -use super::Message; -use thiserror::Error; - -#[derive(Debug, Clone, Error)] -pub enum ProtocolError { - #[error("expected '{expected}' request, got '{received:?}'")] - UnexpectedMessage { - expected: String, - received: Option, - }, - - #[error("timeout while waiting for {waiting_for}")] - Timeout { waiting_for: String }, -} diff --git a/crates/mun_language_server/src/protocol/message.rs b/crates/mun_language_server/src/protocol/message.rs deleted file mode 100644 index 5400e8aeb..000000000 --- a/crates/mun_language_server/src/protocol/message.rs +++ /dev/null @@ -1,290 +0,0 @@ -use async_std::io; -use futures::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::fmt; - -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(untagged)] -pub enum Message { - Request(Request), - Response(Response), - Notification(Notification), -} - -impl From for Message { - fn from(request: Request) -> Message { - Message::Request(request) - } -} - -impl From for Message { - fn from(response: Response) -> Message { - Message::Response(response) - } -} - -impl From for Message { - fn from(notification: Notification) -> Message { - Message::Notification(notification) - } -} - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[serde(transparent)] -pub struct RequestId(Id); - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[serde(untagged)] -enum Id { - U64(u64), - String(String), -} - -impl From for RequestId { - fn from(id: u64) -> RequestId { - RequestId(Id::U64(id)) - } -} - -impl From for RequestId { - fn from(id: String) -> RequestId { - RequestId(Id::String(id)) - } -} - -impl fmt::Display for RequestId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match &self.0 { - Id::U64(id) => write!(f, "{}", id), - Id::String(id) => write!(f, "\"{}\"", id), - } - } -} - -/// A request message to describe a request between the client and the server. Every processed -/// request must send a response back to the sender of the request. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Request { - pub id: RequestId, - pub method: String, - pub params: serde_json::Value, -} - -/// A Response Message sent as a result of a `Request`. If a request doesn’t provide a result value -/// the receiver of a request still needs to return a response message to conform to the JSON RPC -/// specification. The result property of the ResponseMessage should be set to null in this case to -/// signal a successful request. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Response { - pub id: RequestId, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, -} - -/// An error object in case a request failed. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ResponseError { - pub code: i32, - pub message: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub data: Option, -} - -/// An error code indicating the error type that occurred. -#[derive(Clone, Copy, Debug)] -#[allow(unused)] -pub enum ErrorCode { - // Defined by JSON RPC - ParseError = -32700, - InvalidRequest = -32600, - MethodNotFound = -32601, - InvalidParams = -32602, - InternalError = -32603, - ServerErrorStart = -32099, - ServerErrorEnd = -32000, - ServerNotInitialized = -32002, - UnknownErrorCode = -32001, - - // Defined by the protocol. - RequestCanceled = -32800, - ContentModified = -32801, -} - -/// A notification message. A processed notification message must not send a response back. They -/// work like events. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Notification { - pub method: String, - pub params: serde_json::Value, -} - -impl Message { - /// Reads an RPC message from the given stream - pub async fn read(stream: &mut R) -> io::Result> { - let text = match read_message_string(stream).await? { - None => return Ok(None), - Some(text) => text, - }; - Ok(Some(serde_json::from_str(&text)?)) - } - - /// Writes the RPC message to the given stream - pub async fn write(self, stream: &mut R) -> io::Result<()> { - #[derive(Serialize)] - struct RpcMessage { - jsonrpc: &'static str, - #[serde(flatten)] - msg: Message, - } - let text = serde_json::to_string(&RpcMessage { - jsonrpc: "2.0", - msg: self, - })?; - write_message_string(stream, &text).await - } -} - -impl Response { - /// Constructs a `Response` object signaling the succesfull handling of a request with the - /// specified id. - pub fn new_ok(id: RequestId, result: R) -> Self { - Self { - id, - result: Some(serde_json::to_value(result).unwrap()), - error: None, - } - } - - /// Constructs a `Response` object signalling failure to handle the request with the specified - /// id - pub fn new_err(id: RequestId, code: i32, message: String) -> Self { - Self { - id, - result: None, - error: Some(ResponseError { - code, - message, - data: None, - }), - } - } -} - -impl Request { - /// Constructs a new Request object - pub fn new(id: RequestId, method: String, params: P) -> Self { - Self { - id, - method, - params: serde_json::to_value(params).unwrap(), - } - } - - /// Tries to extract the specific request parameters from this request. - pub fn try_extract(self, method: &str) -> Result<(RequestId, P), Request> { - if self.method == method { - let params = serde_json::from_value(self.params).unwrap_or_else(|err| { - panic!("Invalid request\nMethod: {}\nerror: {}", method, err) - }); - Ok((self.id, params)) - } else { - Err(self) - } - } - - pub(crate) fn is_shutdown(&self) -> bool { - self.method == "shutdown" - } - - pub(crate) fn is_initialize(&self) -> bool { - self.method == "initialize" - } -} - -impl Notification { - /// Constructs a new `Notification` from the specified method name and parameters - pub fn new(method: String, params: P) -> Self { - Self { - method, - params: serde_json::to_value(params).unwrap(), - } - } - - /// Tries to extract the specific notification parameters from this notification. - pub fn try_extract(self, method: &str) -> Result { - if self.method == method { - let params = serde_json::from_value(self.params).unwrap_or_else(|err| { - panic!("Invalid request\nMethod: {}\nerror: {}", method, err) - }); - Ok(params) - } else { - Err(self) - } - } - - pub(crate) fn is_exit(&self) -> bool { - self.method == "exit" - } - pub(crate) fn is_initialized(&self) -> bool { - self.method == "initialized" - } -} - -/// Reads an RPC message from the specified stream. -async fn read_message_string( - stream: &mut R, -) -> io::Result> { - /// Constructs an `InvalidData` error with a cause - fn invalid_data(error: impl Into>) -> io::Error { - io::Error::new(io::ErrorKind::InvalidData, error) - } - - // Loop over all headers of the incoming message. - let mut size = None; - let mut buf = String::new(); - loop { - buf.clear(); - if stream.read_line(&mut buf).await? == 0 { - return Ok(None); - } - if !buf.ends_with("\r\n") { - return Err(invalid_data(format!("malformed header: {:?}", buf))); - } - - // If there are no more headers, break to parse the rest of the message - let buf = &buf[..buf.len() - 2]; - if buf.is_empty() { - break; - } - - // If this is the `Content-Length` header, parse the size of the message - let mut parts = buf.splitn(2, ": "); - let header_name = parts.next().unwrap(); - let header_value = parts - .next() - .ok_or_else(|| invalid_data(format!("malformed header: {:?}", buf)))?; - if header_name == "Content-Length" { - size = Some(header_value.parse::().map_err(invalid_data)?); - } - } - - let size: usize = size.ok_or_else(|| invalid_data("no Content-Length".to_owned()))?; - let mut buf = buf.into_bytes(); - buf.resize(size, 0); - stream.read_exact(&mut buf).await?; - let buf = String::from_utf8(buf).map_err(invalid_data)?; - log::debug!("< {}", buf); - Ok(Some(buf)) -} - -/// Writes an RPC message to the specified stream. -async fn write_message_string(stream: &mut R, msg: &str) -> io::Result<()> { - log::debug!("> {}", msg); - let header = format!("Content-Length: {}\r\n\r\n", msg.len()); - stream.write_all(header.as_bytes()).await?; - stream.write_all(msg.as_bytes()).await?; - stream.flush().await?; - Ok(()) -} diff --git a/crates/mun_language_server/src/protocol/stdio.rs b/crates/mun_language_server/src/protocol/stdio.rs deleted file mode 100644 index 1ac030bef..000000000 --- a/crates/mun_language_server/src/protocol/stdio.rs +++ /dev/null @@ -1,36 +0,0 @@ -use super::Message; -use async_std::io::BufReader; -use futures::{channel::mpsc, SinkExt, StreamExt}; - -/// Constructs a communication channel over stdin (input) and stdout (output) -pub(crate) fn stdio_transport() -> (mpsc::Sender, mpsc::Receiver) { - let (writer_sender, mut writer_receiver) = mpsc::channel::(0); - let (mut reader_sender, reader_receiver) = mpsc::channel::(0); - - // Receive messages over the channel and forward them to stdout - async_std::task::spawn(async move { - let mut stdout = async_std::io::stdout(); - while let Some(msg) = writer_receiver.next().await { - msg.write(&mut stdout).await.unwrap(); - } - }); - - // Receive data over stdin and forward to the application - async_std::task::spawn(async move { - let mut stdin = BufReader::new(async_std::io::stdin()); - while let Some(msg) = Message::read(&mut stdin).await.unwrap() { - let is_exit = match &msg { - Message::Notification(n) => n.is_exit(), - _ => false, - }; - - reader_sender.send(msg).await.unwrap(); - - if is_exit { - break; - } - } - }); - - (writer_sender, reader_receiver) -} diff --git a/crates/mun_language_server/src/workspace.rs b/crates/mun_language_server/src/workspace.rs index 454b4af42..890624421 100644 --- a/crates/mun_language_server/src/workspace.rs +++ b/crates/mun_language_server/src/workspace.rs @@ -51,6 +51,7 @@ impl LanguageServerState { lsp_types::RegistrationParams { registrations: vec![registration], }, + |_, _| {}, ); } @@ -114,7 +115,7 @@ impl LanguageServerState { // Iterate over all files and find to which source directory they belong, including their // relative path - let vfs = &*async_std::task::block_on(self.vfs.read()); + let vfs = &*self.vfs.read(); for (file_id, path) in vfs.iter() { if let Some((idx, relative_path)) = source_dirs diff --git a/crates/mun_language_server/tests/support.rs b/crates/mun_language_server/tests/support.rs index ca716aa3c..f49f84007 100644 --- a/crates/mun_language_server/tests/support.rs +++ b/crates/mun_language_server/tests/support.rs @@ -1,7 +1,6 @@ -use async_std::future::timeout; -use futures::{SinkExt, StreamExt}; +use crossbeam_channel::{after, select}; +use lsp_server::{Connection, Message, Notification, Request}; use lsp_types::{notification::Exit, request::Shutdown}; -use mun_language_server::protocol::{Connection, Message, Notification, Request}; use mun_language_server::{main_loop, Config}; use paths::AbsPathBuf; use serde::Serialize; @@ -12,7 +11,7 @@ use std::time::Duration; /// An object that runs the language server main loop and enables sending and receiving messages /// to and from it. pub struct Server { - next_request_id: u64, + next_request_id: i32, worker: Option>, client: Connection, _temp_path: tempdir::TempDir, @@ -31,9 +30,7 @@ impl Server { .expect("temp_path is not an absolute path"), ); let worker = std::thread::spawn(move || { - async_std::task::block_on(async move { - main_loop(connection, config).await.unwrap(); - }) + main_loop(connection, config).unwrap(); }); Self { @@ -45,19 +42,19 @@ impl Server { } /// Sends a request to the main loop and expects the specified value to be returned - async fn assert_request( + fn assert_request( &mut self, params: R::Params, expected_response: Value, ) where R::Params: Serialize, { - let result = self.send_request::(params).await; + let result = self.send_request::(params); assert_eq!(result, expected_response); } /// Sends a request to main loop, returning the response - async fn send_request(&mut self, params: R::Params) -> Value + fn send_request(&mut self, params: R::Params) -> Value where R::Params: Serialize, { @@ -65,32 +62,28 @@ impl Server { self.next_request_id += 1; let r = Request::new(id.into(), R::METHOD.to_string(), params); - self.send_and_receive(r).await + self.send_and_receive(r) } /// Sends an LSP notification to the main loop. - async fn notification(&mut self, params: N::Params) + fn notification(&mut self, params: N::Params) where N::Params: Serialize, { let r = Notification::new(N::METHOD.to_string(), params); - self.send_notification(r).await + self.send_notification(r) } /// Sends a server notification to the main loop - async fn send_notification(&mut self, not: Notification) { - self.client - .sender - .send(Message::Notification(not)) - .await - .unwrap(); + fn send_notification(&mut self, not: Notification) { + self.client.sender.send(Message::Notification(not)).unwrap(); } /// Sends a request to the main loop and receives its response - async fn send_and_receive(&mut self, r: Request) -> Value { + fn send_and_receive(&mut self, r: Request) -> Value { let id = r.id.clone(); - self.client.sender.send(r.into()).await.unwrap(); - while let Some(msg) = self.recv().await { + self.client.sender.send(r.into()).unwrap(); + while let Some(msg) = self.recv() { match msg { Message::Request(req) => panic!( "did not expect a request as a response to a request: {:?}", @@ -113,26 +106,24 @@ impl Server { } /// Receives a message from the message or timeout. - async fn recv(&mut self) -> Option { - let duration = Duration::from_secs(60); - timeout(duration, self.client.receiver.next()) - .await - .unwrap() + fn recv(&mut self) -> Option { + let timeout = Duration::from_secs(120); + select! { + recv(self.client.receiver) -> msg => msg.ok(), + recv(after(timeout)) -> _ => panic!("timed out"), + } } } impl Drop for Server { fn drop(&mut self) { - // Send a shutdown request - async_std::task::block_on(async { - // Send the proper shutdown sequence to ensure the main loop terminates properly - self.assert_request::((), Value::Null).await; - self.notification::(()).await; + // Send the proper shutdown sequence to ensure the main loop terminates properly + self.assert_request::((), Value::Null); + self.notification::(()); - // Cancel the main_loop - if let Some(worker) = self.worker.take() { - worker.join().unwrap(); - } - }); + // Cancel the main_loop + if let Some(worker) = self.worker.take() { + worker.join().unwrap(); + } } } diff --git a/crates/mun_vfs/src/monitor.rs b/crates/mun_vfs/src/monitor.rs index 040c9f190..73eb09fc9 100644 --- a/crates/mun_vfs/src/monitor.rs +++ b/crates/mun_vfs/src/monitor.rs @@ -5,6 +5,7 @@ mod notify_monitor; pub use notify_monitor::NotifyMonitor; use crate::{AbsPath, AbsPathBuf}; +use std::fmt; /// Describes something to be monitored by a `Monitor`. #[derive(Debug, Clone)] @@ -46,7 +47,6 @@ pub struct MonitorConfig { } /// A message that might be communicated from a [`Monitor`] -#[derive(Debug)] pub enum MonitorMessage { /// A message that indicates the progress status of the monitor Progress { total: usize, done: usize }, @@ -153,6 +153,22 @@ impl MonitorEntry { } } +impl fmt::Debug for MonitorMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MonitorMessage::Loaded { files } => f + .debug_struct("Loaded") + .field("files", &files.len()) + .finish(), + MonitorMessage::Progress { total, done } => f + .debug_struct("Progress") + .field("total", total) + .field("done", done) + .finish(), + } + } +} + #[cfg(test)] mod tests { use super::{AbsPathBuf, Monitor, MonitorDirectories};