diff --git a/crates/mun_language_server/Cargo.toml b/crates/mun_language_server/Cargo.toml index a3e332def..aadec71e6 100644 --- a/crates/mun_language_server/Cargo.toml +++ b/crates/mun_language_server/Cargo.toml @@ -26,8 +26,7 @@ anyhow = "1.0" thiserror = "1.0" salsa = "0.15.0" hir = { version = "=0.2.0", path="../mun_hir", package="mun_hir" } -rayon = "1.3" -num_cpus = "1.13.0" +threadpool="1.8.1" vfs = { path = "../mun_vfs", package="mun_vfs" } project = { path = "../mun_project", package="mun_project" } mun_target = { version = "=0.2.0", path = "../mun_target" } @@ -39,3 +38,4 @@ paths = {path="../mun_paths", package="mun_paths"} [dev-dependencies] tempdir = "0.3.7" +mun_test = { path = "../mun_test"} diff --git a/crates/mun_language_server/src/analysis.rs b/crates/mun_language_server/src/analysis.rs index 9a8aa282d..b0d155e0a 100644 --- a/crates/mun_language_server/src/analysis.rs +++ b/crates/mun_language_server/src/analysis.rs @@ -37,6 +37,11 @@ impl Analysis { db: self.db.snapshot(), } } + + /// Requests any outstanding snapshot to cancel computations. + pub fn request_cancellation(&mut self) { + self.db.request_cancellation(); + } } /// The `AnalysisSnapshot` is a snapshot of the state of the source, it enables querying for diff --git a/crates/mun_language_server/src/config.rs b/crates/mun_language_server/src/config.rs index ab3b2af48..7bfd17dbe 100644 --- a/crates/mun_language_server/src/config.rs +++ b/crates/mun_language_server/src/config.rs @@ -1,5 +1,5 @@ -use crate::project_manifest::ProjectManifest; use paths::AbsPathBuf; +use project::ProjectManifest; /// The configuration used by the language server. #[derive(Debug, Clone)] diff --git a/crates/mun_language_server/src/db.rs b/crates/mun_language_server/src/db.rs index a4383991b..3ed60880d 100644 --- a/crates/mun_language_server/src/db.rs +++ b/crates/mun_language_server/src/db.rs @@ -3,7 +3,7 @@ use crate::cancelation::Canceled; use hir::{HirDatabase, Upcast}; use mun_target::spec::Target; -use salsa::{Database, Snapshot}; +use salsa::{Database, Durability, Snapshot}; use std::panic; /// The `AnalysisDatabase` provides the database for all analyses. A database is given input and @@ -38,6 +38,11 @@ impl AnalysisDatabase { db } + + /// Triggers a simple write on the database which will cancell all outstanding snapshots. + pub fn request_cancellation(&mut self) { + self.salsa_runtime_mut().synthetic_write(Durability::LOW); + } } impl salsa::Database for AnalysisDatabase { diff --git a/crates/mun_language_server/src/lib.rs b/crates/mun_language_server/src/lib.rs index 9b3e9697b..ebf75fa35 100644 --- a/crates/mun_language_server/src/lib.rs +++ b/crates/mun_language_server/src/lib.rs @@ -1,3 +1,13 @@ +use std::convert::TryFrom; + +use serde::{de::DeserializeOwned, Serialize}; + +pub use config::{Config, FilesWatcher}; +pub use main_loop::main_loop; +use paths::AbsPathBuf; +use project::ProjectManifest; +pub(crate) use state::LanguageServerState; + mod analysis; mod cancelation; mod capabilities; @@ -6,18 +16,8 @@ mod config; mod conversion; mod db; mod diagnostics; -mod dispatcher; mod main_loop; -mod project_manifest; -mod workspace; - -pub use config::Config; -pub use main_loop::main_loop; - -use crate::{config::FilesWatcher, project_manifest::ProjectManifest}; -use paths::AbsPathBuf; -use serde::{de::DeserializeOwned, Serialize}; -use std::convert::TryFrom; +mod state; pub type Result = anyhow::Result; diff --git a/crates/mun_language_server/src/main_loop.rs b/crates/mun_language_server/src/main_loop.rs index c1a343c36..b7c12e9c6 100644 --- a/crates/mun_language_server/src/main_loop.rs +++ b/crates/mun_language_server/src/main_loop.rs @@ -1,512 +1,8 @@ -use crate::dispatcher::{NotificationDispatcher, RequestDispatcher}; -use crate::{ - analysis::{Analysis, AnalysisSnapshot, Cancelable}, - change::AnalysisChange, - config::Config, - conversion::{convert_range, convert_uri, url_from_path_with_drive_lowercasing}, - to_json, Result, -}; -use crossbeam_channel::{select, unbounded, Receiver, Sender}; -use lsp_server::{Connection, ReqQueue}; -use lsp_types::notification::Notification; -use lsp_types::{notification::PublishDiagnostics, PublishDiagnosticsParams, Url}; -use parking_lot::RwLock; -use paths::AbsPathBuf; -use rustc_hash::FxHashSet; -use std::time::Instant; -use std::{ops::Deref, sync::Arc}; -use vfs::VirtualFileSystem; - -/// A `Task` is something that is send from async tasks to the entry point for processing. This -/// enables synchronizing resources like the connection with the client. -#[derive(Debug)] -pub(crate) enum Task { - Notify(lsp_server::Notification), -} - -#[derive(Debug)] -pub(crate) enum Event { - Vfs(vfs::MonitorMessage), - Task(Task), - Lsp(lsp_server::Message), -} - -pub(crate) type RequestHandler = fn(&mut LanguageServerState, lsp_server::Response); - -/// State for the language server -pub(crate) struct LanguageServerState { - /// Channel to send language server messages to the client - sender: Sender, - - /// The request queue keeps track of all incoming and outgoing requests. - request_queue: lsp_server::ReqQueue<(String, Instant), RequestHandler>, - - /// The configuration passed by the client - pub config: Config, - - /// Thread pool for async execution - pub thread_pool: rayon::ThreadPool, - - /// Channel to send tasks to from background operations - pub task_sender: Sender, - - /// Channel to receive tasks on from background operations - pub task_receiver: Receiver, - - /// The virtual filesystem that holds all the file contents - pub vfs: Arc>, - - /// The vfs monitor - pub vfs_monitor: Box, - - /// The receiver of vfs monitor messages - pub vfs_monitor_receiver: Receiver, - - /// Documents that are currently kept in memory from the client - pub open_docs: FxHashSet, - - /// Holds the state of the analysis process - pub analysis: Analysis, - - /// All the packages known to the server - pub packages: Arc>, - - /// True if the client requested that we shut down - pub shutdown_requested: bool, -} - -/// A snapshot of the state of the language server -pub(crate) struct LanguageServerSnapshot { - /// The virtual filesystem that holds all the file contents - pub vfs: Arc>, - - /// Holds the state of the analysis process - pub analysis: AnalysisSnapshot, - - /// All the packages known to the server - pub packages: Arc>, -} - -impl LanguageServerState { - pub fn new(sender: Sender, config: Config) -> Self { - // Construct the virtual filesystem monitor - let (vfs_monitor_sender, vfs_monitor_receiver) = unbounded::(); - let vfs_monitor: vfs::NotifyMonitor = vfs::Monitor::new(Box::new(move |msg| { - vfs_monitor_sender - .send(msg) - .expect("error sending vfs monitor message to foreground") - })); - let vfs_monitor = Box::new(vfs_monitor) as Box; - - // Create a thread pool to dispatch the async commands - // Use the num_cpus to get a nice thread count estimation - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(num_cpus::get()) - .build() - .expect("unable to spin up thread pool"); - - // Construct a task channel - let (task_sender, task_receiver) = unbounded(); - - // Construct the state that will hold all the analysis and apply the initial state - let mut analysis = Analysis::new(); - let mut change = AnalysisChange::new(); - change.set_packages(Default::default()); - change.set_roots(Default::default()); - analysis.apply_change(change); - - LanguageServerState { - sender, - request_queue: ReqQueue::default(), - config, - vfs: Arc::new(RwLock::new(Default::default())), - vfs_monitor, - vfs_monitor_receiver, - open_docs: FxHashSet::default(), - thread_pool, - task_sender, - task_receiver, - analysis, - packages: Arc::new(Vec::new()), - shutdown_requested: false, - } - } - - /// Blocks until a new event is received from on of the many channels the language server - /// listens to. Returns the first event that is received. - fn next_event(&self, receiver: &Receiver) -> Option { - select! { - recv(receiver) -> msg => msg.ok().map(Event::Lsp), - recv(self.vfs_monitor_receiver) -> task => Some(Event::Vfs(task.unwrap())), - recv(self.task_receiver) -> task => Some(Event::Task(task.unwrap())) - } - } - - /// Runs the language server to completion - pub fn run(mut self, receiver: Receiver) -> Result<()> { - // Start by updating the current workspace - self.fetch_workspaces(); - - while let Some(event) = self.next_event(&receiver) { - if let Event::Lsp(lsp_server::Message::Notification(notification)) = &event { - if notification.method == lsp_types::notification::Exit::METHOD { - return Ok(()); - } - } - self.handle_event(event)?; - } - - Ok(()) - } - - /// Handles an event from one of the many sources that the language server subscribes to. - fn handle_event(&mut self, event: Event) -> Result<()> { - let start_time = Instant::now(); - log::info!("handling event: {:?}", event); - - // Process the incoming event - match event { - Event::Task(task) => handle_task(task, self)?, - Event::Lsp(msg) => match msg { - lsp_server::Message::Request(req) => self.on_request(req, start_time)?, - lsp_server::Message::Response(resp) => self.complete_request(resp), - lsp_server::Message::Notification(not) => self.on_notification(not)?, - }, - Event::Vfs(task) => handle_vfs_task(task, self)?, - }; - - // Process any changes to the vfs - let state_changed = self.process_vfs_changes(); - if state_changed { - let snapshot = self.snapshot(); - let task_sender = self.task_sender.clone(); - // Spawn the diagnostics in the threadpool - self.thread_pool.spawn(move || { - handle_diagnostics(snapshot, task_sender).unwrap(); - }); - } - - Ok(()) - } - - /// Handles a language server protocol request - fn on_request( - &mut self, - request: lsp_server::Request, - request_received: Instant, - ) -> Result<()> { - self.register_request(&request, request_received); - - // If a shutdown was requested earlier, immediately respond with an error - if self.shutdown_requested { - self.respond(lsp_server::Response::new_err( - request.id, - lsp_server::ErrorCode::InvalidRequest as i32, - "shutdown was requested".to_owned(), - )); - return Ok(()); - } - - // Dispatch the event based on the type of event - RequestDispatcher::new(self, request) - .on::(|state, _request| { - state.shutdown_requested = true; - Ok(()) - })? - .finish(); - - Ok(()) - } - - /// Handles a notification from the language server client - fn on_notification(&mut self, notification: lsp_server::Notification) -> Result<()> { - NotificationDispatcher::new(self, notification) - .on::(|state, params| { - if let Ok(path) = convert_uri(¶ms.text_document.uri) { - state.open_docs.insert(path.clone()); - state - .vfs - .write() - .set_file_contents(&path, Some(params.text_document.text.into_bytes())); - } - Ok(()) - })? - .on::(|state, params| { - let lsp_types::DidChangeTextDocumentParams { - text_document, - content_changes, - } = params; - if let Ok(path) = convert_uri(&text_document.uri) { - let new_content = content_changes.get(0).unwrap().text.clone(); - state - .vfs - .write() - .set_file_contents(&path, Some(new_content.into_bytes())); - } - Ok(()) - })? - .on::(|state, params| { - if let Ok(path) = convert_uri(¶ms.text_document.uri) { - state.open_docs.remove(&path); - state.vfs_monitor.reload(&path); - } - // Clear any diagnostics that we may have send - state.send_notification::( - lsp_types::PublishDiagnosticsParams { - uri: params.text_document.uri, - diagnostics: Vec::new(), - version: None, - }, - ); - Ok(()) - })? - .on::(|state, params| { - for change in params.changes { - if let Ok(path) = convert_uri(&change.uri) { - state.vfs_monitor.reload(&path); - } - } - Ok(()) - })? - .finish(); - Ok(()) - } - - /// Registers a request with the server. We register all these request to make sure they all get - /// handled and so we can measure the time it takes for them to complete from the point of view - /// of the client. - fn register_request(&mut self, request: &lsp_server::Request, request_received: Instant) { - self.request_queue.incoming.register( - request.id.clone(), - (request.method.clone(), request_received), - ) - } - - /// Sends a request to the client and registers the request so that we can handle the response. - pub(crate) fn send_request( - &mut self, - params: R::Params, - handler: RequestHandler, - ) { - let request = self - .request_queue - .outgoing - .register(R::METHOD.to_string(), params, handler); - self.send(request.into()); - } - - /// Sends a notification to the client - pub(crate) fn send_notification( - &mut self, - params: N::Params, - ) { - let not = lsp_server::Notification::new(N::METHOD.to_string(), params); - self.send(not.into()); - } - - /// Handles a response to a request we made. The response gets forwarded to where we made the - /// request from. - fn complete_request(&mut self, response: lsp_server::Response) { - let handler = self.request_queue.outgoing.complete(response.id.clone()); - handler(self, response) - } - - /// Sends a response to a request to the client. This method logs the time it took us to reply - /// to a request from the client. - pub(crate) fn respond(&mut self, response: lsp_server::Response) { - if let Some((_method, start)) = self.request_queue.incoming.complete(response.id.clone()) { - let duration = start.elapsed(); - log::info!("handled req#{} in {:?}", response.id, duration); - self.send(response.into()); - } - } - - /// Sends a message to the client - fn send(&mut self, message: lsp_server::Message) { - self.sender - .send(message) - .expect("error sending lsp message to the outgoing channel") - } -} +use crate::{Config, LanguageServerState}; +use lsp_server::Connection; /// Runs the main loop of the language server. This will receive requests and handle them. -pub fn main_loop(connection: Connection, config: Config) -> Result<()> { +pub fn main_loop(connection: Connection, config: Config) -> anyhow::Result<()> { log::info!("initial config: {:#?}", config); LanguageServerState::new(connection.sender, config).run(connection.receiver) } - -/// Send all diagnostics of all files -fn handle_diagnostics(state: LanguageServerSnapshot, sender: Sender) -> Cancelable<()> { - // Iterate over all files - for (idx, _package) in state.packages.iter().enumerate() { - let package_id = hir::PackageId(idx as u32); - - // Get all the files - let files = state.analysis.package_source_files(package_id)?; - - // Publish all diagnostics - for file in files { - let line_index = state.analysis.file_line_index(file)?; - let uri = state.file_id_to_uri(file).unwrap(); - let diagnostics = state.analysis.diagnostics(file)?; - - let diagnostics = { - let mut lsp_diagnostics = Vec::with_capacity(diagnostics.len()); - for d in diagnostics { - lsp_diagnostics.push(lsp_types::Diagnostic { - range: convert_range(d.range, &line_index), - severity: Some(lsp_types::DiagnosticSeverity::Error), - code: None, - code_description: None, - source: Some("mun".to_string()), - message: d.message, - related_information: { - let mut annotations = - Vec::with_capacity(d.additional_annotations.len()); - for annotation in d.additional_annotations { - annotations.push(lsp_types::DiagnosticRelatedInformation { - location: lsp_types::Location { - uri: state - .file_id_to_uri(annotation.range.file_id) - .unwrap(), - range: convert_range( - annotation.range.value, - state - .analysis - .file_line_index(annotation.range.file_id)? - .deref(), - ), - }, - message: annotation.message, - }); - } - if annotations.is_empty() { - None - } else { - Some(annotations) - } - }, - tags: None, - data: None, - }); - } - lsp_diagnostics - }; - - sender - .send(Task::Notify(lsp_server::Notification { - method: PublishDiagnostics::METHOD.to_owned(), - params: to_json(PublishDiagnosticsParams { - uri, - diagnostics, - version: None, - }) - .unwrap(), - })) - .unwrap() - } - } - Ok(()) -} - -/// Handles a task send by another async task -fn handle_task(task: Task, state: &mut LanguageServerState) -> Result<()> { - match task { - Task::Notify(notification) => { - state.send(notification.into()); - } - } - Ok(()) -} - -/// Handles a change to the underlying virtual file system. -fn handle_vfs_task(mut task: vfs::MonitorMessage, state: &mut LanguageServerState) -> Result<()> { - loop { - match task { - vfs::MonitorMessage::Progress { .. } => {} - vfs::MonitorMessage::Loaded { files } => { - let vfs = &mut *state.vfs.write(); - for (path, contents) in files { - vfs.set_file_contents(&path, contents); - } - } - } - - // Coalesce many VFS events into a single loop turn - task = match state.vfs_monitor_receiver.try_recv() { - Ok(task) => task, - _ => break, - } - } - Ok(()) -} - -impl LanguageServerState { - /// Creates a snapshot of the state - pub fn snapshot(&self) -> LanguageServerSnapshot { - LanguageServerSnapshot { - vfs: self.vfs.clone(), - analysis: self.analysis.snapshot(), - packages: self.packages.clone(), - } - } - - /// Processes any and all changes that have been applied to the virtual filesystem. Generates - /// an `AnalysisChange` and applies it if there are changes. True is returned if things changed, - /// otherwise false. - pub fn process_vfs_changes(&mut self) -> bool { - // Get all the changes since the last time we processed - let changed_files = { - let mut vfs = self.vfs.write(); - vfs.take_changes() - }; - if changed_files.is_empty() { - return false; - } - - // Construct an AnalysisChange to apply to the analysis - let vfs = self.vfs.read(); - let mut analysis_change = AnalysisChange::new(); - let mut has_created_or_deleted_entries = false; - for file in changed_files { - // If the file was deleted or created we have to remember that so that we update the - // source roots as well. - if file.is_created_or_deleted() { - has_created_or_deleted_entries = true; - } - - // Convert the contents of the file to a string - let bytes = vfs - .file_contents(file.file_id) - .map(Vec::from) - .unwrap_or_default(); - let text = match String::from_utf8(bytes).ok() { - Some(text) => Some(Arc::from(text)), - None => None, - }; - - // Notify the database about this change - analysis_change.change_file(hir::FileId(file.file_id.0), text); - } - - // If an entry was created or deleted we have to recreate all source roots - if has_created_or_deleted_entries { - analysis_change.set_roots(self.recompute_source_roots()); - } - - // Apply the change - self.analysis.apply_change(analysis_change); - true - } -} - -impl LanguageServerSnapshot { - /// Converts the specified `hir::FileId` to a `Url` - pub fn file_id_to_uri(&self, id: hir::FileId) -> Result { - let vfs = self.vfs.read(); - let path = vfs.file_path(vfs::FileId(id.0)); - let url = url_from_path_with_drive_lowercasing(path)?; - - Ok(url) - } -} diff --git a/crates/mun_language_server/src/state.rs b/crates/mun_language_server/src/state.rs new file mode 100644 index 000000000..12c02901e --- /dev/null +++ b/crates/mun_language_server/src/state.rs @@ -0,0 +1,388 @@ +use std::time::Instant; +use std::{ops::Deref, sync::Arc}; + +use crossbeam_channel::{select, unbounded, Receiver, Sender}; +use lsp_server::ReqQueue; +use lsp_types::notification::Notification; +use lsp_types::{notification::PublishDiagnostics, PublishDiagnosticsParams, Url}; +use parking_lot::RwLock; +use rustc_hash::FxHashSet; + +use paths::AbsPathBuf; +use vfs::VirtualFileSystem; + +use crate::state::utils::Progress; +use crate::{ + analysis::{Analysis, AnalysisSnapshot}, + change::AnalysisChange, + config::Config, + conversion::{convert_range, url_from_path_with_drive_lowercasing}, + to_json, Result, +}; + +mod protocol; +mod utils; +mod workspace; + +/// A `Task` is something that is send from async tasks to the entry point for processing. This +/// enables synchronizing resources like the connection with the client. +#[derive(Debug)] +pub(crate) enum Task { + Notify(lsp_server::Notification), +} + +#[derive(Debug)] +pub(crate) enum Event { + Vfs(vfs::MonitorMessage), + Task(Task), + Lsp(lsp_server::Message), +} + +pub(crate) type RequestHandler = fn(&mut LanguageServerState, lsp_server::Response); + +/// State for the language server +pub(crate) struct LanguageServerState { + /// Channel to send language server messages to the client + pub(crate) sender: Sender, + + /// The request queue keeps track of all incoming and outgoing requests. + pub(crate) request_queue: lsp_server::ReqQueue<(String, Instant), RequestHandler>, + + /// The configuration passed by the client + pub config: Config, + + /// Thread pool for async execution + pub thread_pool: threadpool::ThreadPool, + + /// Channel to send tasks to from background operations + pub task_sender: Sender, + + /// Channel to receive tasks on from background operations + pub task_receiver: Receiver, + + /// The virtual filesystem that holds all the file contents + pub vfs: Arc>, + + /// The vfs monitor + pub vfs_monitor: Box, + + /// The receiver of vfs monitor messages + pub vfs_monitor_receiver: Receiver, + + /// Documents that are currently kept in memory from the client + pub open_docs: FxHashSet, + + /// Holds the state of the analysis process + pub analysis: Analysis, + + /// All the packages known to the server + pub packages: Arc>, + + /// True if the client requested that we shut down + pub shutdown_requested: bool, +} + +/// A snapshot of the state of the language server +pub(crate) struct LanguageServerSnapshot { + /// The virtual filesystem that holds all the file contents + pub vfs: Arc>, + + /// Holds the state of the analysis process + pub analysis: AnalysisSnapshot, + + /// All the packages known to the server + pub packages: Arc>, +} + +impl LanguageServerState { + pub fn new(sender: Sender, config: Config) -> Self { + // Construct the virtual filesystem monitor + let (vfs_monitor_sender, vfs_monitor_receiver) = unbounded::(); + let vfs_monitor: vfs::NotifyMonitor = vfs::Monitor::new(Box::new(move |msg| { + vfs_monitor_sender + .send(msg) + .expect("error sending vfs monitor message to foreground") + })); + let vfs_monitor = Box::new(vfs_monitor) as Box; + + // Construct a task channel + let (task_sender, task_receiver) = unbounded(); + + // Construct the state that will hold all the analysis and apply the initial state + let mut analysis = Analysis::new(); + let mut change = AnalysisChange::new(); + change.set_packages(Default::default()); + change.set_roots(Default::default()); + analysis.apply_change(change); + + LanguageServerState { + sender, + request_queue: ReqQueue::default(), + config, + vfs: Arc::new(RwLock::new(Default::default())), + vfs_monitor, + vfs_monitor_receiver, + open_docs: FxHashSet::default(), + thread_pool: threadpool::ThreadPool::default(), + task_sender, + task_receiver, + analysis, + packages: Arc::new(Vec::new()), + shutdown_requested: false, + } + } + + /// Blocks until a new event is received from on of the many channels the language server + /// listens to. Returns the first event that is received. + fn next_event(&self, receiver: &Receiver) -> Option { + select! { + recv(receiver) -> msg => msg.ok().map(Event::Lsp), + recv(self.vfs_monitor_receiver) -> task => Some(Event::Vfs(task.unwrap())), + recv(self.task_receiver) -> task => Some(Event::Task(task.unwrap())) + } + } + + /// Runs the language server to completion + pub fn run(mut self, receiver: Receiver) -> Result<()> { + // Start by updating the current workspace + self.fetch_workspaces(); + + while let Some(event) = self.next_event(&receiver) { + if let Event::Lsp(lsp_server::Message::Notification(notification)) = &event { + if notification.method == lsp_types::notification::Exit::METHOD { + return Ok(()); + } + } + self.handle_event(event)?; + } + + Ok(()) + } + + /// Handles an event from one of the many sources that the language server subscribes to. + fn handle_event(&mut self, event: Event) -> Result<()> { + let start_time = Instant::now(); + log::info!("handling event: {:?}", event); + + // Process the incoming event + match event { + Event::Task(task) => self.handle_task(task)?, + Event::Lsp(msg) => match msg { + lsp_server::Message::Request(req) => self.on_request(req, start_time)?, + lsp_server::Message::Response(resp) => self.complete_request(resp), + lsp_server::Message::Notification(not) => self.on_notification(not)?, + }, + Event::Vfs(task) => self.handle_vfs_task(task)?, + }; + + // Process any changes to the vfs + let state_changed = self.process_vfs_changes(); + if state_changed { + let snapshot = self.snapshot(); + let task_sender = self.task_sender.clone(); + // Spawn the diagnostics in the threadpool + self.thread_pool.execute(move || { + let _result = handle_diagnostics(snapshot, task_sender); + }); + } + + Ok(()) + } + + /// Handles a task send by another async task + fn handle_task(&mut self, task: Task) -> Result<()> { + match task { + Task::Notify(notification) => { + self.send(notification.into()); + } + } + Ok(()) + } + + /// Handles a change to the underlying virtual file system. + fn handle_vfs_task(&mut self, mut task: vfs::MonitorMessage) -> Result<()> { + loop { + match task { + vfs::MonitorMessage::Progress { total, done } => { + let progress_state = if done == 0 { + Progress::Begin + } else if done < total { + Progress::Report + } else { + Progress::End + }; + self.report_progress( + "projects scanned", + progress_state, + Some(format!("{}/{}", done, total)), + Some(Progress::fraction(done, total)), + ) + } + vfs::MonitorMessage::Loaded { files } => { + let vfs = &mut *self.vfs.write(); + for (path, contents) in files { + vfs.set_file_contents(&path, contents); + } + } + } + + // Coalesce many VFS events into a single loop turn + task = match self.vfs_monitor_receiver.try_recv() { + Ok(task) => task, + _ => break, + } + } + Ok(()) + } +} + +/// Send all diagnostics of all files +fn handle_diagnostics(state: LanguageServerSnapshot, sender: Sender) -> Result<()> { + // Iterate over all files + for (idx, _package) in state.packages.iter().enumerate() { + let package_id = hir::PackageId(idx as u32); + + // Get all the files + let files = state.analysis.package_source_files(package_id)?; + + // Publish all diagnostics + for file in files { + let line_index = state.analysis.file_line_index(file)?; + let uri = state.file_id_to_uri(file).unwrap(); + let diagnostics = state.analysis.diagnostics(file)?; + + let diagnostics = { + let mut lsp_diagnostics = Vec::with_capacity(diagnostics.len()); + for d in diagnostics { + lsp_diagnostics.push(lsp_types::Diagnostic { + range: convert_range(d.range, &line_index), + severity: Some(lsp_types::DiagnosticSeverity::Error), + code: None, + code_description: None, + source: Some("mun".to_string()), + message: d.message, + related_information: { + let mut annotations = + Vec::with_capacity(d.additional_annotations.len()); + for annotation in d.additional_annotations { + annotations.push(lsp_types::DiagnosticRelatedInformation { + location: lsp_types::Location { + uri: state + .file_id_to_uri(annotation.range.file_id) + .unwrap(), + range: convert_range( + annotation.range.value, + state + .analysis + .file_line_index(annotation.range.file_id)? + .deref(), + ), + }, + message: annotation.message, + }); + } + if annotations.is_empty() { + None + } else { + Some(annotations) + } + }, + tags: None, + data: None, + }); + } + lsp_diagnostics + }; + + sender + .send(Task::Notify(lsp_server::Notification { + method: PublishDiagnostics::METHOD.to_owned(), + params: to_json(PublishDiagnosticsParams { + uri, + diagnostics, + version: None, + }) + .unwrap(), + })) + .unwrap(); + } + } + Ok(()) +} + +impl LanguageServerState { + /// Creates a snapshot of the state + pub fn snapshot(&self) -> LanguageServerSnapshot { + LanguageServerSnapshot { + vfs: self.vfs.clone(), + analysis: self.analysis.snapshot(), + packages: self.packages.clone(), + } + } + + /// Processes any and all changes that have been applied to the virtual filesystem. Generates + /// an `AnalysisChange` and applies it if there are changes. True is returned if things changed, + /// otherwise false. + pub fn process_vfs_changes(&mut self) -> bool { + // Get all the changes since the last time we processed + let changed_files = { + let mut vfs = self.vfs.write(); + vfs.take_changes() + }; + if changed_files.is_empty() { + return false; + } + + // Construct an AnalysisChange to apply to the analysis + let vfs = self.vfs.read(); + let mut analysis_change = AnalysisChange::new(); + let mut has_created_or_deleted_entries = false; + for file in changed_files { + // If the file was deleted or created we have to remember that so that we update the + // source roots as well. + if file.is_created_or_deleted() { + has_created_or_deleted_entries = true; + } + + // Convert the contents of the file to a string + let bytes = vfs + .file_contents(file.file_id) + .map(Vec::from) + .unwrap_or_default(); + let text = match String::from_utf8(bytes).ok() { + Some(text) => Some(Arc::from(text)), + None => None, + }; + + // Notify the database about this change + analysis_change.change_file(hir::FileId(file.file_id.0), text); + } + + // If an entry was created or deleted we have to recreate all source roots + if has_created_or_deleted_entries { + analysis_change.set_roots(self.recompute_source_roots()); + } + + // Apply the change + self.analysis.apply_change(analysis_change); + true + } +} + +impl LanguageServerSnapshot { + /// Converts the specified `hir::FileId` to a `Url` + pub fn file_id_to_uri(&self, id: hir::FileId) -> Result { + let vfs = self.vfs.read(); + let path = vfs.file_path(vfs::FileId(id.0)); + let url = url_from_path_with_drive_lowercasing(path)?; + + Ok(url) + } +} + +impl Drop for LanguageServerState { + fn drop(&mut self) { + self.analysis.request_cancellation(); + self.thread_pool.join(); + } +} diff --git a/crates/mun_language_server/src/state/protocol.rs b/crates/mun_language_server/src/state/protocol.rs new file mode 100644 index 000000000..cce84e614 --- /dev/null +++ b/crates/mun_language_server/src/state/protocol.rs @@ -0,0 +1,169 @@ +use std::time::Instant; + +use anyhow::Result; + +use dispatcher::{NotificationDispatcher, RequestDispatcher}; + +use super::LanguageServerState; +use crate::conversion::convert_uri; +use crate::state::RequestHandler; +use lsp_types::notification::{ + DidChangeTextDocument, DidChangeWatchedFiles, DidCloseTextDocument, DidOpenTextDocument, +}; + +pub mod dispatcher; + +impl LanguageServerState { + /// Called when a `DidOpenTextDocument` notification was received. + fn on_did_open_text_document( + &mut self, + params: lsp_types::DidOpenTextDocumentParams, + ) -> anyhow::Result<()> { + if let Ok(path) = convert_uri(¶ms.text_document.uri) { + self.open_docs.insert(path.clone()); + self.vfs + .write() + .set_file_contents(&path, Some(params.text_document.text.into_bytes())); + } + Ok(()) + } + + /// Called when a `DidChangeTextDocument` notification was received. + fn on_did_change_text_document( + &mut self, + params: lsp_types::DidChangeTextDocumentParams, + ) -> anyhow::Result<()> { + let lsp_types::DidChangeTextDocumentParams { + text_document, + content_changes, + } = params; + if let Ok(path) = convert_uri(&text_document.uri) { + let new_content = content_changes.get(0).unwrap().text.clone(); + self.vfs + .write() + .set_file_contents(&path, Some(new_content.into_bytes())); + } + Ok(()) + } + + /// Called when a `DidCloseTextDocument` notification was received. + fn on_did_close_text_document( + &mut self, + params: lsp_types::DidCloseTextDocumentParams, + ) -> anyhow::Result<()> { + if let Ok(path) = convert_uri(¶ms.text_document.uri) { + self.open_docs.remove(&path); + self.vfs_monitor.reload(&path); + } + Ok(()) + } + + /// Called when a `DidChangeWatchedFiles` was received + fn on_did_change_watched_files( + &mut self, + params: lsp_types::DidChangeWatchedFilesParams, + ) -> anyhow::Result<()> { + for change in params.changes { + if let Ok(path) = convert_uri(&change.uri) { + self.vfs_monitor.reload(&path); + } + } + Ok(()) + } + + /// Handles a language server protocol request + pub(super) fn on_request( + &mut self, + request: lsp_server::Request, + request_received: Instant, + ) -> Result<()> { + self.register_request(&request, request_received); + + // If a shutdown was requested earlier, immediately respond with an error + if self.shutdown_requested { + self.respond(lsp_server::Response::new_err( + request.id, + lsp_server::ErrorCode::InvalidRequest as i32, + "shutdown was requested".to_owned(), + )); + return Ok(()); + } + + // Dispatch the event based on the type of event + RequestDispatcher::new(self, request) + .on::(|state, _request| { + state.shutdown_requested = true; + Ok(()) + })? + .finish(); + + Ok(()) + } + + /// Handles a notification from the language server client + pub(super) fn on_notification(&mut self, notification: lsp_server::Notification) -> Result<()> { + NotificationDispatcher::new(self, notification) + .on::(LanguageServerState::on_did_open_text_document)? + .on::(LanguageServerState::on_did_change_text_document)? + .on::(LanguageServerState::on_did_close_text_document)? + .on::(LanguageServerState::on_did_change_watched_files)? + .finish(); + Ok(()) + } + + /// Registers a request with the server. We register all these request to make sure they all get + /// handled and so we can measure the time it takes for them to complete from the point of view + /// of the client. + fn register_request(&mut self, request: &lsp_server::Request, request_received: Instant) { + self.request_queue.incoming.register( + request.id.clone(), + (request.method.clone(), request_received), + ) + } + + /// Sends a request to the client and registers the request so that we can handle the response. + pub(crate) fn send_request( + &mut self, + params: R::Params, + handler: RequestHandler, + ) { + let request = self + .request_queue + .outgoing + .register(R::METHOD.to_string(), params, handler); + self.send(request.into()); + } + + /// Sends a notification to the client + pub(crate) fn send_notification( + &mut self, + params: N::Params, + ) { + let not = lsp_server::Notification::new(N::METHOD.to_string(), params); + self.send(not.into()); + } + + /// Handles a response to a request we made. The response gets forwarded to where we made the + /// request from. + pub(super) fn complete_request(&mut self, response: lsp_server::Response) { + let handler = self.request_queue.outgoing.complete(response.id.clone()); + handler(self, response) + } + + /// Sends a response to a request to the client. This method logs the time it took us to reply + /// to a request from the client. + fn respond(&mut self, response: lsp_server::Response) { + if let Some((_method, start)) = self.request_queue.incoming.complete(response.id.clone()) { + let duration = start.elapsed(); + log::info!("handled req#{} in {:?}", response.id, duration); + self.send(response.into()); + } + } + + /// Sends a message to the client + pub(crate) fn send(&mut self, message: lsp_server::Message) { + self.sender + .send(message) + .expect("error sending lsp message to the outgoing channel") + } +} diff --git a/crates/mun_language_server/src/dispatcher.rs b/crates/mun_language_server/src/state/protocol/dispatcher.rs similarity index 99% rename from crates/mun_language_server/src/dispatcher.rs rename to crates/mun_language_server/src/state/protocol/dispatcher.rs index 5365c9843..20c885bf9 100644 --- a/crates/mun_language_server/src/dispatcher.rs +++ b/crates/mun_language_server/src/state/protocol/dispatcher.rs @@ -1,6 +1,6 @@ +use super::LanguageServerState; use crate::cancelation::is_canceled; use crate::from_json; -use crate::main_loop::LanguageServerState; use anyhow::Result; use serde::de::DeserializeOwned; use serde::Serialize; diff --git a/crates/mun_language_server/src/state/utils.rs b/crates/mun_language_server/src/state/utils.rs new file mode 100644 index 000000000..449721b39 --- /dev/null +++ b/crates/mun_language_server/src/state/utils.rs @@ -0,0 +1,74 @@ +use super::LanguageServerState; + +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum Progress { + Begin, + Report, + End, +} + +impl Progress { + /// Builds a fractional progress value + pub(crate) fn fraction(done: usize, total: usize) -> f64 { + assert!(done <= total); + done as f64 / total.max(1) as f64 + } +} + +impl LanguageServerState { + /// Sends a notification to the client to display the specified message to the user. + pub(crate) fn show_message(&mut self, typ: lsp_types::MessageType, message: impl AsRef) { + let message = message.as_ref().to_owned(); + self.send_notification::( + lsp_types::ShowMessageParams { typ, message }, + ) + } + + /// Reports progress to the user via the `WorkDoneProgress` protocol. + pub(crate) fn report_progress( + &mut self, + title: &str, + state: Progress, + message: Option, + fraction: Option, + ) { + // TODO: Ensure that the client supports WorkDoneProgress + + let percentage = fraction.map(|f| { + (0.0..=1.0).contains(&f); + (f * 100.0) as u32 + }); + let token = lsp_types::ProgressToken::String(format!("mun/{}", title)); + let work_done_progress = match state { + Progress::Begin => { + self.send_request::( + lsp_types::WorkDoneProgressCreateParams { + token: token.clone(), + }, + |_, _| (), + ); + + lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin { + title: title.into(), + cancellable: None, + message, + percentage, + }) + } + Progress::Report => { + lsp_types::WorkDoneProgress::Report(lsp_types::WorkDoneProgressReport { + cancellable: None, + message, + percentage, + }) + } + Progress::End => { + lsp_types::WorkDoneProgress::End(lsp_types::WorkDoneProgressEnd { message }) + } + }; + self.send_notification::(lsp_types::ProgressParams { + token, + value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress), + }); + } +} diff --git a/crates/mun_language_server/src/workspace.rs b/crates/mun_language_server/src/state/workspace.rs similarity index 93% rename from crates/mun_language_server/src/workspace.rs rename to crates/mun_language_server/src/state/workspace.rs index 890624421..9aa264552 100644 --- a/crates/mun_language_server/src/workspace.rs +++ b/crates/mun_language_server/src/state/workspace.rs @@ -1,4 +1,5 @@ -use crate::{change::AnalysisChange, config::FilesWatcher, main_loop::LanguageServerState}; +use super::LanguageServerState; +use crate::{change::AnalysisChange, config::FilesWatcher}; use paths::{AbsPathBuf, RelativePath}; use std::{ convert::{TryFrom, TryInto}, @@ -12,13 +13,16 @@ impl LanguageServerState { let packages = self .config .discovered_projects - .as_ref() + .clone() .into_iter() .flatten() .filter_map(|project| match project::Package::from_file(&project.path) { Ok(package) => Some(package), - Err(_) => { - // TODO: Show error + Err(err) => { + self.show_message( + lsp_types::MessageType::Error, + format!("mun failed to load package: {:#}", err), + ); None } }) diff --git a/crates/mun_language_server/tests/initialization.rs b/crates/mun_language_server/tests/initialization.rs index eccb5e64c..722a775d2 100644 --- a/crates/mun_language_server/tests/initialization.rs +++ b/crates/mun_language_server/tests/initialization.rs @@ -1,6 +1,22 @@ mod support; +use support::Project; + #[test] fn test_server() { - let _server = support::Server::new(); + let _server = Project::with_fixture( + r#" +//- /mun.toml +[package] +name = "foo" +version = "0.0.0" + +//- /src/mod.mun +fn add(a: i32, b: i32) -> i32 { + a + b +} +"#, + ) + .server() + .wait_until_workspace_is_loaded(); } diff --git a/crates/mun_language_server/tests/support.rs b/crates/mun_language_server/tests/support.rs index f49f84007..15dda8f4c 100644 --- a/crates/mun_language_server/tests/support.rs +++ b/crates/mun_language_server/tests/support.rs @@ -1,43 +1,129 @@ use crossbeam_channel::{after, select}; use lsp_server::{Connection, Message, Notification, Request}; -use lsp_types::{notification::Exit, request::Shutdown}; -use mun_language_server::{main_loop, Config}; +use lsp_types::{ + notification::Exit, request::Shutdown, ProgressParams, ProgressParamsValue, WorkDoneProgress, +}; +use mun_language_server::{main_loop, Config, FilesWatcher}; +use mun_test::Fixture; use paths::AbsPathBuf; +use project::ProjectManifest; use serde::Serialize; use serde_json::Value; -use std::convert::TryFrom; +use std::cell::{Cell, RefCell}; +use std::convert::TryInto; +use std::fs; use std::time::Duration; +/// A `Project` represents a project that a language server can work with. Call the `server` method +/// to instantiate a language server that will serve information about the project. +pub struct Project<'a> { + fixture: &'a str, + tmp_dir: Option, +} + +impl<'a> Project<'a> { + /// Construct a project from a fixture. + pub fn with_fixture(fixture: &str) -> Project { + Project { + fixture, + tmp_dir: None, + } + } + + /// Instantiate a language server for this project. + pub fn server(self) -> Server { + // Get or create a temporary directory + let tmp_dir = self + .tmp_dir + .unwrap_or_else(|| tempdir::TempDir::new("testdir").unwrap()); + + // Write all fixtures to a folder + for entry in Fixture::parse(self.fixture) { + let path = entry.relative_path.to_path(tmp_dir.path()); + fs::create_dir_all(path.parent().unwrap()).unwrap(); + fs::write(path.as_path(), entry.text.as_bytes()).unwrap(); + } + + let tmp_dir_path: AbsPathBuf = tmp_dir + .path() + .to_path_buf() + .try_into() + .expect("could not convert temp dir to absolute path"); + let roots = vec![tmp_dir_path.clone()]; + + let discovered_projects = ProjectManifest::discover_all(roots.into_iter()); + + // Construct a default configuration for the server + let config = Config { + discovered_projects: Some(discovered_projects), + watcher: FilesWatcher::Client, + ..Config::new(tmp_dir_path) + }; + + // TODO: Provide the ability to modify the configuration externally + + Server::new(tmp_dir, config) + } +} + /// An object that runs the language server main loop and enables sending and receiving messages /// to and from it. pub struct Server { - next_request_id: i32, + next_request_id: Cell, + messages: RefCell>, worker: Option>, client: Connection, - _temp_path: tempdir::TempDir, + _tmp_dir: tempdir::TempDir, } impl Server { /// Constructs and initializes a new `Server` - pub fn new() -> Self { + pub fn new(tmp_dir: tempdir::TempDir, config: Config) -> Self { let (connection, client) = Connection::memory(); - let temp_path = tempdir::TempDir::new("mun_language_server") - .expect("unable to create temporary directory"); - - let config = Config::new( - AbsPathBuf::try_from(temp_path.path().to_path_buf()) - .expect("temp_path is not an absolute path"), - ); let worker = std::thread::spawn(move || { main_loop(connection, config).unwrap(); }); Self { - next_request_id: Default::default(), + next_request_id: Cell::new(1), + messages: RefCell::new(Vec::new()), worker: Some(worker), client, - _temp_path: temp_path, + _tmp_dir: tmp_dir, + } + } + + /// Waits until all projects in the workspace have been loaded + pub fn wait_until_workspace_is_loaded(self) -> Server { + self.wait_for_message_cond(1, &|msg: &Message| match msg { + Message::Notification(n) if n.method == "$/progress" => { + match n.clone().extract::("$/progress").unwrap() { + ProgressParams { + token: lsp_types::ProgressToken::String(ref token), + value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(_)), + } if token == "mun/projects scanned" => true, + _ => false, + } + } + _ => false, + }); + self + } + + /// A function to wait for a specific message to arrive + fn wait_for_message_cond(&self, n: usize, cond: &dyn Fn(&Message) -> bool) { + let mut total = 0; + for msg in self.messages.borrow().iter() { + if cond(msg) { + total += 1 + } + } + while total < n { + let msg = self.recv().expect("no response"); + if cond(&msg) { + total += 1; + } } } @@ -54,19 +140,19 @@ impl Server { } /// Sends a request to main loop, returning the response - fn send_request(&mut self, params: R::Params) -> Value + fn send_request(&self, params: R::Params) -> Value where R::Params: Serialize, { - let id = self.next_request_id; - self.next_request_id += 1; + let id = self.next_request_id.get(); + self.next_request_id.set(id.wrapping_add(1)); let r = Request::new(id.into(), R::METHOD.to_string(), params); self.send_and_receive(r) } /// Sends an LSP notification to the main loop. - fn notification(&mut self, params: N::Params) + fn notification(&self, params: N::Params) where N::Params: Serialize, { @@ -75,12 +161,12 @@ impl Server { } /// Sends a server notification to the main loop - fn send_notification(&mut self, not: Notification) { + fn send_notification(&self, not: Notification) { self.client.sender.send(Message::Notification(not)).unwrap(); } /// Sends a request to the main loop and receives its response - fn send_and_receive(&mut self, r: Request) -> Value { + fn send_and_receive(&self, r: Request) -> Value { let id = r.id.clone(); self.client.sender.send(r.into()).unwrap(); while let Some(msg) = self.recv() { @@ -106,12 +192,16 @@ impl Server { } /// Receives a message from the message or timeout. - fn recv(&mut self) -> Option { + fn recv(&self) -> Option { let timeout = Duration::from_secs(120); - select! { + let msg = select! { recv(self.client.receiver) -> msg => msg.ok(), recv(after(timeout)) -> _ => panic!("timed out"), + }; + if let Some(ref msg) = msg { + self.messages.borrow_mut().push(msg.clone()); } + msg } } diff --git a/crates/mun_project/Cargo.toml b/crates/mun_project/Cargo.toml index ea030f650..ba3aa35eb 100644 --- a/crates/mun_project/Cargo.toml +++ b/crates/mun_project/Cargo.toml @@ -5,8 +5,10 @@ authors = ["The Mun Team "] edition = "2018" [dependencies] +rustc-hash = "1.1.0" serde = "1.0" serde_derive = "1.0" toml = "0.5" semver = { version = "0.10", features = ["serde"] } anyhow = "1.0" +paths = { path="../mun_paths", package="mun_paths" } diff --git a/crates/mun_project/src/lib.rs b/crates/mun_project/src/lib.rs index a32333474..8b92dc667 100644 --- a/crates/mun_project/src/lib.rs +++ b/crates/mun_project/src/lib.rs @@ -1,7 +1,9 @@ -mod manifest; -mod package; - pub use manifest::{Manifest, ManifestMetadata, PackageId}; pub use package::Package; +pub use project_manifest::ProjectManifest; + +mod manifest; +mod package; +mod project_manifest; pub const MANIFEST_FILENAME: &str = "mun.toml"; diff --git a/crates/mun_language_server/src/project_manifest.rs b/crates/mun_project/src/project_manifest.rs similarity index 88% rename from crates/mun_language_server/src/project_manifest.rs rename to crates/mun_project/src/project_manifest.rs index 316574e22..097e3a9ea 100644 --- a/crates/mun_language_server/src/project_manifest.rs +++ b/crates/mun_project/src/project_manifest.rs @@ -1,9 +1,8 @@ +use crate::MANIFEST_FILENAME; use anyhow::bail; use paths::{AbsPath, AbsPathBuf}; use rustc_hash::FxHashSet; -use std::convert::TryFrom; -use std::fs::read_dir; -use std::io; +use std::{convert::TryFrom, fs::read_dir, io}; /// A wrapper around a path to a mun project #[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] @@ -15,14 +14,14 @@ impl ProjectManifest { /// Constructs a new [`ProjectManifest`] from a path pub fn from_manifest_path(path: impl AsRef) -> anyhow::Result { let path = path.as_ref(); - if path.ends_with(project::MANIFEST_FILENAME) { + if path.ends_with(MANIFEST_FILENAME) { Ok(Self { path: path.to_path_buf(), }) } else { bail!( "project root must point to {}: {}", - project::MANIFEST_FILENAME, + MANIFEST_FILENAME, path.display() ); } @@ -37,7 +36,7 @@ impl ProjectManifest { path.is_file() && path .file_name() - .map(|file_name| file_name == project::MANIFEST_FILENAME) + .map(|file_name| file_name == MANIFEST_FILENAME) .unwrap_or(false) }) .map(|path| ProjectManifest {