Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --node-ipc support #2015

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
157 changes: 101 additions & 56 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from contextlib import closing
from functools import partial
from queue import Queue
import http
import http.client
import json
import multiprocessing.connection
import os
import shutil
import socket
Expand Down Expand Up @@ -48,57 +49,91 @@ def on_stderr_message(self, message: str) -> None:

class AbstractProcessor(Generic[T]):

def write_data(self, writer: IO[bytes], data: T) -> None:
def write_data(self, data: T) -> None:
raise NotImplementedError()

def read_data(self, reader: IO[bytes]) -> Optional[T]:
def read_data(self) -> Optional[T]:
raise NotImplementedError()


class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]):
def encode_payload(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
check_circular=False,
separators=(',', ':')
).encode('utf-8')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fits one line now :)


def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None:
body = self._encode(data)
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))

def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]:
headers = http.client.parse_headers(reader) # type: ignore
def decode_payload(message: bytes) -> Optional[Dict[str, Any]]:
try:
return json.loads(message.decode('utf-8'))
except Exception as ex:
exception_log("JSON decode error", ex)
return None
alecmev marked this conversation as resolved.
Show resolved Hide resolved


class StandardProcessor(AbstractProcessor[Dict[str, Any]]):

def __init__(self, reader: IO[bytes], writer: IO[bytes]):
self._reader = reader
self._writer = writer

def write_data(self, data: Dict[str, Any]) -> None:
body = encode_payload(data)
self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
self._writer.flush()

def read_data(self) -> Optional[Dict[str, Any]]:
headers = http.client.parse_headers(self._reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
body = self._reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
try:
return self._decode(body)
except Exception as ex:
exception_log("JSON decode error", ex)
return None

@staticmethod
def _encode(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
sort_keys=False,
check_circular=False,
separators=(',', ':')
).encode('utf-8')

@staticmethod
def _decode(message: bytes) -> Dict[str, Any]:
return json.loads(message.decode('utf-8'))
return decode_payload(body)


class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]):
_buf = bytearray()
_lines = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't these two properties be part of the class instance, instead of the class? If I have LSP-dockerfile, LSP-stylelint, and LSP-typescript running with node IPC, wouldn't these overwrite each other in unpredictable ways? Maybe I'm not understanding something.

Copy link
Member

@rchl rchl Aug 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it should work as expected (in most cases ;)) due to how Python works with those.

When instance of the class reads those properties it looks them up first in the instance and then in the class. When writing it always writes to the instance property (creates it if necessary).

The only issue would be if the instance would read a class property (for example self._buf) and then mutate that property (self._buf.append(...) or whatever methods that supports). Then it would mutate the class property.

So I guess it's still safer to set those on the instance.


def __init__(self, conn: multiprocessing.connection._ConnectionBase):
self._conn = conn

def write_data(self, data: Dict[str, Any]) -> None:
body = encode_payload(data) + b"\n"
while len(body):
n = self._conn._write(self._conn.fileno(), body) # type: ignore
body = body[n:]

def read_data(self) -> Optional[Dict[str, Any]]:
while self._lines == 0:
chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore
if len(chunk) == 0:
# EOF reached: https://docs.python.org/3/library/os.html#os.read
raise StopLoopError()

self._buf += chunk
self._lines += chunk.count(b'\n')

self._lines -= 1
message, _, self._buf = self._buf.partition(b'\n')
return decode_payload(message)


class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
def __init__(self,
name: str,
process: subprocess.Popen,
socket: Optional[socket.socket],
stderr: Optional[IO[bytes]],
processor: AbstractProcessor[T],
callback_object: TransportCallbacks[T]) -> None:
self._closed = False
self._process = process
self._socket = socket
self._reader = reader
self._writer = writer
self._stderr = stderr
self._processor = processor
self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name))
Expand Down Expand Up @@ -136,8 +171,8 @@ def __del__(self) -> None:

def _read_loop(self) -> None:
try:
while self._reader:
payload = self._processor.read_data(self._reader)
while True:
Copy link
Contributor Author

@alecmev alecmev Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to Google this and also replicate it in REPL with io.BytesIO, but it seems that streams are always truthy? Or am I taking down a Chesterton's fence here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@predragnikolic Do you know anything about this? Do self._reader/self._writer ever become falsey?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know anything about this?

No, I don't know, I just reverted the old code.
I will let someone with more experience answer this question. :)

I can just guess and say that
maybe the ProcessTransport close method should be modified:

    def close(self) -> None:
        if not self._closed:
            self._send_queue.put_nowait(None)
            if self._socket:
                self._socket.close()
            self._closed = True
+           self._reader.close()
+           self._writer.close()
+.          self._reader = None
+.          self._writer = None

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know about stdio and TCP, but the multiprocessing pipe is garbage-collectable: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.close

payload = self._processor.read_data()
if payload is None:
continue

Expand Down Expand Up @@ -186,12 +221,11 @@ def invoke() -> None:
def _write_loop(self) -> None:
exception = None # type: Optional[Exception]
try:
while self._writer:
while True:
d = self._send_queue.get()
if d is None:
break
self._processor.write_data(self._writer, d)
self._writer.flush()
self._processor.write_data(d)
except (BrokenPipeError, AttributeError):
pass
except Exception as ex:
Expand Down Expand Up @@ -219,28 +253,32 @@ def _stderr_loop(self) -> None:
self._send_queue.put_nowait(None)


# Can be a singleton since it doesn't hold any state.
json_rpc_processor = JsonRpcProcessor()


def create_transport(config: TransportConfig, cwd: Optional[str],
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
stderr = subprocess.PIPE
pass_fds = () # type: Union[Tuple[()], Tuple[int]]
if config.tcp_port is not None:
assert config.tcp_port is not None
if config.tcp_port < 0:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL
stdin = subprocess.DEVNULL
else:
elif not config.node_ipc:
stdout = subprocess.PIPE
stdin = subprocess.PIPE
else:
stdout = subprocess.PIPE
stdin = subprocess.DEVNULL
stderr = subprocess.STDOUT
pass_fds = (config.node_ipc.child_conn.fileno(),)

startupinfo = _fixup_startup_args(config.command)
sock = None # type: Optional[socket.socket]
process = None # type: Optional[subprocess.Popen]

def start_subprocess() -> subprocess.Popen:
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd)
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds)

if config.listener_socket:
assert isinstance(config.tcp_port, int) and config.tcp_port > 0
Expand All @@ -250,21 +288,27 @@ def start_subprocess() -> subprocess.Popen:
config.listener_socket,
start_subprocess
)
processor = StandardProcessor(reader, writer) # type: AbstractProcessor
else:
process = start_subprocess()
if config.tcp_port:
sock = _connect_tcp(config.tcp_port)
if sock is None:
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
reader = sock.makefile('rwb') # type: ignore
writer = reader
reader = writer = sock.makefile('rwb')
processor = StandardProcessor(reader, writer)
elif not config.node_ipc:
if not process.stdout or not process.stdin:
predragnikolic marked this conversation as resolved.
Show resolved Hide resolved
raise RuntimeError(
'Failed initializing transport: reader: {}, writer: {}'
.format(process.stdout, process.stdin)
)
processor = StandardProcessor(process.stdout, process.stdin)
else:
reader = process.stdout # type: ignore
writer = process.stdin # type: ignore
if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor,
callback_object)
processor = NodeIpcProcessor(config.node_ipc.parent_conn)

stderr_reader = process.stdout if config.node_ipc else process.stderr
return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object)


_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
Expand Down Expand Up @@ -312,7 +356,8 @@ def _start_subprocess(
stderr: int,
startupinfo: Any,
env: Dict[str, str],
cwd: Optional[str]
cwd: Optional[str],
pass_fds: Union[Tuple[()], Tuple[int]]
) -> subprocess.Popen:
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd()))
process = subprocess.Popen(
Expand All @@ -322,7 +367,8 @@ def _start_subprocess(
stderr=stderr,
startupinfo=startupinfo,
env=env,
cwd=cwd)
cwd=cwd,
pass_fds=pass_fds)
_subprocesses.add(process)
return process

Expand Down Expand Up @@ -356,8 +402,7 @@ def start_in_background(d: _SubprocessData) -> None:
# Await one client connection (blocking!)
sock, _ = listener_socket.accept()
thread.join()
reader = sock.makefile('rwb') # type: IO[bytes]
writer = reader
reader = writer = sock.makefile('rwb')
assert data.process
return data.process, sock, reader, writer

Expand Down
24 changes: 20 additions & 4 deletions plugin/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from wcmatch.glob import BRACE
from wcmatch.glob import globmatch
from wcmatch.glob import GLOBSTAR
import collections
alecmev marked this conversation as resolved.
Show resolved Hide resolved
import contextlib
import fnmatch
import multiprocessing
import os
import posixpath
import socket
Expand Down Expand Up @@ -605,16 +607,20 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]:
return _translate_path(uri, self._remote, self._local)


NodeIpcPipe = collections.namedtuple('NodeIpcPipe', 'parent_conn,child_conn')


class TransportConfig:
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket")
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc")

def __init__(
self,
name: str,
command: List[str],
tcp_port: Optional[int],
env: Dict[str, str],
listener_socket: Optional[socket.socket]
listener_socket: Optional[socket.socket],
node_ipc: Optional[NodeIpcPipe]
) -> None:
if not command and not tcp_port:
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server')
Expand All @@ -623,6 +629,7 @@ def __init__(
self.tcp_port = tcp_port
self.env = env
self.listener_socket = listener_socket
self.node_ipc = node_ipc


class ClientConfig:
Expand All @@ -632,6 +639,7 @@ def __init__(self,
priority_selector: Optional[str] = None,
schemes: Optional[List[str]] = None,
command: Optional[List[str]] = None,
use_node_ipc: bool = False,
binary_args: Optional[List[str]] = None, # DEPRECATED
tcp_port: Optional[int] = None,
auto_complete_selector: Optional[str] = None,
Expand All @@ -656,6 +664,7 @@ def __init__(self,
else:
assert isinstance(binary_args, list)
self.command = binary_args
self.use_node_ipc = use_node_ipc
self.tcp_port = tcp_port
self.auto_complete_selector = auto_complete_selector
self.enabled = enabled
Expand Down Expand Up @@ -689,9 +698,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl
priority_selector=_read_priority_selector(s),
schemes=s.get("schemes"),
command=read_list_setting(s, "command", []),
use_node_ipc=bool(s.get("use_node_ipc", False)),
tcp_port=s.get("tcp_port"),
auto_complete_selector=s.get("auto_complete_selector"),
# Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"iff" is short for "if and only if" :) but keep the change, no one really cares.

# Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package.
enabled=bool(s.get("enabled", True)),
init_options=init_options,
settings=settings,
Expand Down Expand Up @@ -719,6 +729,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig":
priority_selector=_read_priority_selector(d),
schemes=schemes,
command=d.get("command", []),
use_node_ipc=d.get("use_node_ipc", False),
tcp_port=d.get("tcp_port"),
auto_complete_selector=d.get("auto_complete_selector"),
enabled=d.get("enabled", False),
Expand Down Expand Up @@ -746,6 +757,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C
priority_selector=_read_priority_selector(override) or src_config.priority_selector,
schemes=override.get("schemes", src_config.schemes),
command=override.get("command", src_config.command),
use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc),
tcp_port=override.get("tcp_port", src_config.tcp_port),
auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector),
enabled=override.get("enabled", src_config.enabled),
Expand Down Expand Up @@ -790,7 +802,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key]
else:
env[key] = sublime.expand_variables(value, variables)
return TransportConfig(self.name, command, tcp_port, env, listener_socket)
node_ipc = None
if self.use_node_ipc:
node_ipc = NodeIpcPipe(*multiprocessing.Pipe())
env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno())
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc)

def set_view_status(self, view: sublime.View, message: str) -> None:
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"):
Expand Down
11 changes: 11 additions & 0 deletions sublime-package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
},
"markdownDescription": "The command to start the language server."
},
"ClientUseNodeIpc": {
"type": "boolean",
"default": false,
"markdownDescription": "Communicate with the language server over Node.js IPC. This lets the server print to stdout without disrupting the LSP communication. It's non-standard, but is used by VSCode. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` in case of vscode-eslint. `tcp_port` is ignored if this is enabled."
alecmev marked this conversation as resolved.
Show resolved Hide resolved
},
"ClientEnabled": {
"type": "boolean",
"default": false,
Expand Down Expand Up @@ -156,6 +161,9 @@
"command": {
"$ref": "sublime://settings/LSP#/definitions/ClientCommand"
},
"use_node_ipc": {
"$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc"
},
"enabled": {
"$ref": "sublime://settings/LSP#/definitions/ClientEnabled"
},
Expand Down Expand Up @@ -555,6 +563,9 @@
"command": {
"$ref": "sublime://settings/LSP#/definitions/ClientCommand"
},
"use_node_ipc": {
"$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc"
},
"enabled": {
"$ref": "sublime://settings/LSP#/definitions/ClientEnabled"
},
Expand Down
Loading