Fix a race between process death and stdio read
Previously we closed the fd immediately after the process was terminated.
That made lifetimes simple to reason about, but it also introduced a very
subtle bug: if Bun had not yet read the entirety of the child process's
stdout/stderr, Rust would close the file out from under it and Bun could
not read the rest of the output, truncating it!

This change adds explicit `close()` and `fd()` functions so that fd
management is much more explicit than before. Tests were also updated to
ensure that there are no leaked FDs, since we have now lost the guarantee
that everything will be cleaned up automatically.
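As a rough sketch of the new lifecycle (illustration only, not part of this commit; the import path mirrors the test file, and the command/arguments are just an example), the controller fd now stays open until the caller explicitly releases it, so output the child produced right before exiting can still be drained:

```ts
import fs from 'node:fs';
import { Pty } from './index'; // same import path the tests below use

const pty = new Pty(
  '/bin/echo',
  ['bye'],
  {},
  process.cwd(),
  { rows: 24, cols: 80 },
  (err, exitCode) => {
    // The process has exited, but the fd is no longer closed automatically:
    // the read stream below can still drain any output that was buffered.
    pty.close(); // explicit release; skipping this now leaks the controller fd
  },
);

const read = fs.createReadStream('', { fd: pty.fd(), autoClose: true });
read.on('data', (chunk) => process.stdout.write(chunk));
read.on('error', (err: any) => {
  // EIO is expected once the child side of the PTY goes away.
  if (err.code && err.code.indexOf('EIO') !== -1) return;
  throw err;
});
```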
lhchavez committed May 5, 2024
1 parent 2aeef95 commit 2479a55
Showing 6 changed files with 391 additions and 122 deletions.
Binary file modified bun.lockb
1 change: 1 addition & 0 deletions flake.nix
@@ -9,6 +9,7 @@
in
pkgs.mkShell {
buildInputs = with pkgs; [
nodejs_20
bun
cargo
libiconv
68 changes: 67 additions & 1 deletion index.d.ts
@@ -3,13 +3,79 @@

/* auto-generated by NAPI-RS */

/** A size struct to pass to resize. */
export interface Size {
cols: number
rows: number
}
/**
* A very thin wrapper around PTYs and processes. The caller is responsible for calling `.close()`
* when all streams have been closed. We hold onto both ends of the PTY (controller and user) to
* prevent reads from erroring out with EIO.
*
* This is the recommended usage:
*
* ```
* const { Pty } = require('replit-ruspy');
* const fs = require('node:fs');
*
* const pty = new Pty('sh', [], ENV, CWD, { rows: 24, cols: 80 }, (...result) => {
* pty.close();
* // TODO: Handle process exit.
* });
*
* const read = new fs.createReadStream('', {
* fd: pty.fd(),
* start: 0,
* highWaterMark: 16 * 1024,
* autoClose: true,
* });
* const write = new fs.createWriteStream('', {
* fd: pty.fd(),
* autoClose: true,
* });
*
* read.on('data', (chunk) => {
* // TODO: Handle data.
* });
* read.on('error', (err) => {
* if (err.code && err.code.indexOf('EIO') !== -1) {
* // This is expected to happen when the process exits.
* return;
* }
* // TODO: Handle the error.
* });
* write.on('error', (err) => {
* if (err.code && err.code.indexOf('EIO') !== -1) {
* // This is expected to happen when the process exits.
* return;
* }
* // TODO: Handle the error.
* });
* ```
*/
export class Pty {
fd: number
/** The pid of the forked process. */
pid: number
constructor(command: string, args: Array<string>, envs: Record<string, string>, dir: string, size: Size, onExit: (err: null | Error, exitCode: number) => void)
/** Resize the terminal. */
resize(size: Size): void
/**
* Returns a file descriptor for the PTY controller. If running under node, it will dup the file
* descriptor, but under bun it will return the same file descriptor, since bun does not close
* the streams by itself. Maybe that is a bug in bun, so we should confirm the new behavior
* after we upgrade.
*
* See the docstring of the class for a usage example.
*/
fd(): c_int
/**
* Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
* been closed, otherwise we will leak file descriptors!
*
* In an ideal world, this would be automatically called after the wait loop is done, but Node
* doesn't like that one bit, since it implies that the file is closed outside of the main
* event loop.
*/
close(): void
}
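A quick sketch (not part of the diff) of what the `fd()` docstring above implies under node, where each call returns a dup so the read and write streams do not share a single descriptor; as the docstring notes, bun returns the same fd instead, so the ownership story differs there:

```ts
import fs from 'node:fs';
import { Pty } from './index';

const pty = new Pty('/bin/cat', [], {}, process.cwd(), { rows: 24, cols: 80 }, () => {
  // Closes the fd held by the Pty itself; under node, the dups handed out by
  // fd() below are owned (and auto-closed) by their respective streams.
  pty.close();
});

const read = fs.createReadStream('', { fd: pty.fd(), autoClose: true });
const write = fs.createWriteStream('', { fd: pty.fd(), autoClose: true });

read.on('data', (chunk) => process.stdout.write(chunk));
read.on('error', (err: any) => {
  if (err.code && err.code.indexOf('EIO') !== -1) return; // expected on exit
  throw err;
});

write.write('hello\n');
write.end('\x04'); // EOT, as in the tests below
```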
187 changes: 141 additions & 46 deletions index.test.ts
@@ -1,11 +1,51 @@
import fs from 'fs';
import os from 'node:os';
import { readdir, readlink } from 'node:fs/promises';
import { Pty } from './index';

const EOT = '\x04';
const procSelfFd = '/proc/self/fd/';
const previousFDs: Record<string, string> = {};

// These two functions ensure that there are no extra open file descriptors after each test
// finishes. Only works on Linux.
if (os.type() !== 'Darwin') {
beforeEach(async () => {
for (const filename of await readdir(procSelfFd)) {
try {
previousFDs[filename] = await readlink(procSelfFd + filename);
} catch (err: any) {
if (err.code === 'ENOENT') {
continue;
}
throw err;
}
}
});
afterEach(async () => {
for (const filename of await readdir(procSelfFd)) {
try {
const linkTarget = await readlink(procSelfFd + filename);
if (linkTarget === 'anon_inode:[timerfd]') {
continue;
}
expect(previousFDs).toHaveProperty(filename, linkTarget);
} catch (err: any) {
if (err.code === 'ENOENT') {
continue;
}
throw err;
}
}
});
}

describe('PTY', () => {
const CWD = process.cwd();

test('spawns and exits', (done) => {
const message = 'hello from a pty';
let buffer = '';

const pty = new Pty(
'/bin/echo',
@@ -16,19 +56,30 @@ describe('PTY', () => {
(err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer).toBe(message + '\r\n');
pty.close();

done();
},
);

const readStream = fs.createReadStream('', { fd: pty.fd });
const readStreamFd = pty.fd();
const readStream = fs.createReadStream('', { fd: readStreamFd });

readStream.on('data', (chunk) => {
expect(chunk.toString()).toBe(message + '\r\n');
buffer += chunk.toString();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
console.log('err', { err });
throw err;
});
});

test('captures an exit code', (done) => {
new Pty(
let pty = new Pty(
'/bin/sh',
['-c', 'exit 17'],
{},
@@ -37,46 +88,59 @@
(err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(17);
pty.close();

done();
},
);
});

test('can be written to', (done) => {
const message = 'hello cat';
// The message should end in newline so that the EOT can signal that the input has ended and not
// just the line.
const message = 'hello cat\n';
let buffer = '';

const pty = new Pty(
'/bin/cat',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {},
);
const pty = new Pty('/bin/cat', [], {}, CWD, { rows: 24, cols: 80 }, () => {
// We have local echo enabled, so we'll read the message twice.
expect(buffer).toBe('hello cat\r\nhello cat\r\n');
pty.close();

const readStream = fs.createReadStream('', { fd: pty.fd });
const writeStream = fs.createWriteStream('', { fd: pty.fd });
done();
});

const readStream = fs.createReadStream('', { fd: pty.fd() });
const writeStream = fs.createWriteStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
expect(chunk.toString()).toBe(message);
done();
buffer += chunk.toString();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});

writeStream.write(message);
writeStream.end(EOT);
writeStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('can be resized', (done) => {
const pty = new Pty(
'/bin/sh',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {},
);
const pty = new Pty('/bin/sh', [], {}, CWD, { rows: 24, cols: 80 }, () => {
pty.close();

const readStream = fs.createReadStream('', { fd: pty.fd });
const writeStream = fs.createWriteStream('', { fd: pty.fd });
done();
});

const readStream = fs.createReadStream('', { fd: pty.fd() });
const writeStream = fs.createWriteStream('', { fd: pty.fd() });

let buffer = '';

@@ -92,14 +156,28 @@

if (buffer.includes('done2\r\n')) {
expect(buffer).toContain('60 100');
done();
}
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});

writeStream.write("stty size; echo 'done1'\n");
writeStream.end(EOT);
writeStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('respects working directory', (done) => {
let buffer = '';

const pty = new Pty(
'/bin/pwd',
[],
@@ -109,18 +187,27 @@
(err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer).toBe(`${CWD}\r\n`);
pty.close();

done();
},
);

const readStream = fs.createReadStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
expect(chunk.toString()).toBe(`${CWD}\r\n`);
buffer += chunk.toString();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test.skip('respects env', (done) => {
test('respects env', (done) => {
const message = 'hello from env';
let buffer = '';

@@ -136,44 +223,52 @@
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer).toBe(message + '\r\n');
pty.close();

done();
},
);

const readStream = fs.createReadStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
buffer += chunk.toString();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('works with Bun.read & Bun.write', (done) => {
const message = 'hello bun';
const message = 'hello bun\n';
let buffer = '';

const pty = new Pty(
'/bin/cat',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {},
);
const pty = new Pty('/bin/cat', [], {}, CWD, { rows: 24, cols: 80 }, () => {
// We have local echo enabled, so we'll read the message twice. Furthermore, the newline
// is converted to `\r\n` in this method.
expect(buffer).toBe('hello bun\r\nhello bun\r\n');
pty.close();

done();
});

const file = Bun.file(pty.fd);
const file = Bun.file(pty.fd());

async function read() {
const stream = file.stream();

for await (const chunk of stream) {
expect(Buffer.from(chunk).toString()).toBe(message);
done();
buffer += Buffer.from(chunk).toString();
// TODO: For some reason, Bun's stream will raise the EIO somewhere where we cannot catch
// it, and make the test fail no matter how many try / catch blocks we add.
break;
}
}

read();

Bun.write(pty.fd, message);
Bun.write(pty.fd(), message + EOT + EOT);
});

test("doesn't break when executing non-existing binary", (done) => {
