From 9388cc4269364094f962c8be16dac064ef4be8a5 Mon Sep 17 00:00:00 2001 From: lhchavez Date: Mon, 15 Jul 2024 15:01:18 +0000 Subject: [PATCH] 3.1.2 Set the PTY fd to be non-blocking We have gone back and forth a couple of times, but we _do_ need to set this FD to be non-blocking. Otherwise node will fully consume one I/O thread for each PTY, and node only has four of those. This change makes the controller fd (the one the parent process reads from / writes to) as non-blocking for node. Also, now that the controller fd is non-blocking, we need to pass it through Node's TTY machinery so that it is properly handled. Because of that, Rust is no longer on the hook for closing the FD, since ownership is fully transferred to Node. --- index.d.ts | 22 +++------ index.js | 3 +- package-lock.json | 4 +- package.json | 2 +- src/lib.rs | 112 +++++++++++++++++++++++--------------------- tests/index.test.ts | 76 +++++++++++++++++++++++++++--- wrapper.ts | 48 +++++++++---------- 7 files changed, 163 insertions(+), 104 deletions(-) diff --git a/index.d.ts b/index.d.ts index efdcc53..dae5f58 100644 --- a/index.d.ts +++ b/index.d.ts @@ -17,6 +17,8 @@ export interface Size { cols: number rows: number } +/** Resize the terminal. */ +export function ptyResize(fd: number, size: Size): void /** * Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under * the covers. @@ -27,24 +29,14 @@ export function setCloseOnExec(fd: number, closeOnExec: boolean): void *_CLOEXEC` under the covers. */ export function getCloseOnExec(fd: number): boolean -export declare class Pty { +export class Pty { /** The pid of the forked process. */ pid: number constructor(opts: PtyOptions) - /** Resize the terminal. */ - resize(size: Size): void /** - * Returns a file descriptor for the PTY controller. - * See the docstring of the class for an usage example. + * Transfers ownership of the file descriptor for the PTY controller. This can only be called + * once (it will error the second time). The caller is responsible for closing the file + * descriptor. */ - fd(): c_int - /** - * Close the PTY file descriptor. This must be called when the readers / writers of the PTY have - * been closed, otherwise we will leak file descriptors! - * - * In an ideal world, this would be automatically called after the wait loop is done, but Node - * doesn't like that one bit, since it implies that the file is closed outside of the main - * event loop. - */ - close(): void + takeFd(): c_int } diff --git a/index.js b/index.js index 251c62d..43de6ae 100644 --- a/index.js +++ b/index.js @@ -310,8 +310,9 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { Pty, setCloseOnExec, getCloseOnExec } = nativeBinding +const { Pty, ptyResize, setCloseOnExec, getCloseOnExec } = nativeBinding module.exports.Pty = Pty +module.exports.ptyResize = ptyResize module.exports.setCloseOnExec = setCloseOnExec module.exports.getCloseOnExec = getCloseOnExec diff --git a/package-lock.json b/package-lock.json index 164b6b2..105749c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/ruspty", - "version": "3.1.1", + "version": "3.1.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@replit/ruspty", - "version": "3.1.1", + "version": "3.1.2", "license": "MIT", "devDependencies": { "@napi-rs/cli": "^2.18.2", diff --git a/package.json b/package.json index b50b062..daae6b7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty", - "version": "3.1.1", + "version": "3.1.2", "main": "dist/wrapper.js", "types": "dist/wrapper.d.ts", "author": "Szymon Kaliski ", diff --git a/src/lib.rs b/src/lib.rs index 59cf337..97b8cc7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::Error; use std::io::ErrorKind; use std::os::fd::{AsRawFd, OwnedFd}; -use std::os::fd::{BorrowedFd, FromRawFd, RawFd}; +use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; @@ -16,7 +16,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun use napi::Status::GenericFailure; use napi::{self, Env}; use nix::errno::Errno; -use nix::fcntl::{fcntl, FcntlArg, FdFlag}; +use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag}; use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; use nix::pty::{openpty, Winsize}; use nix::sys::termios::{self, SetArg}; @@ -76,7 +76,8 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) { continue; } - // we should never hit this, but if we do, we should just break out of the loop + // we should almost never hit this, but if we do, we should just break out of the loop. this + // can happen if Node destroys the terminal before waiting for the child process to go away. break; } @@ -117,12 +118,14 @@ impl Pty { } // open pty pair, and set close-on-exec to avoid unwanted copies of the FDs from finding their - // way into subprocesses. + // way into subprocesses. Also set the nonblocking flag to avoid Node from consuming a full I/O + // thread for this. let pty_res = openpty(&window_size, None).map_err(cast_to_napi_error)?; let controller_fd = pty_res.master; let user_fd = pty_res.slave; set_close_on_exec(controller_fd.as_raw_fd(), true)?; set_close_on_exec(user_fd.as_raw_fd(), true)?; + set_nonblocking(controller_fd.as_raw_fd())?; // duplicate pty user_fd to be the child's stdin, stdout, and stderr cmd.stdin(Stdio::from(user_fd.try_clone()?)); @@ -254,42 +257,14 @@ impl Pty { }) } - /// Resize the terminal. + /// Transfers ownership of the file descriptor for the PTY controller. This can only be called + /// once (it will error the second time). The caller is responsible for closing the file + /// descriptor. #[napi] #[allow(dead_code)] - pub fn resize(&mut self, size: Size) -> Result<(), napi::Error> { - let window_size = Winsize { - ws_col: size.cols, - ws_row: size.rows, - ws_xpixel: 0, - ws_ypixel: 0, - }; - - if let Some(fd) = &self.controller_fd { - let res = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &window_size as *const _) }; - if res == -1 { - return Err(napi::Error::new( - napi::Status::GenericFailure, - format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()), - )); - } - - Ok(()) - } else { - Err(napi::Error::new( - napi::Status::GenericFailure, - "ioctl TIOCSWINSZ failed: bad file descriptor (os error 9)", - )) - } - } - - /// Returns a file descriptor for the PTY controller. - /// See the docstring of the class for an usage example. - #[napi] - #[allow(dead_code)] - pub fn fd(&mut self) -> Result { - if let Some(fd) = &self.controller_fd { - Ok(fd.as_raw_fd()) + pub fn take_fd(&mut self) -> Result { + if let Some(fd) = self.controller_fd.take() { + Ok(fd.into_raw_fd()) } else { Err(napi::Error::new( napi::Status::GenericFailure, @@ -297,25 +272,28 @@ impl Pty { )) } } +} - /// Close the PTY file descriptor. This must be called when the readers / writers of the PTY have - /// been closed, otherwise we will leak file descriptors! - /// - /// In an ideal world, this would be automatically called after the wait loop is done, but Node - /// doesn't like that one bit, since it implies that the file is closed outside of the main - /// event loop. - #[napi] - #[allow(dead_code)] - pub fn close(&mut self) -> Result<(), napi::Error> { - if let Some(fd) = self.controller_fd.take() { - unsafe { - // ok to best-effort close as node can also close this via autoClose - libc::close(fd.as_raw_fd()); - }; - } +/// Resize the terminal. +#[napi] +#[allow(dead_code)] +fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> { + let window_size = Winsize { + ws_col: size.cols, + ws_row: size.rows, + ws_xpixel: 0, + ws_ypixel: 0, + }; - Ok(()) + let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) }; + if res == -1 { + return Err(napi::Error::new( + napi::Status::GenericFailure, + format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()), + )); } + + Ok(()) } /// Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under @@ -362,3 +340,29 @@ fn get_close_on_exec(fd: i32) -> Result { )), } } + +/// Set the file descriptor to be non-blocking. +#[allow(dead_code)] +fn set_nonblocking(fd: i32) -> Result<(), napi::Error> { + let old_flags = match fcntl(fd, FcntlArg::F_GETFL) { + Ok(flags) => OFlag::from_bits_truncate(flags), + Err(err) => { + return Err(napi::Error::new( + GenericFailure, + format!("fcntl F_GETFL: {}", err), + )); + } + }; + + let mut new_flags = old_flags; + new_flags.set(OFlag::O_NONBLOCK, true); + if old_flags != new_flags { + if let Err(err) = fcntl(fd, FcntlArg::F_SETFL(new_flags)) { + return Err(napi::Error::new( + GenericFailure, + format!("fcntl F_SETFL: {}", err), + )); + } + } + Ok(()) +} diff --git a/tests/index.test.ts b/tests/index.test.ts index 03721c9..bbbb869 100644 --- a/tests/index.test.ts +++ b/tests/index.test.ts @@ -1,4 +1,5 @@ import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper'; +import { type Writable } from 'stream'; import { readdirSync, readlinkSync } from 'fs'; import { describe, test, expect } from 'vitest'; @@ -19,10 +20,11 @@ function getOpenFds(): FdRecord { try { const linkTarget = readlinkSync(procSelfFd + filename); if ( - linkTarget === 'anon_inode:[timerfd]' || + linkTarget.startsWith('anon_inode:[') || linkTarget.startsWith('socket:[') || // node likes to asynchronously read stuff mid-test. - linkTarget.includes('/rustpy/') + linkTarget.includes('/ruspty/') || + linkTarget === '/dev/null' ) { continue; } @@ -92,7 +94,7 @@ describe( // We have local echo enabled, so we'll read the message twice. const result = IS_DARWIN - ? 'hello cat\r\n^D\b\bhello cat\r\n' + ? 'hello cat\r\nhello cat\r\n^D\b\b' : 'hello cat\r\nhello cat\r\n'; const pty = new Pty({ @@ -261,10 +263,7 @@ describe( let buffer = Buffer.from(''); const pty = new Pty({ command: '/bin/sh', - args: [ - '-c', - 'sleep 0.1 ; ls /proc/$$/fd', - ], + args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'], onExit: (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(0); @@ -298,6 +297,69 @@ describe( { repeats: 4 }, ); + test( + 'can run concurrent shells', + () => + new Promise((done) => { + const oldFds = getOpenFds(); + const donePromises: Array> = []; + const readyPromises: Array> = []; + const writeStreams: Array = []; + + // We have local echo enabled, so we'll read the message twice. + const result = IS_DARWIN + ? 'ready\r\nhello cat\r\nhello cat\r\n^D\b\b' + : 'ready\r\nhello cat\r\nhello cat\r\n'; + + for (let i = 0; i < 10; i++) { + donePromises.push( + new Promise((accept) => { + let buffer = Buffer.from(''); + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'echo ready ; exec cat'], + onExit: (err, exitCode) => { + expect(err).toBeNull(); + expect(exitCode).toBe(0); + expect(buffer.toString()).toStrictEqual(result); + accept(); + }, + }); + + readyPromises.push( + new Promise((ready) => { + let readyMessageReceived = false; + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + if (!readyMessageReceived) { + readyMessageReceived = true; + ready(); + } + }); + }), + ); + writeStreams.push(pty.write); + }), + ); + } + Promise.allSettled(readyPromises).then(() => { + // The message should end in newline so that the EOT can signal that the input has ended and not + // just the line. + const message = 'hello cat\n'; + for (const writeStream of writeStreams) { + writeStream.write(message); + writeStream.end(EOT); + } + }); + Promise.allSettled(donePromises).then(() => { + expect(getOpenFds()).toStrictEqual(oldFds); + done(); + }); + }), + { repeats: 4 }, + ); + test("doesn't break when executing non-existing binary", () => new Promise((done) => { const oldFds = getOpenFds(); diff --git a/wrapper.ts b/wrapper.ts index fcd1918..4ec2bd1 100644 --- a/wrapper.ts +++ b/wrapper.ts @@ -1,12 +1,13 @@ -import { PassThrough, Readable, Writable } from 'stream'; +import { PassThrough, type Readable, type Writable } from 'node:stream'; +import { ReadStream } from 'node:tty'; import { Pty as RawPty, type Size, setCloseOnExec as rawSetCloseOnExec, getCloseOnExec as rawGetCloseOnExec, + ptyResize, } from './index.js'; import { type PtyOptions as RawOptions } from './index.js'; -import fs from 'fs'; export type PtyOptions = RawOptions; @@ -43,6 +44,8 @@ type ExitResult = { */ export class Pty { #pty: RawPty; + #fd: number; + #socket: ReadStream; read: Readable; write: Writable; @@ -63,14 +66,14 @@ export class Pty { // we use a mocked exit function to capture the exit result // and then call the real exit function after the fd is fully read this.#pty = new RawPty({ ...options, onExit: mockedExit }); - const fd = this.#pty.fd(); + // Transfer ownership of the FD to us. + this.#fd = this.#pty.takeFd(); - const read = fs.createReadStream('', { fd, autoClose: false }); - const write = fs.createWriteStream('', { fd, autoClose: false }); + this.#socket = new ReadStream(this.#fd); const userFacingRead = new PassThrough(); const userFacingWrite = new PassThrough(); - read.pipe(userFacingRead); - userFacingWrite.pipe(write); + this.#socket.pipe(userFacingRead); + userFacingWrite.pipe(this.#socket); this.read = userFacingRead; this.write = userFacingWrite; @@ -82,39 +85,36 @@ export class Pty { eofCalled = true; exitResult.then((result) => realExit(result.error, result.code)); - this.#pty.close(); userFacingRead.end(); }; // catch end events - read.on('end', eof); + this.#socket.on('close', eof); // strip out EIO errors - read.on('error', (err: NodeJS.ErrnoException) => { - if (err.code && err.code.indexOf('EIO') !== -1) { - eof(); - return; + this.#socket.on('error', (err: NodeJS.ErrnoException) => { + if (err.code) { + const code = err.code; + if (code === 'EINTR' || code === 'EAGAIN') { + return; + } + if (code.indexOf('EIO') !== -1) { + // EIO only happens when the child dies. + this.#socket.destroy(); + return; + } } this.read.emit('error', err); }); - - write.on('error', (err: NodeJS.ErrnoException) => { - if (err.code && err.code.indexOf('EIO') !== -1) { - eof(); - return; - } - - this.write.emit('error', err); - }); } close() { - this.#pty.close(); + this.#socket.destroy(); } resize(size: Size) { - this.#pty.resize(size); + ptyResize(this.#fd, size); } get pid() {