Skip to content

Commit

Permalink
3.1.2 Set the PTY fd to be non-blocking
Browse files Browse the repository at this point in the history
We have gone back and forth a couple of times, but we _do_ need to set
this FD to be non-blocking. Otherwise node will fully consume one I/O
thread for each PTY, and node only has four of those.

This change makes the controller fd (the one the parent process reads
from / writes to) as non-blocking for node. Also, now that the
controller fd is non-blocking, we need to pass it through Node's TTY
machinery so that it is properly handled. Because of that, Rust is no
longer on the hook for closing the FD, since ownership is fully
transferred to Node.
  • Loading branch information
lhchavez committed Jul 15, 2024
1 parent 6acb8f2 commit 9388cc4
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 104 deletions.
22 changes: 7 additions & 15 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export interface Size {
cols: number
rows: number
}
/** Resize the terminal. */
export function ptyResize(fd: number, size: Size): void
/**
* Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
* the covers.
Expand All @@ -27,24 +29,14 @@ export function setCloseOnExec(fd: number, closeOnExec: boolean): void
*_CLOEXEC` under the covers.
*/
export function getCloseOnExec(fd: number): boolean
export declare class Pty {
export class Pty {
/** The pid of the forked process. */
pid: number
constructor(opts: PtyOptions)
/** Resize the terminal. */
resize(size: Size): void
/**
* Returns a file descriptor for the PTY controller.
* See the docstring of the class for an usage example.
* Transfers ownership of the file descriptor for the PTY controller. This can only be called
* once (it will error the second time). The caller is responsible for closing the file
* descriptor.
*/
fd(): c_int
/**
* Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
* been closed, otherwise we will leak file descriptors!
*
* In an ideal world, this would be automatically called after the wait loop is done, but Node
* doesn't like that one bit, since it implies that the file is closed outside of the main
* event loop.
*/
close(): void
takeFd(): c_int
}
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,9 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}

const { Pty, setCloseOnExec, getCloseOnExec } = nativeBinding
const { Pty, ptyResize, setCloseOnExec, getCloseOnExec } = nativeBinding

module.exports.Pty = Pty
module.exports.ptyResize = ptyResize
module.exports.setCloseOnExec = setCloseOnExec
module.exports.getCloseOnExec = getCloseOnExec
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.1.1",
"version": "3.1.2",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <[email protected]>",
Expand Down
112 changes: 58 additions & 54 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::io::Error;
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, OwnedFd};
use std::os::fd::{BorrowedFd, FromRawFd, RawFd};
use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
Expand All @@ -16,7 +16,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun
use napi::Status::GenericFailure;
use napi::{self, Env};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, FdFlag};
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
use nix::pty::{openpty, Winsize};
use nix::sys::termios::{self, SetArg};
Expand Down Expand Up @@ -76,7 +76,8 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) {
continue;
}

// we should never hit this, but if we do, we should just break out of the loop
// we should almost never hit this, but if we do, we should just break out of the loop. this
// can happen if Node destroys the terminal before waiting for the child process to go away.
break;
}

Expand Down Expand Up @@ -117,12 +118,14 @@ impl Pty {
}

// open pty pair, and set close-on-exec to avoid unwanted copies of the FDs from finding their
// way into subprocesses.
// way into subprocesses. Also set the nonblocking flag to avoid Node from consuming a full I/O
// thread for this.
let pty_res = openpty(&window_size, None).map_err(cast_to_napi_error)?;
let controller_fd = pty_res.master;
let user_fd = pty_res.slave;
set_close_on_exec(controller_fd.as_raw_fd(), true)?;
set_close_on_exec(user_fd.as_raw_fd(), true)?;
set_nonblocking(controller_fd.as_raw_fd())?;

// duplicate pty user_fd to be the child's stdin, stdout, and stderr
cmd.stdin(Stdio::from(user_fd.try_clone()?));
Expand Down Expand Up @@ -254,68 +257,43 @@ impl Pty {
})
}

/// Resize the terminal.
/// Transfers ownership of the file descriptor for the PTY controller. This can only be called
/// once (it will error the second time). The caller is responsible for closing the file
/// descriptor.
#[napi]
#[allow(dead_code)]
pub fn resize(&mut self, size: Size) -> Result<(), napi::Error> {
let window_size = Winsize {
ws_col: size.cols,
ws_row: size.rows,
ws_xpixel: 0,
ws_ypixel: 0,
};

if let Some(fd) = &self.controller_fd {
let res = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()),
));
}

Ok(())
} else {
Err(napi::Error::new(
napi::Status::GenericFailure,
"ioctl TIOCSWINSZ failed: bad file descriptor (os error 9)",
))
}
}

/// Returns a file descriptor for the PTY controller.
/// See the docstring of the class for an usage example.
#[napi]
#[allow(dead_code)]
pub fn fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = &self.controller_fd {
Ok(fd.as_raw_fd())
pub fn take_fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = self.controller_fd.take() {
Ok(fd.into_raw_fd())
} else {
Err(napi::Error::new(
napi::Status::GenericFailure,
"fd failed: bad file descriptor (os error 9)",
))
}
}
}

/// Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
/// been closed, otherwise we will leak file descriptors!
///
/// In an ideal world, this would be automatically called after the wait loop is done, but Node
/// doesn't like that one bit, since it implies that the file is closed outside of the main
/// event loop.
#[napi]
#[allow(dead_code)]
pub fn close(&mut self) -> Result<(), napi::Error> {
if let Some(fd) = self.controller_fd.take() {
unsafe {
// ok to best-effort close as node can also close this via autoClose
libc::close(fd.as_raw_fd());
};
}
/// Resize the terminal.
#[napi]
#[allow(dead_code)]
fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> {
let window_size = Winsize {
ws_col: size.cols,
ws_row: size.rows,
ws_xpixel: 0,
ws_ypixel: 0,
};

Ok(())
let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()),
));
}

Ok(())
}

/// Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
Expand Down Expand Up @@ -362,3 +340,29 @@ fn get_close_on_exec(fd: i32) -> Result<bool, napi::Error> {
)),
}
}

/// Set the file descriptor to be non-blocking.
#[allow(dead_code)]
fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
let old_flags = match fcntl(fd, FcntlArg::F_GETFL) {
Ok(flags) => OFlag::from_bits_truncate(flags),
Err(err) => {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_GETFL: {}", err),
));
}
};

let mut new_flags = old_flags;
new_flags.set(OFlag::O_NONBLOCK, true);
if old_flags != new_flags {
if let Err(err) = fcntl(fd, FcntlArg::F_SETFL(new_flags)) {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_SETFL: {}", err),
));
}
}
Ok(())
}
76 changes: 69 additions & 7 deletions tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper';
import { type Writable } from 'stream';
import { readdirSync, readlinkSync } from 'fs';
import { describe, test, expect } from 'vitest';

Expand All @@ -19,10 +20,11 @@ function getOpenFds(): FdRecord {
try {
const linkTarget = readlinkSync(procSelfFd + filename);
if (
linkTarget === 'anon_inode:[timerfd]' ||
linkTarget.startsWith('anon_inode:[') ||
linkTarget.startsWith('socket:[') ||
// node likes to asynchronously read stuff mid-test.
linkTarget.includes('/rustpy/')
linkTarget.includes('/ruspty/') ||
linkTarget === '/dev/null'
) {
continue;
}
Expand Down Expand Up @@ -92,7 +94,7 @@ describe(

// We have local echo enabled, so we'll read the message twice.
const result = IS_DARWIN
? 'hello cat\r\n^D\b\bhello cat\r\n'
? 'hello cat\r\nhello cat\r\n^D\b\b'
: 'hello cat\r\nhello cat\r\n';

const pty = new Pty({
Expand Down Expand Up @@ -261,10 +263,7 @@ describe(
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: [
'-c',
'sleep 0.1 ; ls /proc/$$/fd',
],
args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
Expand Down Expand Up @@ -298,6 +297,69 @@ describe(
{ repeats: 4 },
);

test(
'can run concurrent shells',
() =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
const donePromises: Array<Promise<void>> = [];
const readyPromises: Array<Promise<void>> = [];
const writeStreams: Array<Writable> = [];

// We have local echo enabled, so we'll read the message twice.
const result = IS_DARWIN
? 'ready\r\nhello cat\r\nhello cat\r\n^D\b\b'
: 'ready\r\nhello cat\r\nhello cat\r\n';

for (let i = 0; i < 10; i++) {
donePromises.push(
new Promise<void>((accept) => {
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: ['-c', 'echo ready ; exec cat'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer.toString()).toStrictEqual(result);
accept();
},
});

readyPromises.push(
new Promise<void>((ready) => {
let readyMessageReceived = false;
const readStream = pty.read;
readStream.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
if (!readyMessageReceived) {
readyMessageReceived = true;
ready();
}
});
}),
);
writeStreams.push(pty.write);
}),
);
}
Promise.allSettled(readyPromises).then(() => {
// The message should end in newline so that the EOT can signal that the input has ended and not
// just the line.
const message = 'hello cat\n';
for (const writeStream of writeStreams) {
writeStream.write(message);
writeStream.end(EOT);
}
});
Promise.allSettled(donePromises).then(() => {
expect(getOpenFds()).toStrictEqual(oldFds);
done();
});
}),
{ repeats: 4 },
);

test("doesn't break when executing non-existing binary", () =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
Expand Down
Loading

0 comments on commit 9388cc4

Please sign in to comment.