diff --git a/index.d.ts b/index.d.ts index efdcc53..dae5f58 100644 --- a/index.d.ts +++ b/index.d.ts @@ -17,6 +17,8 @@ export interface Size { cols: number rows: number } +/** Resize the terminal. */ +export function ptyResize(fd: number, size: Size): void /** * Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under * the covers. @@ -27,24 +29,14 @@ export function setCloseOnExec(fd: number, closeOnExec: boolean): void *_CLOEXEC` under the covers. */ export function getCloseOnExec(fd: number): boolean -export declare class Pty { +export class Pty { /** The pid of the forked process. */ pid: number constructor(opts: PtyOptions) - /** Resize the terminal. */ - resize(size: Size): void /** - * Returns a file descriptor for the PTY controller. - * See the docstring of the class for an usage example. + * Transfers ownership of the file descriptor for the PTY controller. This can only be called + * once (it will error the second time). The caller is responsible for closing the file + * descriptor. */ - fd(): c_int - /** - * Close the PTY file descriptor. This must be called when the readers / writers of the PTY have - * been closed, otherwise we will leak file descriptors! - * - * In an ideal world, this would be automatically called after the wait loop is done, but Node - * doesn't like that one bit, since it implies that the file is closed outside of the main - * event loop. - */ - close(): void + takeFd(): c_int } diff --git a/index.js b/index.js index 251c62d..43de6ae 100644 --- a/index.js +++ b/index.js @@ -310,8 +310,9 @@ if (!nativeBinding) { throw new Error(`Failed to load native binding`) } -const { Pty, setCloseOnExec, getCloseOnExec } = nativeBinding +const { Pty, ptyResize, setCloseOnExec, getCloseOnExec } = nativeBinding module.exports.Pty = Pty +module.exports.ptyResize = ptyResize module.exports.setCloseOnExec = setCloseOnExec module.exports.getCloseOnExec = getCloseOnExec diff --git a/package-lock.json b/package-lock.json index 164b6b2..105749c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/ruspty", - "version": "3.1.1", + "version": "3.1.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@replit/ruspty", - "version": "3.1.1", + "version": "3.1.2", "license": "MIT", "devDependencies": { "@napi-rs/cli": "^2.18.2", diff --git a/package.json b/package.json index b50b062..daae6b7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty", - "version": "3.1.1", + "version": "3.1.2", "main": "dist/wrapper.js", "types": "dist/wrapper.d.ts", "author": "Szymon Kaliski ", diff --git a/src/lib.rs b/src/lib.rs index 59cf337..97b8cc7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::Error; use std::io::ErrorKind; use std::os::fd::{AsRawFd, OwnedFd}; -use std::os::fd::{BorrowedFd, FromRawFd, RawFd}; +use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; @@ -16,7 +16,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun use napi::Status::GenericFailure; use napi::{self, Env}; use nix::errno::Errno; -use nix::fcntl::{fcntl, FcntlArg, FdFlag}; +use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag}; use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; use nix::pty::{openpty, Winsize}; use nix::sys::termios::{self, SetArg}; @@ -76,7 +76,8 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) { continue; } - // we should never hit this, but if we do, we should just break out of the loop + // we should almost never hit this, but if we do, we should just break out of the loop. this + // can happen if Node destroys the terminal before waiting for the child process to go away. break; } @@ -117,12 +118,14 @@ impl Pty { } // open pty pair, and set close-on-exec to avoid unwanted copies of the FDs from finding their - // way into subprocesses. + // way into subprocesses. Also set the nonblocking flag to avoid Node from consuming a full I/O + // thread for this. let pty_res = openpty(&window_size, None).map_err(cast_to_napi_error)?; let controller_fd = pty_res.master; let user_fd = pty_res.slave; set_close_on_exec(controller_fd.as_raw_fd(), true)?; set_close_on_exec(user_fd.as_raw_fd(), true)?; + set_nonblocking(controller_fd.as_raw_fd())?; // duplicate pty user_fd to be the child's stdin, stdout, and stderr cmd.stdin(Stdio::from(user_fd.try_clone()?)); @@ -254,42 +257,14 @@ impl Pty { }) } - /// Resize the terminal. + /// Transfers ownership of the file descriptor for the PTY controller. This can only be called + /// once (it will error the second time). The caller is responsible for closing the file + /// descriptor. #[napi] #[allow(dead_code)] - pub fn resize(&mut self, size: Size) -> Result<(), napi::Error> { - let window_size = Winsize { - ws_col: size.cols, - ws_row: size.rows, - ws_xpixel: 0, - ws_ypixel: 0, - }; - - if let Some(fd) = &self.controller_fd { - let res = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &window_size as *const _) }; - if res == -1 { - return Err(napi::Error::new( - napi::Status::GenericFailure, - format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()), - )); - } - - Ok(()) - } else { - Err(napi::Error::new( - napi::Status::GenericFailure, - "ioctl TIOCSWINSZ failed: bad file descriptor (os error 9)", - )) - } - } - - /// Returns a file descriptor for the PTY controller. - /// See the docstring of the class for an usage example. - #[napi] - #[allow(dead_code)] - pub fn fd(&mut self) -> Result { - if let Some(fd) = &self.controller_fd { - Ok(fd.as_raw_fd()) + pub fn take_fd(&mut self) -> Result { + if let Some(fd) = self.controller_fd.take() { + Ok(fd.into_raw_fd()) } else { Err(napi::Error::new( napi::Status::GenericFailure, @@ -297,25 +272,28 @@ impl Pty { )) } } +} - /// Close the PTY file descriptor. This must be called when the readers / writers of the PTY have - /// been closed, otherwise we will leak file descriptors! - /// - /// In an ideal world, this would be automatically called after the wait loop is done, but Node - /// doesn't like that one bit, since it implies that the file is closed outside of the main - /// event loop. - #[napi] - #[allow(dead_code)] - pub fn close(&mut self) -> Result<(), napi::Error> { - if let Some(fd) = self.controller_fd.take() { - unsafe { - // ok to best-effort close as node can also close this via autoClose - libc::close(fd.as_raw_fd()); - }; - } +/// Resize the terminal. +#[napi] +#[allow(dead_code)] +fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> { + let window_size = Winsize { + ws_col: size.cols, + ws_row: size.rows, + ws_xpixel: 0, + ws_ypixel: 0, + }; - Ok(()) + let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) }; + if res == -1 { + return Err(napi::Error::new( + napi::Status::GenericFailure, + format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()), + )); } + + Ok(()) } /// Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under @@ -362,3 +340,29 @@ fn get_close_on_exec(fd: i32) -> Result { )), } } + +/// Set the file descriptor to be non-blocking. +#[allow(dead_code)] +fn set_nonblocking(fd: i32) -> Result<(), napi::Error> { + let old_flags = match fcntl(fd, FcntlArg::F_GETFL) { + Ok(flags) => OFlag::from_bits_truncate(flags), + Err(err) => { + return Err(napi::Error::new( + GenericFailure, + format!("fcntl F_GETFL: {}", err), + )); + } + }; + + let mut new_flags = old_flags; + new_flags.set(OFlag::O_NONBLOCK, true); + if old_flags != new_flags { + if let Err(err) = fcntl(fd, FcntlArg::F_SETFL(new_flags)) { + return Err(napi::Error::new( + GenericFailure, + format!("fcntl F_SETFL: {}", err), + )); + } + } + Ok(()) +} diff --git a/tests/index.test.ts b/tests/index.test.ts index 03721c9..c627b9b 100644 --- a/tests/index.test.ts +++ b/tests/index.test.ts @@ -1,4 +1,5 @@ import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper'; +import { type Writable } from 'stream'; import { readdirSync, readlinkSync } from 'fs'; import { describe, test, expect } from 'vitest'; @@ -19,10 +20,11 @@ function getOpenFds(): FdRecord { try { const linkTarget = readlinkSync(procSelfFd + filename); if ( - linkTarget === 'anon_inode:[timerfd]' || + linkTarget.startsWith('anon_inode:[') || linkTarget.startsWith('socket:[') || // node likes to asynchronously read stuff mid-test. - linkTarget.includes('/rustpy/') + linkTarget.includes('/ruspty/') || + linkTarget === '/dev/null' ) { continue; } @@ -91,16 +93,19 @@ describe( let buffer = ''; // We have local echo enabled, so we'll read the message twice. - const result = IS_DARWIN - ? 'hello cat\r\n^D\b\bhello cat\r\n' - : 'hello cat\r\nhello cat\r\n'; + const expectedResult = 'hello cat\r\nhello cat\r\n'; const pty = new Pty({ command: '/bin/cat', onExit: (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(result.trim()); + let result = buffer.toString(); + if (IS_DARWIN) { + // Darwin adds the visible EOT to the stream. + result = result.replace('^D\b\b', ''); + } + expect(result.trim()).toStrictEqual(expectedResult.trim()); expect(getOpenFds()).toStrictEqual(oldFds); done(); }, @@ -261,10 +266,7 @@ describe( let buffer = Buffer.from(''); const pty = new Pty({ command: '/bin/sh', - args: [ - '-c', - 'sleep 0.1 ; ls /proc/$$/fd', - ], + args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'], onExit: (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(0); @@ -298,6 +300,72 @@ describe( { repeats: 4 }, ); + test( + 'can run concurrent shells', + () => + new Promise((done) => { + const oldFds = getOpenFds(); + const donePromises: Array> = []; + const readyPromises: Array> = []; + const writeStreams: Array = []; + + // We have local echo enabled, so we'll read the message twice. + const expectedResult = 'ready\r\nhello cat\r\nhello cat\r\n'; + + for (let i = 0; i < 10; i++) { + donePromises.push( + new Promise((accept) => { + let buffer = Buffer.from(''); + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'echo ready ; exec cat'], + onExit: (err, exitCode) => { + expect(err).toBeNull(); + expect(exitCode).toBe(0); + let result = buffer.toString(); + if (IS_DARWIN) { + // Darwin adds the visible EOT to the stream. + result = result.replace('^D\b\b', ''); + } + expect(result).toStrictEqual(expectedResult); + accept(); + }, + }); + + readyPromises.push( + new Promise((ready) => { + let readyMessageReceived = false; + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + if (!readyMessageReceived) { + readyMessageReceived = true; + ready(); + } + }); + }), + ); + writeStreams.push(pty.write); + }), + ); + } + Promise.allSettled(readyPromises).then(() => { + // The message should end in newline so that the EOT can signal that the input has ended and not + // just the line. + const message = 'hello cat\n'; + for (const writeStream of writeStreams) { + writeStream.write(message); + writeStream.end(EOT); + } + }); + Promise.allSettled(donePromises).then(() => { + expect(getOpenFds()).toStrictEqual(oldFds); + done(); + }); + }), + { repeats: 4 }, + ); + test("doesn't break when executing non-existing binary", () => new Promise((done) => { const oldFds = getOpenFds(); diff --git a/wrapper.ts b/wrapper.ts index fcd1918..4ec2bd1 100644 --- a/wrapper.ts +++ b/wrapper.ts @@ -1,12 +1,13 @@ -import { PassThrough, Readable, Writable } from 'stream'; +import { PassThrough, type Readable, type Writable } from 'node:stream'; +import { ReadStream } from 'node:tty'; import { Pty as RawPty, type Size, setCloseOnExec as rawSetCloseOnExec, getCloseOnExec as rawGetCloseOnExec, + ptyResize, } from './index.js'; import { type PtyOptions as RawOptions } from './index.js'; -import fs from 'fs'; export type PtyOptions = RawOptions; @@ -43,6 +44,8 @@ type ExitResult = { */ export class Pty { #pty: RawPty; + #fd: number; + #socket: ReadStream; read: Readable; write: Writable; @@ -63,14 +66,14 @@ export class Pty { // we use a mocked exit function to capture the exit result // and then call the real exit function after the fd is fully read this.#pty = new RawPty({ ...options, onExit: mockedExit }); - const fd = this.#pty.fd(); + // Transfer ownership of the FD to us. + this.#fd = this.#pty.takeFd(); - const read = fs.createReadStream('', { fd, autoClose: false }); - const write = fs.createWriteStream('', { fd, autoClose: false }); + this.#socket = new ReadStream(this.#fd); const userFacingRead = new PassThrough(); const userFacingWrite = new PassThrough(); - read.pipe(userFacingRead); - userFacingWrite.pipe(write); + this.#socket.pipe(userFacingRead); + userFacingWrite.pipe(this.#socket); this.read = userFacingRead; this.write = userFacingWrite; @@ -82,39 +85,36 @@ export class Pty { eofCalled = true; exitResult.then((result) => realExit(result.error, result.code)); - this.#pty.close(); userFacingRead.end(); }; // catch end events - read.on('end', eof); + this.#socket.on('close', eof); // strip out EIO errors - read.on('error', (err: NodeJS.ErrnoException) => { - if (err.code && err.code.indexOf('EIO') !== -1) { - eof(); - return; + this.#socket.on('error', (err: NodeJS.ErrnoException) => { + if (err.code) { + const code = err.code; + if (code === 'EINTR' || code === 'EAGAIN') { + return; + } + if (code.indexOf('EIO') !== -1) { + // EIO only happens when the child dies. + this.#socket.destroy(); + return; + } } this.read.emit('error', err); }); - - write.on('error', (err: NodeJS.ErrnoException) => { - if (err.code && err.code.indexOf('EIO') !== -1) { - eof(); - return; - } - - this.write.emit('error', err); - }); } close() { - this.#pty.close(); + this.#socket.destroy(); } resize(size: Size) { - this.#pty.resize(size); + ptyResize(this.#fd, size); } get pid() {