Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --node-ipc support #2015

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
120 changes: 82 additions & 38 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from contextlib import closing
from functools import partial
from queue import Queue
import http
import http.client
import json
import multiprocessing.connection
import os
import shutil
import socket
Expand Down Expand Up @@ -48,18 +49,35 @@ def on_stderr_message(self, message: str) -> None:

class AbstractProcessor(Generic[T]):

def write_data(self, writer: IO[bytes], data: T) -> None:
def write_data(self, writer: Any, data: T) -> None:
raise NotImplementedError()

def read_data(self, reader: IO[bytes]) -> Optional[T]:
def read_data(self, reader: Any) -> Optional[T]:
raise NotImplementedError()


class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]):
def encode_payload(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
check_circular=False,
separators=(',', ':')
).encode('utf-8')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fits one line now :)



def decode_payload(message: bytes) -> Optional[Dict[str, Any]]:
try:
return json.loads(message.decode('utf-8'))
except Exception as ex:
exception_log("JSON decode error", ex)
return None


class StandardProcessor(AbstractProcessor[Dict[str, Any]]):
def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None:
body = self._encode(data)
body = encode_payload(data)
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
writer.flush()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially breaking for chokidar watcher. We need to add a flush there. I guess duplicate flush() might be ok to avoid adding too much logic there.


def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]:
headers = http.client.parse_headers(reader) # type: ignore
Expand All @@ -68,31 +86,38 @@ def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]:
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
try:
return self._decode(body)
except Exception as ex:
exception_log("JSON decode error", ex)
return None

@staticmethod
def _encode(data: Dict[str, Any]) -> bytes:
return json.dumps(
data,
ensure_ascii=False,
sort_keys=False,
check_circular=False,
separators=(',', ':')
).encode('utf-8')

@staticmethod
def _decode(message: bytes) -> Dict[str, Any]:
return json.loads(message.decode('utf-8'))
return decode_payload(body)


class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]):
_buf = bytearray()
_lines = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't these two properties be part of the class instance, instead of the class? If I have LSP-dockerfile, LSP-stylelint, and LSP-typescript running with node IPC, wouldn't these overwrite each other in unpredictable ways? Maybe I'm not understanding something.

Copy link
Member

@rchl rchl Aug 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it should work as expected (in most cases ;)) due to how Python works with those.

When instance of the class reads those properties it looks them up first in the instance and then in the class. When writing it always writes to the instance property (creates it if necessary).

The only issue would be if the instance would read a class property (for example self._buf) and then mutate that property (self._buf.append(...) or whatever methods that supports). Then it would mutate the class property.

So I guess it's still safer to set those on the instance.


def write_data(self, connection: multiprocessing.connection._ConnectionBase, data: Dict[str, Any]) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there really no way to use public type here? Underscore denotes a private property.

Copy link
Member

@predragnikolic predragnikolic Aug 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_ConnectionBase was used because multiprocessing.Pipe() uses that.

class NodeIpcPipe():
    def __init__(self) -> None:
        parent_connection, child_connection = multiprocessing.Pipe()
        self.parent_connection = parent_connection
        # type: _ConnectionBase
        self.child_connection = child_connection
        # type _ConnectionBase

I looked a bit at the multiprocessing.Pipe()

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)

        return c1, c2

and found that maybe we can use the multiprocessing.connection.Connection as the type.

as Connection extends _ConnectionBase, and it doesn't add any public methods

class Connection(_ConnectionBase):
    """
    Connection class based on an arbitrary file descriptor (Unix only), or
    a socket handle (Windows).
    """

    if _winapi:
        def _close(self, _close=_multiprocessing.closesocket):
            ...
    else:
        def _close(self, _close=os.close):
            ...

    def _send(self, buf, write=_write):
        ...
    def _recv(self, size, read=_read):
        ...
    def _send_bytes(self, buf):
        ...
    def _recv_bytes(self, maxsize=None):
        ...
    def _poll(self, timeout):
        ...

body = encode_payload(data) + b"\n"
while len(body):
n = connection._write(connection.fileno(), body) # type: ignore
body = body[n:]

def read_data(self, connection: multiprocessing.connection._ConnectionBase) -> Optional[Dict[str, Any]]:
while self._lines == 0:
chunk = connection._read(connection.fileno(), 65536) # type: ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

65536 looks like a rather large buffer (65KB), have you tried 4KB? What are the performance implications compared to the standard transport?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also private API...

I'm not saying it's unacceptable but is there really no higher level API exposed for that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also bothers me. Perhaps switching to some public mechanism also abstracts away the burden of choosing the buffer size? (Currently set to 65KB)

if len(chunk) == 0:
# EOF reached: https://docs.python.org/3/library/os.html#os.read
raise StopLoopError()

self._buf += chunk
self._lines += chunk.count(b'\n')

self._lines -= 1
message, _, self._buf = self._buf.partition(b'\n')
return decode_payload(message)


class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: Any,
writer: Any, stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good that instead of any I used a better type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be solved with generics, I think, but it would again require downstream changes, since it would change AbstractProcessor's signature.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some local changes to clean up the types but I'm yet to find a solution that is fully properly typed. It's complicated.

In any case, that would be something for another PR.

callback_object: TransportCallbacks[T]) -> None:
self._closed = False
self._process = process
Expand Down Expand Up @@ -191,7 +216,6 @@ def _write_loop(self) -> None:
if d is None:
break
self._processor.write_data(self._writer, d)
self._writer.flush()
except (BrokenPipeError, AttributeError):
pass
except Exception as ex:
Expand Down Expand Up @@ -220,27 +244,36 @@ def _stderr_loop(self) -> None:


# Can be a singleton since it doesn't hold any state.
json_rpc_processor = JsonRpcProcessor()
standard_processor = StandardProcessor()
node_ipc_processor = NodeIpcProcessor()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I'm unsure whether this can actually be a singleton.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. The standard processor wasn't meant to have any instance properties so NodeIpc shouldn't either.



def create_transport(config: TransportConfig, cwd: Optional[str],
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
stderr = subprocess.PIPE
pass_fds = () # type: Union[Tuple[()], Tuple[int]]
if config.tcp_port is not None:
assert config.tcp_port is not None
if config.tcp_port < 0:
stdout = subprocess.PIPE
else:
stdout = subprocess.DEVNULL
stdin = subprocess.DEVNULL
elif config.node_ipc:
stdout = subprocess.PIPE
stdin = subprocess.DEVNULL
stderr = subprocess.STDOUT
pass_fds = (config.node_ipc.child_connection.fileno(),)
else:
stdout = subprocess.PIPE
stdin = subprocess.PIPE

startupinfo = _fixup_startup_args(config.command)
sock = None # type: Optional[socket.socket]
process = None # type: Optional[subprocess.Popen]

def start_subprocess() -> subprocess.Popen:
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd)
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds)

if config.listener_socket:
assert isinstance(config.tcp_port, int) and config.tcp_port > 0
Expand All @@ -256,14 +289,24 @@ def start_subprocess() -> subprocess.Popen:
sock = _connect_tcp(config.tcp_port)
if sock is None:
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
reader = sock.makefile('rwb') # type: ignore
writer = reader
reader = writer = sock.makefile('rwb')
elif config.node_ipc:
reader = writer = config.node_ipc.parent_connection # type: ignore
else:
reader = process.stdout # type: ignore
writer = process.stdin # type: ignore
if not process.stdout or not process.stdin:
predragnikolic marked this conversation as resolved.
Show resolved Hide resolved
raise RuntimeError(
'Failed initializing transport: reader: {}, writer: {}'
.format(process.stdout, process.stdin)
)
reader = process.stdout
writer = process.stdin
stderr_reader = process.stdout if config.node_ipc else process.stderr
predragnikolic marked this conversation as resolved.
Show resolved Hide resolved
processor = node_ipc_processor if config.node_ipc else standard_processor

if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor,

return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, processor,
callback_object)


Expand Down Expand Up @@ -312,7 +355,8 @@ def _start_subprocess(
stderr: int,
startupinfo: Any,
env: Dict[str, str],
cwd: Optional[str]
cwd: Optional[str],
pass_fds: Union[Tuple[()], Tuple[int]]
) -> subprocess.Popen:
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd()))
process = subprocess.Popen(
Expand All @@ -322,7 +366,8 @@ def _start_subprocess(
stderr=stderr,
startupinfo=startupinfo,
env=env,
cwd=cwd)
cwd=cwd,
pass_fds=pass_fds)
_subprocesses.add(process)
return process

Expand Down Expand Up @@ -356,8 +401,7 @@ def start_in_background(d: _SubprocessData) -> None:
# Await one client connection (blocking!)
sock, _ = listener_socket.accept()
thread.join()
reader = sock.makefile('rwb') # type: IO[bytes]
writer = reader
reader = writer = sock.makefile('rwb')
assert data.process
return data.process, sock, reader, writer

Expand Down
27 changes: 23 additions & 4 deletions plugin/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from wcmatch.glob import GLOBSTAR
import contextlib
import fnmatch
import multiprocessing
import os
import posixpath
import socket
Expand Down Expand Up @@ -605,16 +606,24 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]:
return _translate_path(uri, self._remote, self._local)


class NodeIpcPipe():
def __init__(self) -> None:
parent_connection, child_connection = multiprocessing.Pipe()
self.parent_connection = parent_connection
self.child_connection = child_connection


class TransportConfig:
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket")
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc")

def __init__(
self,
name: str,
command: List[str],
tcp_port: Optional[int],
env: Dict[str, str],
listener_socket: Optional[socket.socket]
listener_socket: Optional[socket.socket],
node_ipc: Optional[NodeIpcPipe]
) -> None:
if not command and not tcp_port:
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server')
Expand All @@ -623,6 +632,7 @@ def __init__(
self.tcp_port = tcp_port
self.env = env
self.listener_socket = listener_socket
self.node_ipc = node_ipc


class ClientConfig:
Expand All @@ -632,6 +642,7 @@ def __init__(self,
priority_selector: Optional[str] = None,
schemes: Optional[List[str]] = None,
command: Optional[List[str]] = None,
use_node_ipc: bool = False,
binary_args: Optional[List[str]] = None, # DEPRECATED
tcp_port: Optional[int] = None,
auto_complete_selector: Optional[str] = None,
Expand All @@ -656,6 +667,7 @@ def __init__(self,
else:
assert isinstance(binary_args, list)
self.command = binary_args
self.use_node_ipc = use_node_ipc
self.tcp_port = tcp_port
self.auto_complete_selector = auto_complete_selector
self.enabled = enabled
Expand Down Expand Up @@ -689,9 +701,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl
priority_selector=_read_priority_selector(s),
schemes=s.get("schemes"),
command=read_list_setting(s, "command", []),
use_node_ipc=bool(s.get("use_node_ipc", False)),
tcp_port=s.get("tcp_port"),
auto_complete_selector=s.get("auto_complete_selector"),
# Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"iff" is short for "if and only if" :) but keep the change, no one really cares.

# Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package.
enabled=bool(s.get("enabled", True)),
init_options=init_options,
settings=settings,
Expand Down Expand Up @@ -719,6 +732,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig":
priority_selector=_read_priority_selector(d),
schemes=schemes,
command=d.get("command", []),
use_node_ipc=d.get("use_node_ipc", False),
tcp_port=d.get("tcp_port"),
auto_complete_selector=d.get("auto_complete_selector"),
enabled=d.get("enabled", False),
Expand Down Expand Up @@ -746,6 +760,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C
priority_selector=_read_priority_selector(override) or src_config.priority_selector,
schemes=override.get("schemes", src_config.schemes),
command=override.get("command", src_config.command),
use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc),
tcp_port=override.get("tcp_port", src_config.tcp_port),
auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector),
enabled=override.get("enabled", src_config.enabled),
Expand Down Expand Up @@ -790,7 +805,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key]
else:
env[key] = sublime.expand_variables(value, variables)
return TransportConfig(self.name, command, tcp_port, env, listener_socket)
node_ipc = None
if self.use_node_ipc:
node_ipc = NodeIpcPipe()
env["NODE_CHANNEL_FD"] = str(node_ipc.child_connection.fileno())
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc)

def set_view_status(self, view: sublime.View, message: str) -> None:
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"):
Expand Down
11 changes: 11 additions & 0 deletions sublime-package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
},
"markdownDescription": "The command to start the language server."
},
"ClientUseNodeIpc": {
"type": "boolean",
"default": false,
"markdownDescription": "Communicate with the language server over Node.js IPC. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` for some servers."
},
"ClientEnabled": {
"type": "boolean",
"default": false,
Expand Down Expand Up @@ -156,6 +161,9 @@
"command": {
"$ref": "sublime://settings/LSP#/definitions/ClientCommand"
},
"use_node_ipc": {
"$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc"
},
"enabled": {
"$ref": "sublime://settings/LSP#/definitions/ClientEnabled"
},
Expand Down Expand Up @@ -555,6 +563,9 @@
"command": {
"$ref": "sublime://settings/LSP#/definitions/ClientCommand"
},
"use_node_ipc": {
"$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc"
},
"enabled": {
"$ref": "sublime://settings/LSP#/definitions/ClientEnabled"
},
Expand Down
6 changes: 3 additions & 3 deletions tests/test_protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from LSP.plugin.core.protocol import Point, Position, Range, RangeLsp, Request, Notification
from LSP.plugin.core.transports import JsonRpcProcessor
from LSP.plugin.core.transports import encode_payload, decode_payload
import unittest


Expand Down Expand Up @@ -129,9 +129,9 @@ def test_extend(self) -> None:

class EncodingTests(unittest.TestCase):
def test_encode(self) -> None:
encoded = JsonRpcProcessor._encode({"text": "😃"})
encoded = encode_payload({"text": "😃"})
self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}')
decoded = JsonRpcProcessor._decode(encoded)
decoded = decode_payload(encoded)
self.assertEqual(decoded, {"text": "😃"})


Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Tox (http://tox.testrun.org/) is a tool for running tests
# Tox (https://github.com/tox-dev/tox) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
Expand Down