-
Notifications
You must be signed in to change notification settings - Fork 184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add --node-ipc support #2015
base: main
Are you sure you want to change the base?
Add --node-ipc support #2015
Changes from 3 commits
f14e088
6ab3fca
ca34372
bdcd755
47fde0d
93157e7
e6ae057
fd31756
45861a0
3dc8daa
7ad90c4
06365a3
d2a7249
9082544
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,9 @@ | |
from contextlib import closing | ||
from functools import partial | ||
from queue import Queue | ||
import http | ||
import http.client | ||
import json | ||
import multiprocessing.connection | ||
import os | ||
import shutil | ||
import socket | ||
|
@@ -48,57 +49,91 @@ def on_stderr_message(self, message: str) -> None: | |
|
||
class AbstractProcessor(Generic[T]): | ||
|
||
def write_data(self, writer: IO[bytes], data: T) -> None: | ||
def write_data(self, data: T) -> None: | ||
raise NotImplementedError() | ||
|
||
def read_data(self, reader: IO[bytes]) -> Optional[T]: | ||
def read_data(self) -> Optional[T]: | ||
raise NotImplementedError() | ||
|
||
|
||
class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): | ||
def encode_payload(data: Dict[str, Any]) -> bytes: | ||
return json.dumps( | ||
data, | ||
ensure_ascii=False, | ||
check_circular=False, | ||
separators=(',', ':') | ||
).encode('utf-8') | ||
|
||
def write_data(self, writer: IO[bytes], data: Dict[str, Any]) -> None: | ||
body = self._encode(data) | ||
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) | ||
|
||
def read_data(self, reader: IO[bytes]) -> Optional[Dict[str, Any]]: | ||
headers = http.client.parse_headers(reader) # type: ignore | ||
def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: | ||
try: | ||
return json.loads(message.decode('utf-8')) | ||
except Exception as ex: | ||
exception_log("JSON decode error", ex) | ||
return None | ||
alecmev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class StandardProcessor(AbstractProcessor[Dict[str, Any]]): | ||
|
||
def __init__(self, reader: IO[bytes], writer: IO[bytes]): | ||
self._reader = reader | ||
self._writer = writer | ||
|
||
def write_data(self, data: Dict[str, Any]) -> None: | ||
body = encode_payload(data) | ||
self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) | ||
self._writer.flush() | ||
|
||
def read_data(self) -> Optional[Dict[str, Any]]: | ||
headers = http.client.parse_headers(self._reader) # type: ignore | ||
try: | ||
body = reader.read(int(headers.get("Content-Length"))) | ||
body = self._reader.read(int(headers.get("Content-Length"))) | ||
except TypeError: | ||
# Expected error on process stopping. Stop the read loop. | ||
raise StopLoopError() | ||
try: | ||
return self._decode(body) | ||
except Exception as ex: | ||
exception_log("JSON decode error", ex) | ||
return None | ||
|
||
@staticmethod | ||
def _encode(data: Dict[str, Any]) -> bytes: | ||
return json.dumps( | ||
data, | ||
ensure_ascii=False, | ||
sort_keys=False, | ||
check_circular=False, | ||
separators=(',', ':') | ||
).encode('utf-8') | ||
|
||
@staticmethod | ||
def _decode(message: bytes) -> Dict[str, Any]: | ||
return json.loads(message.decode('utf-8')) | ||
return decode_payload(body) | ||
|
||
|
||
class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): | ||
_buf = bytearray() | ||
_lines = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't these two properties be part of the class instance, instead of the class? If I have LSP-dockerfile, LSP-stylelint, and LSP-typescript running with node IPC, wouldn't these overwrite each other in unpredictable ways? Maybe I'm not understanding something. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it should work as expected (in most cases ;)) due to how Python works with those. When instance of the class reads those properties it looks them up first in the instance and then in the class. When writing it always writes to the instance property (creates it if necessary). The only issue would be if the instance would read a class property (for example So I guess it's still safer to set those on the instance. |
||
|
||
def __init__(self, conn: multiprocessing.connection._ConnectionBase): | ||
self._conn = conn | ||
|
||
def write_data(self, data: Dict[str, Any]) -> None: | ||
body = encode_payload(data) + b"\n" | ||
while len(body): | ||
n = self._conn._write(self._conn.fileno(), body) # type: ignore | ||
body = body[n:] | ||
|
||
def read_data(self) -> Optional[Dict[str, Any]]: | ||
while self._lines == 0: | ||
chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore | ||
if len(chunk) == 0: | ||
# EOF reached: https://docs.python.org/3/library/os.html#os.read | ||
raise StopLoopError() | ||
|
||
self._buf += chunk | ||
self._lines += chunk.count(b'\n') | ||
|
||
self._lines -= 1 | ||
message, _, self._buf = self._buf.partition(b'\n') | ||
return decode_payload(message) | ||
|
||
|
||
class ProcessTransport(Transport[T]): | ||
|
||
def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], | ||
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], | ||
def __init__(self, | ||
name: str, | ||
process: subprocess.Popen, | ||
socket: Optional[socket.socket], | ||
stderr: Optional[IO[bytes]], | ||
processor: AbstractProcessor[T], | ||
callback_object: TransportCallbacks[T]) -> None: | ||
self._closed = False | ||
self._process = process | ||
self._socket = socket | ||
self._reader = reader | ||
self._writer = writer | ||
self._stderr = stderr | ||
self._processor = processor | ||
self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name)) | ||
|
@@ -136,8 +171,8 @@ def __del__(self) -> None: | |
|
||
def _read_loop(self) -> None: | ||
try: | ||
while self._reader: | ||
payload = self._processor.read_data(self._reader) | ||
while True: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to Google this and also replicate it in REPL with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @predragnikolic Do you know anything about this? Do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No, I don't know, I just reverted the old code. I can just guess and say that def close(self) -> None:
if not self._closed:
self._send_queue.put_nowait(None)
if self._socket:
self._socket.close()
self._closed = True
+ self._reader.close()
+ self._writer.close()
+. self._reader = None
+. self._writer = None There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't know about stdio and TCP, but the multiprocessing pipe is garbage-collectable: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.Connection.close |
||
payload = self._processor.read_data() | ||
if payload is None: | ||
continue | ||
|
||
|
@@ -186,12 +221,11 @@ def invoke() -> None: | |
def _write_loop(self) -> None: | ||
exception = None # type: Optional[Exception] | ||
try: | ||
while self._writer: | ||
while True: | ||
d = self._send_queue.get() | ||
if d is None: | ||
break | ||
self._processor.write_data(self._writer, d) | ||
self._writer.flush() | ||
self._processor.write_data(d) | ||
except (BrokenPipeError, AttributeError): | ||
pass | ||
except Exception as ex: | ||
|
@@ -219,28 +253,32 @@ def _stderr_loop(self) -> None: | |
self._send_queue.put_nowait(None) | ||
|
||
|
||
# Can be a singleton since it doesn't hold any state. | ||
json_rpc_processor = JsonRpcProcessor() | ||
|
||
|
||
def create_transport(config: TransportConfig, cwd: Optional[str], | ||
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: | ||
stderr = subprocess.PIPE | ||
pass_fds = () # type: Union[Tuple[()], Tuple[int]] | ||
if config.tcp_port is not None: | ||
assert config.tcp_port is not None | ||
if config.tcp_port < 0: | ||
stdout = subprocess.PIPE | ||
else: | ||
stdout = subprocess.DEVNULL | ||
stdin = subprocess.DEVNULL | ||
else: | ||
elif not config.node_ipc: | ||
stdout = subprocess.PIPE | ||
stdin = subprocess.PIPE | ||
else: | ||
stdout = subprocess.PIPE | ||
stdin = subprocess.DEVNULL | ||
stderr = subprocess.STDOUT | ||
pass_fds = (config.node_ipc.child_conn.fileno(),) | ||
|
||
startupinfo = _fixup_startup_args(config.command) | ||
sock = None # type: Optional[socket.socket] | ||
process = None # type: Optional[subprocess.Popen] | ||
|
||
def start_subprocess() -> subprocess.Popen: | ||
return _start_subprocess(config.command, stdin, stdout, subprocess.PIPE, startupinfo, config.env, cwd) | ||
return _start_subprocess(config.command, stdin, stdout, stderr, startupinfo, config.env, cwd, pass_fds) | ||
|
||
if config.listener_socket: | ||
assert isinstance(config.tcp_port, int) and config.tcp_port > 0 | ||
|
@@ -250,21 +288,27 @@ def start_subprocess() -> subprocess.Popen: | |
config.listener_socket, | ||
start_subprocess | ||
) | ||
processor = StandardProcessor(reader, writer) # type: AbstractProcessor | ||
else: | ||
process = start_subprocess() | ||
if config.tcp_port: | ||
sock = _connect_tcp(config.tcp_port) | ||
if sock is None: | ||
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) | ||
reader = sock.makefile('rwb') # type: ignore | ||
writer = reader | ||
reader = writer = sock.makefile('rwb') | ||
processor = StandardProcessor(reader, writer) | ||
elif not config.node_ipc: | ||
if not process.stdout or not process.stdin: | ||
predragnikolic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise RuntimeError( | ||
'Failed initializing transport: reader: {}, writer: {}' | ||
.format(process.stdout, process.stdin) | ||
) | ||
processor = StandardProcessor(process.stdout, process.stdin) | ||
else: | ||
reader = process.stdout # type: ignore | ||
writer = process.stdin # type: ignore | ||
if not reader or not writer: | ||
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) | ||
return ProcessTransport(config.name, process, sock, reader, writer, process.stderr, json_rpc_processor, | ||
callback_object) | ||
processor = NodeIpcProcessor(config.node_ipc.parent_conn) | ||
|
||
stderr_reader = process.stdout if config.node_ipc else process.stderr | ||
return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object) | ||
|
||
|
||
_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] | ||
|
@@ -312,7 +356,8 @@ def _start_subprocess( | |
stderr: int, | ||
startupinfo: Any, | ||
env: Dict[str, str], | ||
cwd: Optional[str] | ||
cwd: Optional[str], | ||
pass_fds: Union[Tuple[()], Tuple[int]] | ||
) -> subprocess.Popen: | ||
debug("starting {} in {}".format(args, cwd if cwd else os.getcwd())) | ||
process = subprocess.Popen( | ||
|
@@ -322,7 +367,8 @@ def _start_subprocess( | |
stderr=stderr, | ||
startupinfo=startupinfo, | ||
env=env, | ||
cwd=cwd) | ||
cwd=cwd, | ||
pass_fds=pass_fds) | ||
_subprocesses.add(process) | ||
return process | ||
|
||
|
@@ -356,8 +402,7 @@ def start_in_background(d: _SubprocessData) -> None: | |
# Await one client connection (blocking!) | ||
sock, _ = listener_socket.accept() | ||
thread.join() | ||
reader = sock.makefile('rwb') # type: IO[bytes] | ||
writer = reader | ||
reader = writer = sock.makefile('rwb') | ||
assert data.process | ||
return data.process, sock, reader, writer | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,10 @@ | |
from wcmatch.glob import BRACE | ||
from wcmatch.glob import globmatch | ||
from wcmatch.glob import GLOBSTAR | ||
import collections | ||
alecmev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import contextlib | ||
import fnmatch | ||
import multiprocessing | ||
import os | ||
import posixpath | ||
import socket | ||
|
@@ -605,16 +607,20 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]: | |
return _translate_path(uri, self._remote, self._local) | ||
|
||
|
||
NodeIpcPipe = collections.namedtuple('NodeIpcPipe', 'parent_conn,child_conn') | ||
|
||
|
||
class TransportConfig: | ||
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket") | ||
__slots__ = ("name", "command", "tcp_port", "env", "listener_socket", "node_ipc") | ||
|
||
def __init__( | ||
self, | ||
name: str, | ||
command: List[str], | ||
tcp_port: Optional[int], | ||
env: Dict[str, str], | ||
listener_socket: Optional[socket.socket] | ||
listener_socket: Optional[socket.socket], | ||
node_ipc: Optional[NodeIpcPipe] | ||
) -> None: | ||
if not command and not tcp_port: | ||
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server') | ||
|
@@ -623,6 +629,7 @@ def __init__( | |
self.tcp_port = tcp_port | ||
self.env = env | ||
self.listener_socket = listener_socket | ||
self.node_ipc = node_ipc | ||
|
||
|
||
class ClientConfig: | ||
|
@@ -632,6 +639,7 @@ def __init__(self, | |
priority_selector: Optional[str] = None, | ||
schemes: Optional[List[str]] = None, | ||
command: Optional[List[str]] = None, | ||
use_node_ipc: bool = False, | ||
binary_args: Optional[List[str]] = None, # DEPRECATED | ||
tcp_port: Optional[int] = None, | ||
auto_complete_selector: Optional[str] = None, | ||
|
@@ -656,6 +664,7 @@ def __init__(self, | |
else: | ||
assert isinstance(binary_args, list) | ||
self.command = binary_args | ||
self.use_node_ipc = use_node_ipc | ||
self.tcp_port = tcp_port | ||
self.auto_complete_selector = auto_complete_selector | ||
self.enabled = enabled | ||
|
@@ -689,9 +698,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl | |
priority_selector=_read_priority_selector(s), | ||
schemes=s.get("schemes"), | ||
command=read_list_setting(s, "command", []), | ||
use_node_ipc=bool(s.get("use_node_ipc", False)), | ||
tcp_port=s.get("tcp_port"), | ||
auto_complete_selector=s.get("auto_complete_selector"), | ||
# Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "iff" is short for "if and only if" :) but keep the change, no one really cares. |
||
# Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package. | ||
enabled=bool(s.get("enabled", True)), | ||
init_options=init_options, | ||
settings=settings, | ||
|
@@ -719,6 +729,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig": | |
priority_selector=_read_priority_selector(d), | ||
schemes=schemes, | ||
command=d.get("command", []), | ||
use_node_ipc=d.get("use_node_ipc", False), | ||
tcp_port=d.get("tcp_port"), | ||
auto_complete_selector=d.get("auto_complete_selector"), | ||
enabled=d.get("enabled", False), | ||
|
@@ -746,6 +757,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C | |
priority_selector=_read_priority_selector(override) or src_config.priority_selector, | ||
schemes=override.get("schemes", src_config.schemes), | ||
command=override.get("command", src_config.command), | ||
use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc), | ||
tcp_port=override.get("tcp_port", src_config.tcp_port), | ||
auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector), | ||
enabled=override.get("enabled", src_config.enabled), | ||
|
@@ -790,7 +802,11 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig | |
env[key] = sublime.expand_variables(value, variables) + os.path.pathsep + env[key] | ||
else: | ||
env[key] = sublime.expand_variables(value, variables) | ||
return TransportConfig(self.name, command, tcp_port, env, listener_socket) | ||
node_ipc = None | ||
if self.use_node_ipc: | ||
node_ipc = NodeIpcPipe(*multiprocessing.Pipe()) | ||
env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno()) | ||
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc) | ||
|
||
def set_view_status(self, view: sublime.View, message: str) -> None: | ||
if sublime.load_settings("LSP.sublime-settings").get("show_view_status"): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fits one line now :)