Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.1.2 Set the PTY fd to be non-blocking #36

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export interface Size {
cols: number
rows: number
}
/** Resize the terminal. */
export function ptyResize(fd: number, size: Size): void
/**
* Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
* the covers.
Expand All @@ -27,24 +29,14 @@ export function setCloseOnExec(fd: number, closeOnExec: boolean): void
*_CLOEXEC` under the covers.
*/
export function getCloseOnExec(fd: number): boolean
export declare class Pty {
export class Pty {
/** The pid of the forked process. */
pid: number
constructor(opts: PtyOptions)
/** Resize the terminal. */
resize(size: Size): void
/**
* Returns a file descriptor for the PTY controller.
* See the docstring of the class for an usage example.
* Transfers ownership of the file descriptor for the PTY controller. This can only be called
* once (it will error the second time). The caller is responsible for closing the file
* descriptor.
*/
fd(): c_int
/**
* Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
* been closed, otherwise we will leak file descriptors!
*
* In an ideal world, this would be automatically called after the wait loop is done, but Node
* doesn't like that one bit, since it implies that the file is closed outside of the main
* event loop.
*/
close(): void
takeFd(): c_int
}
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,9 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}

const { Pty, setCloseOnExec, getCloseOnExec } = nativeBinding
const { Pty, ptyResize, setCloseOnExec, getCloseOnExec } = nativeBinding

module.exports.Pty = Pty
module.exports.ptyResize = ptyResize
module.exports.setCloseOnExec = setCloseOnExec
module.exports.getCloseOnExec = getCloseOnExec
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.1.1",
"version": "3.1.2",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <[email protected]>",
Expand Down
112 changes: 58 additions & 54 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::io::Error;
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, OwnedFd};
use std::os::fd::{BorrowedFd, FromRawFd, RawFd};
use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
Expand All @@ -16,7 +16,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun
use napi::Status::GenericFailure;
use napi::{self, Env};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, FdFlag};
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
use nix::pty::{openpty, Winsize};
use nix::sys::termios::{self, SetArg};
Expand Down Expand Up @@ -76,7 +76,8 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) {
continue;
}

// we should never hit this, but if we do, we should just break out of the loop
// we should almost never hit this, but if we do, we should just break out of the loop. this
// can happen if Node destroys the terminal before waiting for the child process to go away.
break;
}

Expand Down Expand Up @@ -117,12 +118,14 @@ impl Pty {
}

// open pty pair, and set close-on-exec to avoid unwanted copies of the FDs from finding their
// way into subprocesses.
// way into subprocesses. Also set the nonblocking flag to avoid Node from consuming a full I/O
// thread for this.
let pty_res = openpty(&window_size, None).map_err(cast_to_napi_error)?;
let controller_fd = pty_res.master;
let user_fd = pty_res.slave;
set_close_on_exec(controller_fd.as_raw_fd(), true)?;
set_close_on_exec(user_fd.as_raw_fd(), true)?;
set_nonblocking(controller_fd.as_raw_fd())?;

// duplicate pty user_fd to be the child's stdin, stdout, and stderr
cmd.stdin(Stdio::from(user_fd.try_clone()?));
Expand Down Expand Up @@ -254,68 +257,43 @@ impl Pty {
})
}

/// Resize the terminal.
/// Transfers ownership of the file descriptor for the PTY controller. This can only be called
/// once (it will error the second time). The caller is responsible for closing the file
/// descriptor.
#[napi]
#[allow(dead_code)]
pub fn resize(&mut self, size: Size) -> Result<(), napi::Error> {
let window_size = Winsize {
ws_col: size.cols,
ws_row: size.rows,
ws_xpixel: 0,
ws_ypixel: 0,
};

if let Some(fd) = &self.controller_fd {
let res = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()),
));
}

Ok(())
} else {
Err(napi::Error::new(
napi::Status::GenericFailure,
"ioctl TIOCSWINSZ failed: bad file descriptor (os error 9)",
))
}
}

/// Returns a file descriptor for the PTY controller.
/// See the docstring of the class for an usage example.
#[napi]
#[allow(dead_code)]
pub fn fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = &self.controller_fd {
Ok(fd.as_raw_fd())
pub fn take_fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = self.controller_fd.take() {
lhchavez marked this conversation as resolved.
Show resolved Hide resolved
Ok(fd.into_raw_fd())
} else {
Err(napi::Error::new(
napi::Status::GenericFailure,
"fd failed: bad file descriptor (os error 9)",
))
}
}
}

/// Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
/// been closed, otherwise we will leak file descriptors!
///
/// In an ideal world, this would be automatically called after the wait loop is done, but Node
/// doesn't like that one bit, since it implies that the file is closed outside of the main
/// event loop.
#[napi]
#[allow(dead_code)]
pub fn close(&mut self) -> Result<(), napi::Error> {
if let Some(fd) = self.controller_fd.take() {
unsafe {
// ok to best-effort close as node can also close this via autoClose
libc::close(fd.as_raw_fd());
};
}
/// Resize the terminal.
#[napi]
#[allow(dead_code)]
fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> {
let window_size = Winsize {
ws_col: size.cols,
ws_row: size.rows,
ws_xpixel: 0,
ws_ypixel: 0,
};

Ok(())
let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()),
));
}

Ok(())
}

/// Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
Expand Down Expand Up @@ -362,3 +340,29 @@ fn get_close_on_exec(fd: i32) -> Result<bool, napi::Error> {
)),
}
}

/// Set the file descriptor to be non-blocking.
#[allow(dead_code)]
fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
let old_flags = match fcntl(fd, FcntlArg::F_GETFL) {
Ok(flags) => OFlag::from_bits_truncate(flags),
Err(err) => {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_GETFL: {}", err),
));
}
};

let mut new_flags = old_flags;
new_flags.set(OFlag::O_NONBLOCK, true);
if old_flags != new_flags {
if let Err(err) = fcntl(fd, FcntlArg::F_SETFL(new_flags)) {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_SETFL: {}", err),
));
}
}
Ok(())
}
88 changes: 78 additions & 10 deletions tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper';
import { type Writable } from 'stream';
import { readdirSync, readlinkSync } from 'fs';
import { describe, test, expect } from 'vitest';

Expand All @@ -19,10 +20,11 @@ function getOpenFds(): FdRecord {
try {
const linkTarget = readlinkSync(procSelfFd + filename);
if (
linkTarget === 'anon_inode:[timerfd]' ||
linkTarget.startsWith('anon_inode:[') ||
lhchavez marked this conversation as resolved.
Show resolved Hide resolved
linkTarget.startsWith('socket:[') ||
// node likes to asynchronously read stuff mid-test.
linkTarget.includes('/rustpy/')
linkTarget.includes('/ruspty/') ||
linkTarget === '/dev/null'
) {
continue;
}
Expand Down Expand Up @@ -91,16 +93,19 @@ describe(
let buffer = '';

// We have local echo enabled, so we'll read the message twice.
const result = IS_DARWIN
? 'hello cat\r\n^D\b\bhello cat\r\n'
: 'hello cat\r\nhello cat\r\n';
const expectedResult = 'hello cat\r\nhello cat\r\n';

const pty = new Pty({
command: '/bin/cat',
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer.trim()).toBe(result.trim());
let result = buffer.toString();
if (IS_DARWIN) {
// Darwin adds the visible EOT to the stream.
result = result.replace('^D\b\b', '');
}
expect(result.trim()).toStrictEqual(expectedResult.trim());
expect(getOpenFds()).toStrictEqual(oldFds);
done();
},
Expand Down Expand Up @@ -261,10 +266,7 @@ describe(
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: [
'-c',
'sleep 0.1 ; ls /proc/$$/fd',
],
args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
Expand Down Expand Up @@ -298,6 +300,72 @@ describe(
{ repeats: 4 },
);

test(
'can run concurrent shells',
() =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
const donePromises: Array<Promise<void>> = [];
const readyPromises: Array<Promise<void>> = [];
const writeStreams: Array<Writable> = [];

// We have local echo enabled, so we'll read the message twice.
const expectedResult = 'ready\r\nhello cat\r\nhello cat\r\n';

for (let i = 0; i < 10; i++) {
donePromises.push(
new Promise<void>((accept) => {
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: ['-c', 'echo ready ; exec cat'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
let result = buffer.toString();
if (IS_DARWIN) {
// Darwin adds the visible EOT to the stream.
result = result.replace('^D\b\b', '');
}
expect(result).toStrictEqual(expectedResult);
accept();
},
});

readyPromises.push(
new Promise<void>((ready) => {
let readyMessageReceived = false;
const readStream = pty.read;
readStream.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
if (!readyMessageReceived) {
readyMessageReceived = true;
ready();
}
});
}),
);
writeStreams.push(pty.write);
}),
);
}
Promise.allSettled(readyPromises).then(() => {
// The message should end in newline so that the EOT can signal that the input has ended and not
// just the line.
const message = 'hello cat\n';
for (const writeStream of writeStreams) {
writeStream.write(message);
writeStream.end(EOT);
}
});
Promise.allSettled(donePromises).then(() => {
expect(getOpenFds()).toStrictEqual(oldFds);
done();
});
}),
{ repeats: 4 },
);

test("doesn't break when executing non-existing binary", () =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
Expand Down
Loading