diff --git a/plugin/core/sessions.py b/plugin/core/sessions.py index b4392348d..5c74d5363 100644 --- a/plugin/core/sessions.py +++ b/plugin/core/sessions.py @@ -86,7 +86,8 @@ from .settings import client_configs from .settings import globalprefs from .settings import userprefs -from .transports import Transport +from .transports import Json +from .transports import TransportWrapper from .transports import TransportCallbacks from .types import Capabilities from .types import ClientConfig @@ -1265,11 +1266,11 @@ def check_applicable(self, sb: SessionBufferProtocol) -> None: _PARTIAL_RESULT_PROGRESS_PREFIX = "$ublime-partial-result-progress-" -class Session(TransportCallbacks): +class Session(TransportCallbacks[Json]): def __init__(self, manager: Manager, logger: Logger, workspace_folders: list[WorkspaceFolder], config: ClientConfig, plugin_class: type[AbstractPlugin] | None) -> None: - self.transport: Transport | None = None + self.transport: TransportWrapper[Json] | None = None self.working_directory: str | None = None self.request_id = 0 # Our request IDs are always integers. self._logger = logger @@ -1533,7 +1534,7 @@ def initialize_async( self, variables: dict[str, str], working_directory: str | None, - transport: Transport, + transport: TransportWrapper[Json], init_callback: InitCallback ) -> None: self.transport = transport diff --git a/plugin/core/transports.py b/plugin/core/transports.py index 1ea3cbbf7..e4a9cd19a 100644 --- a/plugin/core/transports.py +++ b/plugin/core/transports.py @@ -1,127 +1,173 @@ from __future__ import annotations from .logging import exception_log, debug -from .types import TCP_CONNECT_TIMEOUT -from .types import TransportConfig +from abc import abstractmethod from contextlib import closing from functools import partial from queue import Queue -from typing import Any, Callable, Dict, Generic, IO, Protocol, TypeVar +from typing import Any, Callable, Dict, final, Final, Generic, IO, Protocol, Sequence, TypeVar +import http.client import http +import io import json import os import shutil import socket import sublime import subprocess +import contextlib import threading import time import weakref +import ssl try: import orjson except ImportError: orjson = None -T = TypeVar('T') -T_contra = TypeVar('T_contra', contravariant=True) +TCP_CONNECT_TIMEOUT: Final[int] = 5 # seconds +T = TypeVar("T") +T_contra = TypeVar("T_contra", contravariant=True) +Json = Dict[str, Any] + + +def _set_inheritable(inherit_file_descriptors: Sequence[int] | None, value: bool) -> None: + if inherit_file_descriptors and sublime.platform() == "windows": + for file_descriptor in inherit_file_descriptors: + os.set_handle_inheritable(file_descriptor, value) # type: ignore + + +@final +class LaunchConfig: + __slots__ = ("command", "env") + + def __init__(self, command: list[str], env: dict[str, str] = {}) -> None: + self.command: list[str] = command + self.env: dict[str, str] = env + + def start( + self, + cwd: str | None, + stdout: int, + stdin: int, + stderr: int, + inherit_file_descriptors: Sequence[int] | None = None, + ) -> subprocess.Popen: + startupinfo = _fixup_startup_args(self.command, inherit_file_descriptors) + _set_inheritable(inherit_file_descriptors, True) + pass_fds = inherit_file_descriptors if inherit_file_descriptors and sublime.platform() != "windows" else tuple() + try: + return _start_subprocess(self.command, stdout, stdin, stderr, startupinfo, self.env, cwd, pass_fds) + finally: + _set_inheritable(inherit_file_descriptors, False) +@final class StopLoopError(Exception): pass class Transport(Generic[T]): + def __init__(self, encoder: Callable[[T], bytes], decoder: Callable[[bytes], T], http_headers: bool) -> None: + self._encoder = encoder + self._decoder = decoder + self._http_headers = http_headers - def send(self, payload: T) -> None: + @abstractmethod + def read(self) -> T: + raise NotImplementedError() + + @abstractmethod + def write(self, payload: T) -> None: raise NotImplementedError() + @abstractmethod def close(self) -> None: raise NotImplementedError() class TransportCallbacks(Protocol[T_contra]): + def on_transport_close(self, exit_code: int, exception: Exception | None) -> None: ... - def on_transport_close(self, exit_code: int, exception: Exception | None) -> None: - ... - - def on_payload(self, payload: T_contra) -> None: - ... - - def on_stderr_message(self, message: str) -> None: - ... + def on_payload(self, payload: T_contra) -> None: ... + def on_stderr_message(self, message: str) -> None: ... -class AbstractProcessor(Generic[T]): - def write_data(self, writer: IO[bytes], data: T) -> None: - raise NotImplementedError() +def _join_thread(t: threading.Thread) -> None: + if t.ident == threading.current_thread().ident: + return + try: + t.join(2) + except TimeoutError as ex: + exception_log(f"failed to join {t.name} thread", ex) - def read_data(self, reader: IO[bytes]) -> T | None: - raise NotImplementedError() +@final +class ErrorReader(Generic[T]): + """ + Responsible for relaying log messages from a raw stream to a (subclass of) TransportCallbacks. Because the various + transport configurations want to listen to different streams, perhaps completely separate from the regular RPC + transport, this is wrapped in a different class. For instance, a TCP client transport communicating via a socket, + while it listens for log messages on the stdout/stderr streams of a spawned child process. + """ -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): + def __init__(self, callback_object: TransportCallbacks[T], reader: IO[bytes]) -> None: + self._callback_object = weakref.ref(callback_object) + self._reader = reader + self._thread = threading.Thread(target=self._loop) + self._thread.start() - def write_data(self, writer: IO[bytes], data: dict[str, Any]) -> None: - body = self._encode(data) - writer.writelines((f"Content-Length: {len(body)}\r\n\r\n".encode('ascii'), body)) + def __del__(self) -> None: + _join_thread(self._thread) - def read_data(self, reader: IO[bytes]) -> dict[str, Any] | None: - headers = http.client.parse_headers(reader) # type: ignore - try: - body = reader.read(int(headers.get("Content-Length"))) - except TypeError: - if str(headers) == '\n': - # Expected on process stopping. Gracefully stop the transport. - raise StopLoopError() - else: - # Propagate server's output to the UI. - raise Exception(f"Unexpected payload in server's stdout:\n\n{headers}") + def _loop(self) -> None: try: - return self._decode(body) + while self._reader: + message = self._reader.readline().decode("utf-8", "replace") + if message == "": + continue + callback_object = self._callback_object() + if callback_object: + callback_object.on_stderr_message(message.rstrip()) + else: + break + except (BrokenPipeError, AttributeError): + pass except Exception as ex: - raise Exception(f"JSON decode error: {ex}") - - @staticmethod - def _encode(data: dict[str, Any]) -> bytes: - if orjson: - return orjson.dumps(data) - return json.dumps( - data, - ensure_ascii=False, - sort_keys=False, - check_circular=False, - separators=(',', ':') - ).encode('utf-8') - - @staticmethod - def _decode(message: bytes) -> dict[str, Any]: - if orjson: - return orjson.loads(message) - return json.loads(message.decode('utf-8')) - - -class ProcessTransport(Transport[T]): - - def __init__(self, name: str, process: subprocess.Popen | None, socket: socket.socket | None, - reader: IO[bytes], writer: IO[bytes], stderr: IO[bytes] | None, - processor: AbstractProcessor[T], callback_object: TransportCallbacks[T]) -> None: + exception_log("unexpected exception type in error reader", ex) + + +@final +class TransportWrapper(Generic[T]): + """ + Double dispatch-like class that takes a (subclass of) Transport, and provides to a (subclass of) TransportCallbacks + appropriately decoded messages of type T. The TransportWrapper is also responsible for keeping the spawned child + process around (if any), and also keeps track of the ErrorReader. It can be the case that there is no ErrorReader, + for instance when talking to a remote TCP language server. So it can be None. + """ + + def __init__( + self, + callback_object: TransportCallbacks[T], + transport: Transport[T], + process: subprocess.Popen | None, + error_reader: ErrorReader | None, + ) -> None: self._closed = False - self._process = process - self._socket = socket - self._reader = reader - self._writer = writer - self._stderr = stderr - self._processor = processor - self._reader_thread = threading.Thread(target=self._read_loop, name=f'{name}-reader') - self._writer_thread = threading.Thread(target=self._write_loop, name=f'{name}-writer') self._callback_object = weakref.ref(callback_object) + self._transport = transport + self._process = process + self._error_reader = error_reader + self._reader_thread = threading.Thread(target=self._read_loop) + self._writer_thread = threading.Thread(target=self._write_loop) self._send_queue: Queue[T | None] = Queue(0) self._reader_thread.start() self._writer_thread.start() - if stderr: - self._stderr_thread = threading.Thread(target=self._stderr_loop, name=f'{name}-stderr') - self._stderr_thread.start() + + @property + def process_args(self) -> Any: + return self._process.args if self._process else None def send(self, payload: T) -> None: self._send_queue.put_nowait(payload) @@ -129,30 +175,19 @@ def send(self, payload: T) -> None: def close(self) -> None: if not self._closed: self._send_queue.put_nowait(None) - if self._socket: - self._socket.close() + self._transport.close() self._closed = True - def _join_thread(self, t: threading.Thread) -> None: - if t.ident == threading.current_thread().ident: - return - try: - t.join(2) - except TimeoutError as ex: - exception_log(f"failed to join {t.name} thread", ex) - def __del__(self) -> None: self.close() - self._join_thread(self._writer_thread) - self._join_thread(self._reader_thread) - if self._stderr_thread: - self._join_thread(self._stderr_thread) + _join_thread(self._writer_thread) + _join_thread(self._reader_thread) def _read_loop(self) -> None: exception = None try: - while self._reader: - payload = self._processor.read_data(self._reader) + while True: + payload = self._transport.read() if payload is None: continue @@ -205,92 +240,422 @@ def invoke() -> None: def _write_loop(self) -> None: exception: Exception | None = None try: - while self._writer: + while True: d = self._send_queue.get() if d is None: break - self._processor.write_data(self._writer, d) - self._writer.flush() + self._transport.write(d) except (BrokenPipeError, AttributeError): pass except Exception as ex: exception = ex self._end(exception) - def _stderr_loop(self) -> None: + +_encode_options: Final[dict[str, Any]] = { + "ensure_ascii": False, + "sort_keys": False, + "check_circular": False, + "separators": (",", ":"), +} + + +def encode_json(data: Json) -> bytes: + return orjson.dumps(data) if orjson else json.dumps(data, **_encode_options).encode("utf-8") + + +def decode_json(message: bytes) -> Json: + return (orjson or json).loads(message) + + +class FileObjectTransport(Transport[T]): + def __init__( + self, + encoder: Callable[[T], bytes], + decoder: Callable[[bytes], T], + http_headers: bool, + reader: io.BufferedIOBase, + writer: io.BufferedIOBase, + ) -> None: + super().__init__(encoder, decoder, http_headers) + self._reader = reader + self._writer = writer + + def read(self) -> T: + headers: http.client.HTTPMessage | None = None try: - while self._stderr: - if self._closed: - # None message already posted, just return - return - message = self._stderr.readline().decode('utf-8', 'replace') - if message == '': - continue - callback_object = self._callback_object() - if callback_object: - callback_object.on_stderr_message(message.rstrip()) - else: - break - except (BrokenPipeError, AttributeError): - pass + if self._http_headers: + headers = http.client.parse_headers(self._reader) + content_length = headers.get("Content-Length") + if not isinstance(content_length, str): + raise Exception("missing Content-Length header") + body = self._reader.read(int(content_length)) + else: + body = self._reader.readline() + if not body or body == b"\n": + raise StopLoopError() + except TypeError: + if str(headers) == "\n": + # Expected on process stopping. Gracefully stop the transport. + raise StopLoopError() + else: + # Propagate server's output to the UI. + raise Exception(f"Unexpected payload in server's stdout:\n\n{headers}") + try: + return self._decoder(body) except Exception as ex: - exception_log('unexpected exception type in stderr loop', ex) - self._send_queue.put_nowait(None) + raise Exception(f"JSON decode error: {ex}") + + def write(self, payload: T) -> None: + body = self._encoder(payload) + if self._http_headers: + self._writer.writelines((f"Content-Length: {len(body)}\r\n\r\n".encode("ascii"), body)) + else: + self._writer.writelines((body, b"\n")) + self._writer.flush() + + def close(self) -> None: + self._writer.close() + self._reader.close() + + +class SocketTransport(FileObjectTransport[T]): + def __init__( + self, encoder: Callable[[T], bytes], decoder: Callable[[bytes], T], http_headers: bool, sock: socket.socket + ) -> None: + reader_writer_pair: io.BufferedRWPair = sock.makefile("rwb") + super().__init__(encoder, decoder, http_headers, reader_writer_pair, reader_writer_pair) + self._socket = sock + def close(self) -> None: + super().close() + self._socket.close() + + +class WebSocketTransport(Transport[T]): + def read(self) -> T: + raise NotImplementedError() + + def write(self, payload: T) -> None: + raise NotImplementedError() + + def close(self) -> None: + raise NotImplementedError() -# Can be a singleton since it doesn't hold any state. -json_rpc_processor = JsonRpcProcessor() +class DuplexPipeTransport(SocketTransport[T]): + def __init__( + self, + encoder: Callable[[T], bytes], + decoder: Callable[[bytes], T], + http_headers: bool, + sock1: socket.socket, + sock2: socket.socket, + ) -> None: + super().__init__(encoder, decoder, http_headers, sock2) + self._sock1 = sock1 -def create_transport(config: TransportConfig, cwd: str | None, - callback_object: TransportCallbacks) -> Transport[dict[str, Any]]: - if config.tcp_port is not None: - assert config.tcp_port is not None - if config.tcp_port < 0: - stdout = subprocess.PIPE + def close(self) -> None: + super().close() + self._sock1.close() + + +class TransportConfig: + """ + Responsible for instantiating a TransportWrapper, which is the object that does the actual RPC communication. + """ + + __slots__ = ("_http_headers",) + + def __init__(self, http_headers: bool = True) -> None: + self._http_headers = http_headers + + @property + def http_headers(self) -> bool: + return self._http_headers + + def requires_launch_config(self) -> bool: + return False + + def _resolve_launch_config( + self, + command: list[str], + env: dict[str, str | list[str]] | None, + variables: dict[str, str], + ) -> LaunchConfig: + command = sublime.expand_variables(command, variables) + command = [os.path.expanduser(arg) for arg in command] + resolved_env = os.environ.copy() + for key, value in env.items() if isinstance(env, dict) else {}: + if isinstance(value, list): + value = os.path.pathsep.join(value) + if key == "PATH": + resolved_env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + resolved_env[key] + else: + resolved_env[key] = sublime.expand_variables(value, variables) + return LaunchConfig(command, resolved_env) + + @abstractmethod + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[Json], + ) -> TransportWrapper[Json]: + raise NotImplementedError() + + +class StdioTransportConfig(TransportConfig): + """ + The simplest of transport configs: launch the subprocess and communicate with it over standard I/O. This transport + config requires a "command". This is the default transport config when only a "command" is specified in the + ClientConfig. + """ + + __slots__ = () + + def __init__(self, http_headers: bool) -> None: + super().__init__(http_headers) + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[Json], + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + process = self._resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + return TransportWrapper( + callback_object=callbacks, + transport=FileObjectTransport(encode_json, decode_json, self.http_headers, process.stdout, process.stdin), # type: ignore # noqa: E501 + process=process, + error_reader=ErrorReader(callbacks, process.stderr), # type: ignore + ) + + +class TcpClientTransportConfig(TransportConfig): + """ + Transport for communicating to a language server that expects incoming client connections. The language server acts + as the TCP server, this text editor acts as the TCP client. One can have a "command" with this transport + configuration. In that case the subprocess is launched, and then the TCP connection is attempted. If no "command" is + given, a TCP connection is still made. This can be used for cases where the language server is already running as + part of some larger application, like Godot Editor. + """ + + __slots__ = ("_hostname", "_port") + + def __init__(self, http_headers: bool, hostname: str | None, port: int | None) -> None: + super().__init__(http_headers) + self._hostname = hostname + self._port = port + if isinstance(self._port, int) and self._port <= 0: + raise RuntimeError("invalid port number") + + def _connect(self, port: int) -> socket.socket: + start_time = time.time() + last_exception: Exception | None = None + while time.time() - start_time < TCP_CONNECT_TIMEOUT: + try: + return socket.create_connection((self._hostname or "", port)) + except Exception as ex: + last_exception = ex + if last_exception: + raise last_exception else: - stdout = subprocess.DEVNULL - stdin = subprocess.DEVNULL - else: - stdout = subprocess.PIPE - stdin = subprocess.PIPE - sock: socket.socket | None = None - process: subprocess.Popen | None = None - - def start_subprocess() -> subprocess.Popen: - startupinfo = _fixup_startup_args(config.command) - return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) - - if config.listener_socket: - assert isinstance(config.tcp_port, int) and config.tcp_port > 0 - if config.command: - process, sock, reader, writer = _start_subprocess_and_await_connection( - config.listener_socket, start_subprocess + raise RuntimeError("failed to connect") + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[Json], + ) -> TransportWrapper: + port = _add_and_resolve_port_variable(variables, self._port) + if command: + process = self._resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, ) + error_reader: ErrorReader | None = ErrorReader(callbacks, process.stdout) # type: ignore else: - sock, reader, writer = _await_client_connection(config.listener_socket) - else: - if config.command: - process = start_subprocess() - elif not config.tcp_port: - raise RuntimeError("Failed to provide command or tcp_port, at least one of them has to be configured") - if config.tcp_port: - sock = _connect_tcp(config.tcp_port) - if sock is None: - raise RuntimeError(f"Failed to connect on port {config.tcp_port}") - reader = sock.makefile('rwb') # type: ignore - writer = reader - else: - reader = process.stdout # type: ignore - writer = process.stdin # type: ignore - if not reader or not writer: - raise RuntimeError(f'Failed initializing transport: reader: {reader}, writer: {writer}') - stderr = process.stderr if process else None - return ProcessTransport( - config.name, process, sock, reader, writer, stderr, json_rpc_processor, callback_object) # type: ignore - + process = None + error_reader = None + return TransportWrapper( + callback_object=callbacks, + transport=SocketTransport(encode_json, decode_json, self.http_headers, self._connect(port)), + process=process, + error_reader=error_reader, + ) + + +class TcpServerTransportConfig(TransportConfig): + """ + Transport for communicating to a language server over TCP. The difference, however, is that this transport will + start a TCP listener socket accepting new TCP client connections. Once a client connects to this text editor acting + as the TCP server, we'll assume it's the language server we just launched. As such, this tranport requires a + "command" for starting the language server subprocess. + """ + + __slots__ = ("_port",) + + def __init__(self, http_headers: bool, port: int | None = None) -> None: + super().__init__(http_headers) + self._port = port + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[Json], + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + port = _add_and_resolve_port_variable(variables, self._port) + launch = self._resolve_launch_config(command, env, variables) + listener_socket = socket.socket() + listener_socket.bind(("", port)) + listener_socket.settimeout(TCP_CONNECT_TIMEOUT) + listener_socket.listen(TCP_CONNECT_TIMEOUT) + process: subprocess.Popen | None = None + + # We need to be able to start the process while also awaiting a client connection. + def start_in_background() -> None: + nonlocal process + # Sleep for one second, because the listener socket needs to be in the "accept" state before starting the + # subprocess. This is hacky, and will get better when we can use asyncio. + time.sleep(1) + process = launch.start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) -_subprocesses: weakref.WeakSet[subprocess.Popen] = weakref.WeakSet() + thread = threading.Thread(target=start_in_background) + thread.start() + with closing(listener_socket): + # Await one client connection (blocking!) + sock, _ = listener_socket.accept() + thread.join() + + error_reader = ErrorReader(callbacks, process.stdout) # type: ignore + return TransportWrapper( + callback_object=callbacks, + transport=SocketTransport(encode_json, decode_json, self.http_headers, sock), + process=process, + error_reader=error_reader, + ) + + +class TlsClientTransportConfig(TcpClientTransportConfig): + """ + Exactly like the TCP client transport, except we wrap the communication in secure TLS/SSL. + """ + + __slots__ = () + + def _connect(self, port: int) -> socket.socket: + # TODO: Check if a call to ssl.create_default_context() is needed here. + return ssl.wrap_socket(super()._connect(port)) + + +class WebSocketClientTransportConfig(TransportConfig): + """ + Transport configuration for connecting, as an HTTP(S) client, to an HTTP(S) server. The HTTP(S) server is expected + to make the WebSocket upgrade negotiation, after which we upgrade to WebSocket and will then start talking the LSP + protocol. This transport can have a "command", in which case we start the subprocess using the provided "command", + and then start the websocket connection. + """ + + __slots__ = ("_hostname", "_port", "_secure") + + def __init__( + self, + http_headers: bool, + hostname: str | None, + port: int | None, + secure: bool = False, + ) -> None: + super().__init__(http_headers) + self._hostname: Final[str | None] = hostname + self._port: Final[int | None] = port + self._secure: Final[bool] = secure + + @property + def port(self) -> int: + if isinstance(self._port, int): + return self._port + return http.client.HTTPS_PORT if self._secure else http.client.HTTP_PORT + + +class DuplexPipeTransportConfig(TransportConfig): + """ + Transport configuration for communicating with a process using a "duplex pipe" construction. The spawned subprocess + is informed of the pipe's file descriptor with an environment variable. The pipe file descriptor handle is inherited + by the child process. + + On Linux and macOS, this is implemented using AF_UNIX socketpairs: + https://www.man7.org/linux/man-pages/man7/unix.7.html + + !!! TODO !!! + On Windows, this is implemented using NamedPipes: https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipes + """ + + __slots__ = ("_child_fileno_env_key",) + + def __init__(self, http_headers: bool, child_fileno_env_key: str) -> None: + super().__init__(http_headers) + self._child_fileno_env_key: Final[str] = child_fileno_env_key + + def start( + self, + command: list[str] | None, + env: dict[str, str | list[str]] | None, + cwd: str | None, + variables: dict[str, str], + callbacks: TransportCallbacks[Json], + ) -> TransportWrapper: + if not command: + raise RuntimeError('missing "command" to start a child process for running the language server') + if env is None: + env = {} + # !!! TODO !!! windows named pipes + sock1, sock2 = socket.socketpair() + sock1.set_inheritable(True) + env[self._child_fileno_env_key] = str(sock1.fileno()) + process = self._resolve_launch_config(command, env, variables).start( + cwd, + stdout=subprocess.PIPE, + stdin=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + inherit_file_descriptors=(sock1.fileno(),), + ) + error_reader = ErrorReader(callbacks, process.stdout) # type: ignore + return TransportWrapper( + callback_object=callbacks, + transport=DuplexPipeTransport(encode_json, decode_json, self.http_headers, sock1, sock2), + process=process, + error_reader=error_reader, + ) + + +_subprocesses: Final[weakref.WeakSet[subprocess.Popen]] = weakref.WeakSet() def kill_all_subprocesses() -> None: @@ -308,10 +673,12 @@ def kill_all_subprocesses() -> None: pass -def _fixup_startup_args(args: list[str]) -> Any: +def _fixup_startup_args(args: list[str], inherit_file_descriptors: Sequence[int] | None = None) -> Any: startupinfo = None if sublime.platform() == "windows": startupinfo = subprocess.STARTUPINFO() # type: ignore + if inherit_file_descriptors: + startupinfo.lpAttributeList = {"handle_list": inherit_file_descriptors} startupinfo.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW # type: ignore executable_arg = args[0] _, ext = os.path.splitext(executable_arg) @@ -321,7 +688,7 @@ def _fixup_startup_args(args: list[str]) -> Any: # node has .cmd # dart has .bat # python has .exe wrappers - not needed - for extension in ['.cmd', '.bat']: + for extension in [".cmd", ".bat"]: if path_to_executable and path_to_executable.lower().endswith(extension): args[0] = executable_arg + extension break @@ -335,9 +702,12 @@ def _start_subprocess( stderr: int, startupinfo: Any, env: dict[str, str], - cwd: str | None + cwd: str | None, + pass_fds: Sequence[int], ) -> subprocess.Popen: debug(f"starting {args} in {cwd if cwd else os.getcwd()}") + if pass_fds: + debug(f"inheriting file descriptors: {pass_fds}") process = subprocess.Popen( args=args, stdin=stdin, @@ -345,46 +715,23 @@ def _start_subprocess( stderr=stderr, startupinfo=startupinfo, env=env, - cwd=cwd) + cwd=cwd, + pass_fds=pass_fds, + ) + debug("hello world") _subprocesses.add(process) return process -def _await_client_connection(listener_socket: socket.socket) -> tuple[socket.socket, IO[bytes], IO[bytes]]: - with closing(listener_socket): - # Await one client connection (blocking!) - sock, _ = listener_socket.accept() - reader = sock.makefile('rwb') # type: ignore - writer = reader - return sock, reader, writer # type: ignore - +def _find_free_port() -> int: + with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] -def _start_subprocess_and_await_connection( - listener_socket: socket.socket, subprocess_starter: Callable[[], subprocess.Popen] -) -> tuple[subprocess.Popen, socket.socket, IO[bytes], IO[bytes]]: - process = None - # We need to be able to start the process while also awaiting a client connection. - def start_in_background() -> None: - nonlocal process - # Sleep for one second, because the listener socket needs to be in the "accept" state before starting the - # subprocess. This is hacky, and will get better when we can use asyncio. - time.sleep(1) - process = subprocess_starter() - - thread = threading.Thread(target=start_in_background) - thread.start() - sock, reader, writer = _await_client_connection(listener_socket) - thread.join() - assert process is not None - return process, sock, reader, writer # type: ignore - - -def _connect_tcp(port: int) -> socket.socket | None: - start_time = time.time() - while time.time() - start_time < TCP_CONNECT_TIMEOUT: - try: - return socket.create_connection(('localhost', port)) - except ConnectionRefusedError: - pass - return None +def _add_and_resolve_port_variable(variables: dict[str, str], port: int | None) -> int: + if port is None: + port = _find_free_port() + variables["port"] = str(port) + return port diff --git a/plugin/core/types.py b/plugin/core/types.py index b5d5b932f..fb726be70 100644 --- a/plugin/core/types.py +++ b/plugin/core/types.py @@ -3,6 +3,13 @@ from .file_watcher import FileWatcherEventType from .logging import debug, set_debug_logging from .protocol import ServerCapabilities, TextDocumentSyncKind, TextDocumentSyncOptions +from .transports import DuplexPipeTransportConfig +from .transports import StdioTransportConfig +from .transports import TcpClientTransportConfig +from .transports import TcpServerTransportConfig +from .transports import TlsClientTransportConfig +from .transports import TransportConfig +from .transports import WebSocketClientTransportConfig from .url import filename_to_uri from .url import parse_uri from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, TypedDict, TypeVar, Union @@ -12,14 +19,11 @@ from wcmatch.glob import GLOBSTAR import contextlib import fnmatch -import os import posixpath -import socket import sublime import time -TCP_CONNECT_TIMEOUT = 5 # seconds FEATURES_TIMEOUT = 300 # milliseconds WORKSPACE_DIAGNOSTICS_TIMEOUT = 3000 # milliseconds @@ -643,26 +647,6 @@ def map_from_remote_to_local(self, uri: str) -> tuple[str, bool]: return _translate_path(uri, self._remote, self._local) -class TransportConfig: - __slots__ = ("name", "command", "tcp_port", "env", "listener_socket") - - def __init__( - self, - name: str, - command: list[str], - tcp_port: int | None, - env: dict[str, str], - listener_socket: socket.socket | None - ) -> None: - if not command and not tcp_port: - raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') - self.name = name - self.command = command - self.tcp_port = tcp_port - self.env = env - self.listener_socket = listener_socket - - class ClientConfig: def __init__(self, name: str, @@ -671,7 +655,13 @@ def __init__(self, schemes: list[str] | None = None, command: list[str] | None = None, binary_args: list[str] | None = None, # DEPRECATED + hostname: str | None = None, tcp_port: int | None = None, + use_tls: bool | None = None, + websocket: bool | None = None, + http_headers: bool = True, + pipe_fileno_env_key: str | None = None, + use_node_ipc: bool = False, auto_complete_selector: str | None = None, enabled: bool = True, init_options: DottedDict = DottedDict(), @@ -695,7 +685,18 @@ def __init__(self, else: assert isinstance(binary_args, list) self.command = binary_args + self.hostname = hostname self.tcp_port = tcp_port + self.use_tls = use_tls + self.websocket = websocket + # "use_node_ipc" is a convenience bool setting that modifies the http_header and pipe_fileno_env_key settings. + self.use_node_ipc = use_node_ipc + if self.use_node_ipc: + self.http_headers = False + self.pipe_fileno_env_key = "NODE_CHANNEL_FD" + else: + self.http_headers = http_headers + self.pipe_fileno_env_key = pipe_fileno_env_key self.auto_complete_selector = auto_complete_selector self.enabled = enabled self.init_options = init_options @@ -729,9 +730,15 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> Cli priority_selector=_read_priority_selector(s), schemes=s.get("schemes"), command=read_list_setting(s, "command", []), + hostname=s.get("hostname"), tcp_port=s.get("tcp_port"), + use_tls=bool(s.get("use_tls", False)), + websocket=bool(s.get("websocket", False)), + http_headers=bool(s.get("http_headers", True)), + pipe_fileno_env_key=s.get("pipe_fileno_env_key", None), + use_node_ipc=bool(s.get("use_node_ipc", False)), auto_complete_selector=s.get("auto_complete_selector"), - # Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package. + # Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. enabled=bool(s.get("enabled", True)), init_options=init_options, settings=settings, @@ -760,7 +767,13 @@ def from_dict(cls, name: str, d: dict[str, Any]) -> ClientConfig: priority_selector=_read_priority_selector(d), schemes=schemes, command=d.get("command", []), + hostname=d.get("hostname"), tcp_port=d.get("tcp_port"), + use_tls=bool(d.get("use_tls", False)), + websocket=bool(d.get("websocket", False)), + http_headers=bool(d.get("http_headers", True)), + pipe_fileno_env_key=d.get("pipe_fileno_env_key", False), + use_node_ipc=bool(d.get("use_node_ipc", False)), auto_complete_selector=d.get("auto_complete_selector"), enabled=d.get("enabled", False), init_options=DottedDict(d.get("initializationOptions")), @@ -788,7 +801,12 @@ def from_config(cls, src_config: ClientConfig, override: dict[str, Any]) -> Clie priority_selector=_read_priority_selector(override) or src_config.priority_selector, schemes=override.get("schemes", src_config.schemes), command=override.get("command", src_config.command), + hostname=override.get("hostname", src_config.hostname), tcp_port=override.get("tcp_port", src_config.tcp_port), + use_tls=override.get("use_tls", src_config.use_tls), + websocket=override.get("use_tls", src_config.use_tls), + http_headers=override.get("http_headers", src_config.http_headers), + use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), enabled=override.get("enabled", src_config.enabled), init_options=DottedDict.from_base_and_override( @@ -804,36 +822,33 @@ def from_config(cls, src_config: ClientConfig, override: dict[str, Any]) -> Clie path_maps=path_map_override if path_map_override else src_config.path_maps ) - def resolve_transport_config(self, variables: dict[str, str]) -> TransportConfig: - tcp_port: int | None = None - listener_socket: socket.socket | None = None + def create_transport_config(self) -> TransportConfig: + """ + Create a (subclass of) TransportConfig that is able to start a TransportWrapper. + """ if self.tcp_port is not None: - # < 0 means we're hosting a TCP server if self.tcp_port < 0: - # -1 means pick any free port - if self.tcp_port < -1: - tcp_port = -self.tcp_port - # Create a listener socket for incoming connections - listener_socket = _start_tcp_listener(tcp_port) - tcp_port = int(listener_socket.getsockname()[1]) + return TcpServerTransportConfig(self.http_headers, None if self.tcp_port == -1 else -self.tcp_port) + elif self.use_tls: + return TlsClientTransportConfig( + self.http_headers, + self.hostname, + None if self.tcp_port == 0 else self.tcp_port, + ) else: - tcp_port = _find_free_port() if self.tcp_port == 0 else self.tcp_port - if tcp_port is not None: - variables["port"] = str(tcp_port) - command = sublime.expand_variables(self.command, variables) - command = [os.path.expanduser(arg) for arg in command] - if tcp_port is not None: - # DEPRECATED -- replace {port} with $port or ${port} in your client config - command = [a.replace('{port}', str(tcp_port)) for a in command] - env = os.environ.copy() - for key, value in self.env.items(): - if isinstance(value, list): - value = os.path.pathsep.join(value) - if key == 'PATH': - env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] - else: - env[key] = sublime.expand_variables(value, variables) - return TransportConfig(self.name, command, tcp_port, env, listener_socket) + return TcpClientTransportConfig( + self.http_headers, + self.hostname, + None if self.tcp_port == 0 else self.tcp_port, + ) + elif self.pipe_fileno_env_key: + return DuplexPipeTransportConfig(self.http_headers, self.pipe_fileno_env_key) + elif self.websocket: + return WebSocketClientTransportConfig( + self.http_headers, self.hostname, self.tcp_port, bool(self.use_tls) + ) + else: + return StdioTransportConfig(self.http_headers) def set_view_status(self, view: sublime.View, message: str) -> None: if sublime.load_settings("LSP.sublime-settings").get("show_view_status"): @@ -1007,18 +1022,3 @@ def _read_priority_selector(config: sublime.Settings | dict[str, Any]) -> str: if language_id: return f"source.{language_id}" return "" - - -def _find_free_port() -> int: - with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - s.bind(('', 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] - - -def _start_tcp_listener(tcp_port: int | None) -> socket.socket: - sock = socket.socket() - sock.bind(('localhost', tcp_port or 0)) - sock.settimeout(TCP_CONNECT_TIMEOUT) - sock.listen(1) - return sock diff --git a/plugin/core/windows.py b/plugin/core/windows.py index 9234724ef..3a47e980c 100644 --- a/plugin/core/windows.py +++ b/plugin/core/windows.py @@ -25,7 +25,6 @@ from .settings import client_configs from .settings import LspSettingsChangeListener from .settings import userprefs -from .transports import create_transport from .types import ClientConfig from .types import matches_pattern from .types import sublime_pattern_to_glob @@ -277,8 +276,8 @@ def start_async(self, config: ClientConfig, initiating_view: sublime.View) -> No transport_cwd: str | None = cwd else: transport_cwd = workspace_folders[0].path if workspace_folders else None - transport_config = config.resolve_transport_config(variables) - transport = create_transport(transport_config, transport_cwd, session) + transport = config.create_transport_config().start( + config.command, config.env, transport_cwd, variables, session) if plugin_class: plugin_class.on_post_start(self._window, initiating_view, workspace_folders, config) config.set_view_status(initiating_view, "initialize") @@ -295,7 +294,7 @@ def start_async(self, config: ClientConfig, initiating_view: sublime.View) -> No "Re-enable by running \"LSP: Enable Language Server In Project\" from the Command Palette.", "\n\n--- Error: ---\n{1}" )).format(config.name, str(e)) - exception_log(f"Unable to start subprocess for {config.name}", e) + exception_log(f"Unable to initialize language server for {config.name}", e) if isinstance(e, CalledProcessError): print("Server output:\n{}".format(e.output.decode('utf-8', 'replace'))) self._config_manager.disable_config(config.name, only_for_session=True) diff --git a/plugin/tooling.py b/plugin/tooling.py index acfb42c0b..f4581bcc5 100644 --- a/plugin/tooling.py +++ b/plugin/tooling.py @@ -3,9 +3,8 @@ from .core.logging import debug from .core.registry import windows from .core.sessions import get_plugin -from .core.transports import create_transport -from .core.transports import Transport from .core.transports import TransportCallbacks +from .core.transports import TransportWrapper from .core.types import Capabilities from .core.types import ClientConfig from .core.version import __version__ @@ -495,7 +494,7 @@ def __init__( on_close: Callable[[list[str], str, int], None] ) -> None: self._on_close = on_close - self._transport: Transport | None = None + self._transport: TransportWrapper | None = None self._resolved_command: list[str] = [] self._stderr_lines: list[str] = [] try: @@ -516,9 +515,9 @@ def __init__( cwd = plugin_class.on_pre_start(window, initiating_view, workspace_folders, config) if not cwd and workspace_folders: cwd = workspace_folders[0].path - transport_config = config.resolve_transport_config(variables) - self._resolved_command = transport_config.command - self._transport = create_transport(transport_config, cwd, self) + transport_config = config.create_transport_config() + self._transport = transport_config.start(config.command, config.env, cwd, variables, self) + self._resolved_command = self._transport.process_args sublime.set_timeout_async(self.force_close_transport, self.CLOSE_TIMEOUT_SEC * 1000) except Exception as ex: self.on_transport_close(-1, ex) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..2190e6121 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.ruff] +line-length = 120 +indent-width = 4 +target-version = "py38" diff --git a/sublime-package.json b/sublime-package.json index f98b52c50..a9ecb51fb 100644 --- a/sublime-package.json +++ b/sublime-package.json @@ -80,6 +80,11 @@ }, "markdownDescription": "The command to start the language server." }, + "ClientUseNodeIpc": { + "type": "boolean", + "default": false, + "markdownDescription": "Communicate with the language server over Node.js IPC. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` for some servers." + }, "ClientEnabled": { "type": "boolean", "default": false, @@ -328,6 +333,9 @@ "command": { "$ref": "sublime://settings/LSP#/definitions/ClientCommand" }, + "use_node_ipc": { + "$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc" + }, "enabled": { "$ref": "sublime://settings/LSP#/definitions/ClientEnabled" }, @@ -879,6 +887,9 @@ "command": { "$ref": "sublime://settings/LSP#/definitions/ClientCommand" }, + "use_node_ipc": { + "$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc" + }, "enabled": { "$ref": "sublime://settings/LSP#/definitions/ClientEnabled" }, diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 6b0d5c36c..29f7171f5 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,6 +1,6 @@ from __future__ import annotations from LSP.plugin.core.protocol import Point, Position, Range, Request, Notification -from LSP.plugin.core.transports import JsonRpcProcessor +from LSP.plugin.core.transports import encode_json, decode_json import unittest @@ -22,9 +22,9 @@ def test_lsp_conversion(self) -> None: class EncodingTests(unittest.TestCase): def test_encode(self) -> None: - encoded = JsonRpcProcessor._encode({"text": "😃"}) + encoded = encode_json({"text": "😃"}) self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}') - decoded = JsonRpcProcessor._decode(encoded) + decoded = decode_json(encoded) self.assertEqual(decoded, {"text": "😃"}) diff --git a/tox.ini b/tox.ini index 7d1d61255..23da1b8ca 100644 --- a/tox.ini +++ b/tox.ini @@ -1,4 +1,4 @@ -# Tox (http://tox.testrun.org/) is a tool for running tests +# Tox (https://github.com/tox-dev/tox) is a tool for running tests # in multiple virtualenvs. This configuration file will run the # test suite on all supported python versions. To use it, "pip install tox" # and then run "tox" from this directory.