Fix a race between process death and stdio read
Previously we closed the fd immediately after the process terminated.
That made lifetimes simple to reason about, but it accidentally
introduced a very subtle bug: if Bun had not yet read the entirety of
the child process's stdout/stderr, Rust would close the file out from
under it and Bun could not read the rest of the output, truncating it!

This change adds explicit `close()` and `fd()` functions so that fd
management is much more explicit than before. Tests were also updated
to ensure that no FDs are leaked, since we have now lost the guarantee
that everything will be cleaned up automatically.
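
A minimal sketch of the lifecycle this commit expects from callers, condensed from the usage example added to index.d.ts below. The `/bin/sh -c 'echo hello'` command is just an illustrative placeholder; the import path mirrors the tests in this repo.

```ts
import fs from 'fs';
import { Pty } from './index';

const pty = new Pty(
  '/bin/sh',
  ['-c', 'echo hello'],
  {},
  process.cwd(),
  { rows: 24, cols: 80 },
  (err, exitCode) => {
    // The child has exited, but the PTY fd is still open, so output that
    // has not been read yet is not lost.
  },
);

const readStream = fs.createReadStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
  // Handle output here; it is no longer truncated when the child exits first.
});
readStream.on('close', () => {
  // All output has been drained; only now is it safe to release the fd.
  pty.close();
});
readStream.on('error', (err: any) => {
  if (err.code && err.code.indexOf('EIO') !== -1) {
    // Expected once the child exits.
    return;
  }
  throw err;
});
```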
lhchavez committed May 5, 2024
1 parent 2aeef95 commit dcf208b
Showing 6 changed files with 370 additions and 120 deletions.
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions flake.nix
@@ -9,6 +9,7 @@
in
pkgs.mkShell {
buildInputs = with pkgs; [
nodejs_20
bun
cargo
libiconv
66 changes: 65 additions & 1 deletion index.d.ts
@@ -3,13 +3,77 @@

/* auto-generated by NAPI-RS */

/** A size struct to pass to resize. */
export interface Size {
cols: number
rows: number
}
/**
* A very thin wrapper around PTYs and processes. The caller is responsible for calling `.close()`
* when all streams have been closed.
*
* This is the recommended usage:
*
* ```
* const { Pty } = require('replit-ruspy');
* const fs = require('node:fs');
*
* const pty = new Pty('sh', [], ENV, CWD, { rows: 24, cols: 80 }, (...result) => {
* // TODO: Handle process exit.
* });
*
* const read = fs.createReadStream('', {
* fd: pty.fd(),
* start: 0,
* highWaterMark: 16 * 1024,
* autoClose: true,
* });
* const write = fs.createWriteStream('', {
* fd: pty.fd(),
* autoClose: true,
* });
*
* read.on('data', (chunk) => {
* // TODO: Handle data.
* });
* read.on('close', () => {
* // TODO: Handle close.
* pty.close();
* });
* read.on('error', (err) => {
* if (err.code && err.code.indexOf('EIO') !== -1) {
* // This is expected to happen when the process exits.
* return;
* }
* // TODO: Handle the error.
* });
* write.on('error', (err) => {
* if (err.code && err.code.indexOf('EIO') !== -1) {
* // This is expected to happen when the process exits.
* return;
* }
* // TODO: Handle the error.
* });
* ```
*/
export class Pty {
fd: number
/** The pid of the forked process. */
pid: number
constructor(command: string, args: Array<string>, envs: Record<string, string>, dir: string, size: Size, onExit: (err: null | Error, exitCode: number) => void)
/** Resize the terminal. */
resize(size: Size): void
/**
* Returns a file descriptor for the PTY controller. If running under node, it will dup the file
* descriptor, but under bun it will return the same file descriptor, since bun does not close
* the streams by itself. Maybe that is a bug in bun, so we should confirm the new behavior
* after we upgrade.
*
* See the docstring of the class for a usage example.
*/
fd(): c_int
/**
* Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
* been closed, otherwise we will leak file descriptors!
*/
close(): void
}
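
A short sketch of the node-vs-bun distinction described in the `fd()` docstring above; this illustrates the documented behavior and is not code from this commit (the `/bin/cat` invocation is just an example).

```ts
import fs from 'fs';
import { Pty } from './index';

const pty = new Pty('/bin/cat', [], {}, process.cwd(), { rows: 24, cols: 80 }, () => {});

// Under node, each fd() call returns a dup of the controller fd, so the two
// streams below each own an independent descriptor and autoClose is safe.
// Under bun, fd() returns the same descriptor both times.
const readStream = fs.createReadStream('', { fd: pty.fd(), autoClose: true });
const writeStream = fs.createWriteStream('', { fd: pty.fd(), autoClose: true });

// In either runtime the PTY controller itself is only released by close(),
// which must run after the readers and writers are done with it.
readStream.on('close', () => pty.close());
```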
172 changes: 128 additions & 44 deletions index.test.ts
@@ -1,6 +1,42 @@
import fs from 'fs';
import { readdir, readlink } from 'node:fs/promises';
import { Pty } from './index';

const EOT = '\x04';
const procSelfFd = '/proc/self/fd/';
const previousFDs: Record<string, string> = {};

// These two functions ensure that there are no extra open file descriptors after each test
// finishes.
beforeEach(async () => {
for (const filename of await readdir(procSelfFd)) {
try {
previousFDs[filename] = await readlink(procSelfFd + filename);
} catch (err: any) {
if (err.code === 'ENOENT') {
continue;
}
throw err;
}
}
});
afterEach(async () => {
for (const filename of await readdir(procSelfFd)) {
try {
const linkTarget = await readlink(procSelfFd + filename);
if (linkTarget === 'anon_inode:[timerfd]') {
continue;
}
expect(previousFDs).toHaveProperty(filename, linkTarget);
} catch (err: any) {
if (err.code === 'ENOENT') {
continue;
}
throw err;
}
}
});

describe('PTY', () => {
const CWD = process.cwd();

@@ -20,15 +56,24 @@ describe('PTY', () => {
},
);

const readStream = fs.createReadStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
expect(chunk.toString()).toBe(message + '\r\n');
});
readStream.on('close', () => {
pty.close();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('captures an exit code', (done) => {
new Pty(
let pty = new Pty(
'/bin/sh',
['-c', 'exit 17'],
{},
@@ -37,46 +82,54 @@
(err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(17);
pty.close();
done();
},
);
});

test('can be written to', (done) => {
const message = 'hello cat';
// The message should end in newline so that the EOT can signal that the input has ended and not
// just the line.
const message = 'hello cat\n';

const pty = new Pty(
'/bin/cat',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {},
);
const pty = new Pty('/bin/cat', [], {}, CWD, { rows: 24, cols: 80 }, () => {
pty.close();
done();
});

const readStream = fs.createReadStream('', { fd: pty.fd });
const writeStream = fs.createWriteStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });
const writeStream = fs.createWriteStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
expect(chunk.toString()).toBe(message);
done();
// We have local echo enabled, so we'll read the message twice.
expect(chunk.toString()).toBe(message + message);
});
readStream.on('close', () => {});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});

writeStream.write(message);
writeStream.end(EOT);
writeStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('can be resized', (done) => {
const pty = new Pty(
'/bin/sh',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {},
);
const pty = new Pty('/bin/sh', [], {}, CWD, { rows: 24, cols: 80 }, () => {
done();
});

const readStream = fs.createReadStream('', { fd: pty.fd });
const writeStream = fs.createWriteStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });
const writeStream = fs.createWriteStream('', { fd: pty.fd() });

let buffer = '';

@@ -92,11 +145,26 @@

if (buffer.includes('done2\r\n')) {
expect(buffer).toContain('60 100');
done();
}
});
readStream.on('close', () => {
pty.close();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});

writeStream.write("stty size; echo 'done1'\n");
writeStream.end(EOT);
writeStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('respects working directory', (done) => {
@@ -113,14 +181,23 @@
},
);

const readStream = fs.createReadStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
expect(chunk.toString()).toBe(`${CWD}\r\n`);
});
readStream.on('close', () => {
pty.close();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test.skip('respects env', (done) => {
test('respects env', (done) => {
const message = 'hello from env';
let buffer = '';

@@ -141,39 +218,46 @@
},
);

const readStream = fs.createReadStream('', { fd: pty.fd });
const readStream = fs.createReadStream('', { fd: pty.fd() });

readStream.on('data', (chunk) => {
buffer += chunk.toString();
});
readStream.on('close', () => {
pty.close();
});
readStream.on('error', (err: any) => {
if (err.code && err.code.indexOf('EIO') !== -1) {
return;
}
throw err;
});
});

test('works with Bun.read & Bun.write', (done) => {
const message = 'hello bun';
const message = 'hello bun\n';

const pty = new Pty(
'/bin/cat',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {},
);
const pty = new Pty('/bin/cat', [], {}, CWD, { rows: 24, cols: 80 }, () => {
pty.close();
done();
});

const file = Bun.file(pty.fd);
const file = Bun.file(pty.fd());

async function read() {
const stream = file.stream();

for await (const chunk of stream) {
expect(Buffer.from(chunk).toString()).toBe(message);
done();
// We have local echo enabled, so we'll read the message twice. Furthermore, the newline
// is converted to `\r\n` in this method.
expect(Buffer.from(chunk).toString()).toBe(
'hello bun\r\nhello bun\r\n',
);
break;
}
}

read();

Bun.write(pty.fd, message);
Bun.write(pty.fd(), message + EOT + EOT);
});

test("doesn't break when executing non-existing binary", (done) => {
6 changes: 3 additions & 3 deletions package.json
@@ -4,9 +4,9 @@
"main": "index.js",
"types": "index.d.ts",
"author": "Szymon Kaliski <[email protected]>",
"repository": {
"type": "git",
"url": "https://github.com/replit/ruspty.git"
"repository": {
"type": "git",
"url": "https://github.com/replit/ruspty.git"
},
"homepage": "https://github.com/replit/ruspty#readme",
"bugs": {