From 06c940a81da83143bbed16c94cf6b7e27b6f7625 Mon Sep 17 00:00:00 2001 From: lhchavez Date: Mon, 6 May 2024 01:27:07 -0700 Subject: [PATCH] Fix a race between process death and stdio read (#8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously we used to close the fd immediately after the process was terminated. That made lifetimes simple to reason about, but we accidentally introduced a very subtle bug: If bun hadn't read the entirety of the stdout/stderr of the child process, since Rust would close the file from under it, Bun would not be able to read the rest of the output, truncating it! This change now adds an explicit `close()` and `fd()` functions so that the fd management becomes way more explicit than before. Tests were also updated to ensure that there are no leaked FDs, since now we lost the guarantee that everything was going to be cleaned up properly. ```shell ~/ruspty$ bun run build && bun test $ napi build --platform --release Compiling replit_ruspty v1.0.0 (/home/lhchavez/ruspty) Finished release [optimized] target(s) in 15.19s bun test v1.0.26 (c75e768a) index.test.ts: ✓ PTY > spawns and exits [11.96ms] ✓ PTY > captures an exit code [7.81ms] ✓ PTY > can be written to [10.44ms] ✓ PTY > can be resized [9.45ms] ✓ PTY > respects working directory [7.24ms] ✓ PTY > respects env [110.81ms] ✓ PTY > works with Bun.read & Bun.write [15.14ms] ✓ PTY > doesn't break when executing non-existing binary [3.73ms] 8 pass 0 fail 11 expect() calls Ran 8 tests across 1 files. [334.00ms] ``` --------- Co-authored-by: Szymon Kaliski --- bun.lockb | Bin 21351 -> 24918 bytes flake.nix | 1 + index.d.ts | 68 +++++++++++++- index.test.ts | 227 +++++++++++++++++++++++++++++++++++---------- package.json | 6 +- src/lib.rs | 251 +++++++++++++++++++++++++++++++++++--------------- 6 files changed, 427 insertions(+), 126 deletions(-) diff --git a/bun.lockb b/bun.lockb index 941803e574b9368b6dfda0d7ceb522a8727cf41f..60ed448755ea85c7e16d4aa032f0f5891896b035 100755 GIT binary patch delta 6163 zcmeHLeNnBB*8n#{D8FFsr^{qwA zzVKFmFjtV9W1}nDb7-WG@I8AALfyPa%4K4TCH_$5__$+WF z@KNA?z`KEsz)LIL)zzi$%0-|9L9f;bLJ;sz6k4Yh1cL{RN_WZp>g3WYHyGTpm<7Ob zeuB^k*u7$@dw#Vbw1L)x9_R1FIl$b1era-P`N9fe{^F9dhhaQTCkX7w;*zT5g=Hm+ zRJ*sqXS?HJBmg)K`W`mm1%qdn2QGVd0W^Dd6?A{#u|lKJi`L8!rKOdkT~35DVTQ+2`U7Yf`SD0pNK|JBUZ0sfT-Eif1YLqP}= z8@>V}7PuJ`_k~CDevpxPB|A6B|t&jd@%`Z+}HqY4SkZy^FT3ENhItoV+DNj3;QZ+im5|9|a6L)EVJ_yW- zU6|Z8ToQzt-MGuZ9p8=H2yS5)?%oi6P%lA1-E|s^&AP#z(2e^$a38#%EA`QkO=Fh| zG*kn$K|=?C&S=OB6s9Gc)-Em8QjONG--YtELtjg)v>DPxEqNgu;YT(|9%U`zjq>gMlyL3=TH9&rP zIso*bp1eA{b_4R8MR#D&FMw+Fc4=Aw9RS)AKwiCFuSLBU!B8(ctj~~U22xFcUAqrV zr#zcHnS<$o!LDBrEC`wC(n}t83n)2Ds^a$}4t56K9DkXg zrSLmriSC4kdsrK#Ew|2?opAuznNa}lhhmUhXUu#Qf-E2F6AO%4uRx(YV-0wd6h5#= zV8&F1!I%v`sL(2A@_&=XhkXP72Wgb2#A)RT{y&RdvKaO6vq%wPB_54#lM*~Rl6YwT z9DIGkh_4EUy;ftI^ZXwzuRP%myFSu!^bZ>z7kg=hg~!Isdg6G+k;uwV>NII$Uh}#7 zBy-Tb?W;beQD<&Pk6L~dnZQq12U}dOYSZ){H>lvyuO>ws_kJQwu6<$pjuW2a9&7OT zF)<&{yl|sB|9E>~VcyMQR~l;)RwW)Pe7-X0*h|la-%LE=9(`%e`n4CRlVPr7wP{W^ z@BU@rjqme~2?b5JYbSiPtnFtPr_U(;hp{a5)i`g}Tjy${ZqznjU1#3$MRH=Hwr_o4 z`I-%19-8GCa>p}mqqMw0G0f%0b+0x}e8EFsp9^1i>SFM~lgmn?qeGkXD`)TD_`%I% z8$Q?^bvY~V$ci7oE;_%c^@TOk-No-!K0YIV()eTMvF8`QxH2|u`^-*u`JL)Na$R|HQPui)-)}j*{e1KG^ZR$6n3a*AkkB`A zQ^1HF-ygMU+SkkpzIe;|@K0k`|8ko2U4HvJ^vQcaO#k`l z*p;`l7EQl7cfo|wSHqW-zgh1c`2FF%DO%6Ork@24mo&ZO4TXcP@4ceTn4`ceTWy+` zkG!zi?rvyoIr>Aq|AqL2DQDYiBejoRj}HvqIc=fqw~Yti8kDXXyY>A)msT8`^U#O1 z?u$85oAQDOSc+}+)rQrb?DF?sZ5q$CChL?5Up*R@G5qneZ@0bD)K?!+ zolDRBDSyWKy=7k)=AT}3&9d!{H%=Y5x4R#yjf z=04px^rL4-k9qLy-4MUx{j}a_r1fD=F_1nFbJ6$+1LZ#86brI9W=2kI24 zffNztqGw_ZR2$_K2hoS1GGh&t9PJbb(__&tx&-P=P(x^l*+nl7G|+mpQyfa4gBl+P z|6-hCGCdmu|3I~YN};S+_%{gt#X7|_x(uo~9{vqfEU z1f5PqRE(6{L$ zapDo$@A?|^GXN-;d;fnb%^xZCA6orq=I@L8zsvv5tj9IRXE^SUEtmXX%FBA3@}8?; zUXc!fhZXpRm3cMg1DKZwU>>go2Y`*RjCowd{LVb}4<08Q`A?n=j|Ol*Y$)Y-wlPNG zC9q*M2OAIs8kH4EQ5k-epBd_5UH-md(cs{JlY5z)>9xNB|@PIJzA5 zIKV)_SipDy@1b~Kl?@mKzy?_02*&_83mkO^fPH3vJHK;~ICv@^!>)6bmU2(Dw?~mZ zE0zmHw#7BV^J3>Y5F8K@z`@}laj-amoD>a!t3v{Cz&IcrFb*sSn76RJ-Q>A)m9T&8 z8wZU8!uC0+2(CwtGH+g#Q=uGXoPYV0FB{~5u*0kr4PZqxAQr$f-e9s(4i?LJ1H^&l z32~4)aPfd403L?}J`})#O&5xN>EpCn0csjKg(Co&0A3fIN;b}!V2{{1)0`2NW_*c5 zj+Qf#HrV3nZQB-)dc&cfawTz2nkCHwi@VVgM(Wh_t434=!cj;NtUC3C+aWm%2_jRc zo`E|gHy}Yk>(o=SBH;w9mniB%xJsY9TN2-Mm>yoNYtwu^|;<4S&hO#J=Ezd zaz3~+wf>+aJ~4t8XPYIRmj0I4&o9-IDimw!wOpg**3!G#R+D<`61`Pw9Ul7TMFc0+ zV$HB*3cLL%D90!@`B8bEQEK)hM~*RFy%VV`ZmLUZINpNp)+|c~%&ONlH7o6zzxKL% zbx6|uX|LT#J8~vVIsVjcH%jCDDJ0iuQtvq?ryTipMDx*pP#KQIpajx%G&RUL@v?_EPc1sV8M`Q+djAa*Q-#WPg)- z*YxFYih{N;7_vo~HdaS<5Y>&en$+8{qPf@ZdY>z~DR&sgA*nIY>5;~C_4Y|qY@K$> zui<@k7~zYVdLNaTGXBR~v-(|!1c~EtxAZ2H!^$O5=rEepYp4y|-YB|Ldxrf&QY=`F zNXt`>9@CO$$zns@)52NdwS6Uou3?CvK4i|bn$#PrflF@x^y$0_bJz=C87bwagyiMC z{?gh|dItsAMW9dt&rFn4C7hT@N%;;C_pkLz$dRVH<)>Gf=RUmn2;?hR<+ar<=leFq z{r!IVEyduwb7^^gEIzOFLjDSWXj$AV+_cd-fWIaw$tidy%iju1^`aU_#d3GK%vw-V z`Dkf*a!E<${Ke_RyYW}_%uAkEQspiyEq8Y)fO=#_cROXJ)0yAJj;||NJ($5{7F0Z1 z)m5Pr^PUR5KE-6e=SELG6sLO<`65Bv3d~78ndm`N4jn7l?kRM4L(%Il=ol}dyLvaR zdt;^E*ICI@7X^jaF+$EgB_+3=1Ffj`An7oW*n@%YdiN%oet-9_v+P=6zEQXY_XUia G9r8~qNia?T delta 3782 zcmd54$+jjkxuI!yZ9{lRa@ZXR8?LvQi z%g})jg0-c)vZKFWZ|K%M6)R4>-;`2P*Brx`*gf1gIy%rdyaQc+=$C-mz%6Ng%rp!O z`fy*oceHL`i$19BrlmprJ7j{R0$Hq{e` z__A#EnjsOubwefr?WQCFPn+WN*y;mQA|Bg24+*g`+b!!n&1#d9B*qRX@p*0cd5kR; z%hTrGfi>HBr+)5|q}NuTxy0wQRcnSs0J}3}(r0^LMUL)*uPp8BpBduw+p5+r5x}Ev znFO42OVV$W%(1xYJy_y}8(l z?zJ~)JIWo2&CdZ+jgLb=Am&T;O*ply-M$NhM9<)G-B`X$2%{%0og)#Xxw2z?`$g;AS zJhs4+m&&5%e7OLvs??ITWTccEQ{RxI5WG5PjFOAc2=%>{A=T1%NF_g6;E zg>tYmCVz!yRYlEm*<2Nqz2%m?4y{tW)iLR+u;iiYs97yD&@MwOtcjY7WS|D|RU$rU zbyBbh@l_$dMNzX}PDArlBfi?G*&t)Jh!5I1XpK@`hxlp`UtQE(D(}_B%qD3F^~#Bi z4C$%&c!qHp@=z0x9cldBrF*wDdOUkj#@H)Uflk@yFF$sovEJt^;Sr{NizHXBOkBI3 zYU~O@c-Gn>Y>v)@U!i+$6cnCir#P^va30SszhP+K0HF=XKi zlUYEQr8-&dwL`pb^f;YxjCQPQyl0miD-wHhQ6l{R3qKjPRD3V=m5>P zT-L8DlBzrN6I`CAbivw;*Mb55NFYBj^s-!rZrdSuLdZ!aqzY04Sp=zrkhi%Id;#hA zKmo7=LcWS1#gGLM^2P#<5|Utn*F|zAune*g!s^WW&gx9^_;Mh@B$Na)9ELLcp7E0x z2N^r%fppAIaS+Y7{A-P2yJShp$iUb8hGgrioCIC-Y2ZGYOXiVOGG~mPl#?TJMV^=n zZi~5QEQmw%!`L~O{E--68Yt){V{Zeme7;F1cehx9*N^%IfPEh3&-e? zByvL}lpL|lmO!`=$t{JDLnefYVuGq6PQsY1S_l)s@eo6xbCS-8=$tyqcXZjKLGn~S zMwWycum$`E$kxtq(D~}$_HfyXc7JY*p1L>$GG^TJN@qZ&Lu<*DKSA4&DW7(R)y_-_ zb%le@FTtS;6C*8utUskpW`b=?mOS1S2s-};dk@rn{O`YPzj(u7$Rn@9!1>Ae$HPUR zKYn!H!Bn74*!qM={@fK#M;LVe9WFlfi+!8UuX|I+u{eY$bj*~V?y!2%BNN>L^`S?; z5AC>1e$yRja{e$@TsmWT}r703_ zx(R=cU=|*nzmI2saCGC<@wYa@19%PLe1X~hk()Bx5W?f4nf(Y$+-FH { + * pty.close(); + * // TODO: Handle process exit. + * }); + * + * const read = new fs.createReadStream('', { + * fd: pty.fd(), + * start: 0, + * highWaterMark: 16 * 1024, + * autoClose: true, + * }); + * const write = new fs.createWriteStream('', { + * fd: pty.fd(), + * autoClose: true, + * }); + * + * read.on('data', (chunk) => { + * // TODO: Handle data. + * }); + * read.on('error', (err) => { + * if (err.code && err.code.indexOf('EIO') !== -1) { + * // This is expected to happen when the process exits. + * return; + * } + * // TODO: Handle the error. + * }); + * write.on('error', (err) => { + * if (err.code && err.code.indexOf('EIO') !== -1) { + * // This is expected to happen when the process exits. + * return; + * } + * // TODO: Handle the error. + * }); + * ``` + */ export class Pty { - fd: number + /** The pid of the forked process. */ pid: number constructor(command: string, args: Array, envs: Record, dir: string, size: Size, onExit: (err: null | Error, exitCode: number) => void) + /** Resize the terminal. */ resize(size: Size): void + /** + * Returns a file descriptor for the PTY controller. If running under node, it will dup the file + * descriptor, but under bun it will return the same file desciptor, since bun does not close + * the streams by itself. Maybe that is a bug in bun, so we should confirm the new behavior + * after we upgrade. + * + * See the docstring of the class for an usage example. + */ + fd(): c_int + /** + * Close the PTY file descriptor. This must be called when the readers / writers of the PTY have + * been closed, otherwise we will leak file descriptors! + * + * In an ideal world, this would be automatically called after the wait loop is done, but Node + * doesn't like that one bit, since it implies that the file is closed outside of the main + * event loop. + */ + close(): void } diff --git a/index.test.ts b/index.test.ts index 6ad32e0..16e23cf 100644 --- a/index.test.ts +++ b/index.test.ts @@ -1,11 +1,68 @@ import fs from 'fs'; +import os from 'node:os'; +import { readdir, readlink } from 'node:fs/promises'; import { Pty } from './index'; +import assert from 'assert'; + +const EOT = '\x04'; +const procSelfFd = '/proc/self/fd/'; +const previousFDs: Record = {}; + +function macOSLinuxCatBufferCompare( + b1: Buffer | Uint8Array, + b2: Buffer | Uint8Array, +) { + // macOS leaves these bytes when using `cat`, which linux does not + // + // so to be sure we drop them when comparing the expected to the received + // buffer, we'll remove them from the output first, in a super crude manner + const macOSStrayBytes = ',94,68,8,8'; + + const a1 = Array.from(b1).toString().replaceAll(macOSStrayBytes, ''); + const a2 = Array.from(b2).toString().replaceAll(macOSStrayBytes, ''); + + return a1 === a2; +} + +// These two functions ensure that there are no extra open file descriptors after each test +// finishes. Only works on Linux. +if (os.type() !== 'Darwin') { + beforeEach(async () => { + for (const filename of await readdir(procSelfFd)) { + try { + previousFDs[filename] = await readlink(procSelfFd + filename); + } catch (err: any) { + if (err.code === 'ENOENT') { + continue; + } + throw err; + } + } + }); + afterEach(async () => { + for (const filename of await readdir(procSelfFd)) { + try { + const linkTarget = await readlink(procSelfFd + filename); + if (linkTarget === 'anon_inode:[timerfd]') { + continue; + } + expect(previousFDs).toHaveProperty(filename, linkTarget); + } catch (err: any) { + if (err.code === 'ENOENT') { + continue; + } + throw err; + } + } + }); +} describe('PTY', () => { const CWD = process.cwd(); test('spawns and exits', (done) => { const message = 'hello from a pty'; + let buffer = ''; const pty = new Pty( '/bin/echo', @@ -16,19 +73,30 @@ describe('PTY', () => { (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(0); + expect(buffer).toBe(message + '\r\n'); + pty.close(); + done(); }, ); - const readStream = fs.createReadStream('', { fd: pty.fd }); + const readStreamFd = pty.fd(); + const readStream = fs.createReadStream('', { fd: readStreamFd }); readStream.on('data', (chunk) => { - expect(chunk.toString()).toBe(message + '\r\n'); + buffer += chunk.toString(); + }); + readStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + console.log('err', { err }); + throw err; }); }); test('captures an exit code', (done) => { - new Pty( + let pty = new Pty( '/bin/sh', ['-c', 'exit 17'], {}, @@ -37,46 +105,66 @@ describe('PTY', () => { (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(17); + pty.close(); + done(); }, ); }); test('can be written to', (done) => { - const message = 'hello cat'; + // The message should end in newline so that the EOT can signal that the input has ended and not + // just the line. + const message = 'hello cat\n'; + let buffer: Buffer | undefined; + + const result = Buffer.from([ + 104, 101, 108, 108, 111, 32, 99, 97, 116, 13, 10, 104, 101, 108, 108, 111, + 32, 99, 97, 116, 13, 10, + ]); + + const pty = new Pty('/bin/cat', [], {}, CWD, { rows: 24, cols: 80 }, () => { + // We have local echo enabled, so we'll read the message twice. + assert(buffer); + expect(macOSLinuxCatBufferCompare(buffer, result)).toBe(true); + pty.close(); - const pty = new Pty( - '/bin/cat', - [], - {}, - CWD, - { rows: 24, cols: 80 }, - () => {}, - ); + done(); + }); - const readStream = fs.createReadStream('', { fd: pty.fd }); - const writeStream = fs.createWriteStream('', { fd: pty.fd }); + const readStream = fs.createReadStream('', { fd: pty.fd() }); + const writeStream = fs.createWriteStream('', { fd: pty.fd() }); readStream.on('data', (chunk) => { - expect(chunk.toString()).toBe(message); - done(); + assert(Buffer.isBuffer(chunk)); + buffer = chunk; + }); + readStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + throw err; }); writeStream.write(message); + writeStream.end(EOT); + writeStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + throw err; + }); }); test('can be resized', (done) => { - const pty = new Pty( - '/bin/sh', - [], - {}, - CWD, - { rows: 24, cols: 80 }, - () => {}, - ); + const pty = new Pty('/bin/sh', [], {}, CWD, { rows: 24, cols: 80 }, () => { + pty.close(); - const readStream = fs.createReadStream('', { fd: pty.fd }); - const writeStream = fs.createWriteStream('', { fd: pty.fd }); + done(); + }); + + const readStream = fs.createReadStream('', { fd: pty.fd() }); + const writeStream = fs.createWriteStream('', { fd: pty.fd() }); let buffer = ''; @@ -92,14 +180,28 @@ describe('PTY', () => { if (buffer.includes('done2\r\n')) { expect(buffer).toContain('60 100'); - done(); } }); + readStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + throw err; + }); writeStream.write("stty size; echo 'done1'\n"); + writeStream.end(EOT); + writeStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + throw err; + }); }); test('respects working directory', (done) => { + let buffer = ''; + const pty = new Pty( '/bin/pwd', [], @@ -109,24 +211,33 @@ describe('PTY', () => { (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(0); + expect(buffer).toBe(`${CWD}\r\n`); + pty.close(); + done(); }, ); - const readStream = fs.createReadStream('', { fd: pty.fd }); + const readStream = fs.createReadStream('', { fd: pty.fd() }); readStream.on('data', (chunk) => { - expect(chunk.toString()).toBe(`${CWD}\r\n`); + buffer += chunk.toString(); + }); + readStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + throw err; }); }); - test.skip('respects env', (done) => { + test('respects env', (done) => { const message = 'hello from env'; - let buffer = ''; + let buffer: Buffer | undefined; const pty = new Pty( '/bin/sh', - ['-c', 'sleep 0.1s && echo $ENV_VARIABLE && exit'], + ['-c', 'echo $ENV_VARIABLE && exit'], { ENV_VARIABLE: message, }, @@ -135,45 +246,61 @@ describe('PTY', () => { (err, exitCode) => { expect(err).toBeNull(); expect(exitCode).toBe(0); - expect(buffer).toBe(message + '\r\n'); + assert(buffer); + expect(Buffer.compare(buffer, Buffer.from(message + '\r\n'))).toBe(0); + pty.close(); done(); }, ); - const readStream = fs.createReadStream('', { fd: pty.fd }); + const readStream = fs.createReadStream('', { fd: pty.fd() }); readStream.on('data', (chunk) => { - buffer += chunk.toString(); + assert(Buffer.isBuffer(chunk)); + buffer = chunk; + }); + readStream.on('error', (err: any) => { + if (err.code && err.code.indexOf('EIO') !== -1) { + return; + } + throw err; }); }); test('works with Bun.read & Bun.write', (done) => { - const message = 'hello bun'; + const message = 'hello bun\n'; + let buffer: Uint8Array | undefined; - const pty = new Pty( - '/bin/cat', - [], - {}, - CWD, - { rows: 24, cols: 80 }, - () => {}, - ); + const result = new Uint8Array([ + 104, 101, 108, 108, 111, 32, 98, 117, 110, 13, 10, 104, 101, 108, 108, + 111, 32, 98, 117, 110, 13, 10, + ]); + + const pty = new Pty('/bin/cat', [], {}, CWD, { rows: 24, cols: 80 }, () => { + // We have local echo enabled, so we'll read the message twice. Furthermore, the newline + // is converted to `\r\n` in this method. + assert(buffer !== undefined); + expect(macOSLinuxCatBufferCompare(buffer, result)).toBe(true); + pty.close(); + + done(); + }); - const file = Bun.file(pty.fd); + const file = Bun.file(pty.fd()); async function read() { const stream = file.stream(); - for await (const chunk of stream) { - expect(Buffer.from(chunk).toString()).toBe(message); - done(); + buffer = chunk; + // TODO: For some reason, Bun's stream will raise the EIO somewhere where we cannot catch + // it, and make the test fail no matter how many try / catch blocks we add. + break; } } read(); - - Bun.write(pty.fd, message); + Bun.write(pty.fd(), message + EOT + EOT); }); test("doesn't break when executing non-existing binary", (done) => { diff --git a/package.json b/package.json index 8bfb8cb..9318716 100644 --- a/package.json +++ b/package.json @@ -4,9 +4,9 @@ "main": "index.js", "types": "index.d.ts", "author": "Szymon Kaliski ", - "repository": { - "type": "git", - "url": "https://github.com/replit/ruspty.git" + "repository": { + "type": "git", + "url": "https://github.com/replit/ruspty.git" }, "homepage": "https://github.com/replit/ruspty#readme", "bugs": { diff --git a/src/lib.rs b/src/lib.rs index a7ff449..acee8f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,33 +1,77 @@ -use libc::{self, c_int, TIOCSCTTY}; -use napi::bindgen_prelude::JsFunction; -use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; -use napi::Error as NAPI_ERROR; -use napi::Status::GenericFailure; -use rustix_openpty::openpty; -use rustix_openpty::rustix::termios::Winsize; -use rustix_openpty::rustix::termios::{self, InputModes, OptionalActions}; use std::collections::HashMap; -use std::fs::File; use std::io::Error; use std::io::ErrorKind; -use std::os::fd::AsRawFd; -use std::os::fd::FromRawFd; +use std::os::fd::{AsRawFd, OwnedFd}; use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; +use libc::{self, c_int}; +use napi::bindgen_prelude::JsFunction; +use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; +use napi::Status::GenericFailure; +use napi::{self, Env}; +use rustix_openpty::openpty; +use rustix_openpty::rustix::termios::{self, InputModes, OptionalActions, Winsize}; + #[macro_use] extern crate napi_derive; +/// A very thin wrapper around PTYs and processes. The caller is responsible for calling `.close()` +/// when all streams have been closed. We hold onto both ends of the PTY (controller and user) to +/// prevent reads from erroring out with EIO. +/// +/// This is the recommended usage: +/// +/// ``` +/// const { Pty } = require('replit-ruspy'); +/// const fs = require('node:fs'); +/// +/// const pty = new Pty('sh', [], ENV, CWD, { rows: 24, cols: 80 }, (...result) => { +/// pty.close(); +/// // TODO: Handle process exit. +/// }); +/// +/// const read = new fs.createReadStream('', { +/// fd: pty.fd(), +/// start: 0, +/// highWaterMark: 16 * 1024, +/// autoClose: true, +/// }); +/// const write = new fs.createWriteStream('', { +/// fd: pty.fd(), +/// autoClose: true, +/// }); +/// +/// read.on('data', (chunk) => { +/// // TODO: Handle data. +/// }); +/// read.on('error', (err) => { +/// if (err.code && err.code.indexOf('EIO') !== -1) { +/// // This is expected to happen when the process exits. +/// return; +/// } +/// // TODO: Handle the error. +/// }); +/// write.on('error', (err) => { +/// if (err.code && err.code.indexOf('EIO') !== -1) { +/// // This is expected to happen when the process exits. +/// return; +/// } +/// // TODO: Handle the error. +/// }); +/// ``` #[napi] #[allow(dead_code)] struct Pty { - file: File, - #[napi(ts_type = "number")] - pub fd: c_int, + controller_fd: Option, + user_fd: Option, + should_dup_fds: bool, + /// The pid of the forked process. pub pid: u32, } +/// A size struct to pass to resize. #[napi(object)] struct Size { pub cols: u16, @@ -38,7 +82,7 @@ struct Size { fn set_controlling_terminal(fd: c_int) -> Result<(), Error> { let res = unsafe { #[allow(clippy::cast_lossless)] - libc::ioctl(fd, TIOCSCTTY as _, 0) + libc::ioctl(fd, libc::TIOCSCTTY as _, 0) }; if res != 0 { @@ -49,13 +93,13 @@ fn set_controlling_terminal(fd: c_int) -> Result<(), Error> { } #[allow(dead_code)] -fn set_nonblocking(fd: c_int) -> Result<(), NAPI_ERROR> { +fn set_nonblocking(fd: c_int) -> Result<(), napi::Error> { use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK}; let status_flags = unsafe { fcntl(fd, F_GETFL, 0) }; if status_flags < 0 { - return Err(NAPI_ERROR::new( + return Err(napi::Error::new( napi::Status::GenericFailure, format!("fcntl F_GETFL failed: {}", Error::last_os_error()), )); @@ -64,7 +108,7 @@ fn set_nonblocking(fd: c_int) -> Result<(), NAPI_ERROR> { let res = unsafe { fcntl(fd, F_SETFL, status_flags | O_NONBLOCK) }; if res != 0 { - return Err(NAPI_ERROR::new( + return Err(napi::Error::new( napi::Status::GenericFailure, format!("fcntl F_SETFL failed: {}", Error::last_os_error()), )); @@ -78,13 +122,15 @@ impl Pty { #[napi(constructor)] #[allow(dead_code)] pub fn new( + env: Env, command: String, args: Vec, envs: HashMap, dir: String, size: Size, #[napi(ts_arg_type = "(err: null | Error, exitCode: number) => void")] on_exit: JsFunction, - ) -> Result { + ) -> Result { + let should_dup_fds = env.get_node_version()?.release == "node"; let window_size = Winsize { ws_col: size.cols, ws_row: size.rows, @@ -95,36 +141,43 @@ impl Pty { let mut cmd = Command::new(command); cmd.args(args); - let pty_pair = openpty(None, Some(&window_size)) - .map_err(|err| NAPI_ERROR::new(napi::Status::GenericFailure, err))?; + let rustix_openpty::Pty { + controller: controller_fd, + user: user_fd, + } = openpty(None, Some(&window_size)) + .map_err(|err| napi::Error::new(napi::Status::GenericFailure, err))?; - let fd_controller = pty_pair.controller.as_raw_fd(); - let fd_user = pty_pair.user.as_raw_fd(); - - if let Ok(mut termios) = termios::tcgetattr(&pty_pair.controller) { + if let Ok(mut termios) = termios::tcgetattr(&controller_fd) { termios.input_modes.set(InputModes::IUTF8, true); - termios::tcsetattr(&pty_pair.controller, OptionalActions::Now, &termios) - .map_err(|err| NAPI_ERROR::new(napi::Status::GenericFailure, err))?; + termios::tcsetattr(&controller_fd, OptionalActions::Now, &termios) + .map_err(|err| napi::Error::new(napi::Status::GenericFailure, err))?; } - cmd.stdin(unsafe { Stdio::from_raw_fd(fd_user) }); - cmd.stderr(unsafe { Stdio::from_raw_fd(fd_user) }); - cmd.stdout(unsafe { Stdio::from_raw_fd(fd_user) }); + // The Drop implementation for Command will try to close _each_ stdio. That implies that it + // will try to close the three of them, so if we don't dup them, Rust will try to close the + // same FD three times, and if stars don't align, we might be even closing a different FD + // accidentally. + cmd.stdin(Stdio::from(user_fd.try_clone()?)); + cmd.stderr(Stdio::from(user_fd.try_clone()?)); + cmd.stdout(Stdio::from(user_fd.try_clone()?)); cmd.envs(envs); cmd.current_dir(dir); unsafe { + let raw_user_fd = user_fd.as_raw_fd(); + let raw_controller_fd = controller_fd.as_raw_fd(); cmd.pre_exec(move || { let err = libc::setsid(); if err == -1 { return Err(Error::new(ErrorKind::Other, "Failed to set session id")); } - set_controlling_terminal(fd_user)?; + // stdin is wired to the tty, so we can use that for the controlling terminal. + set_controlling_terminal(0)?; - libc::close(fd_user); - libc::close(fd_controller); + libc::close(raw_user_fd); + libc::close(raw_controller_fd); libc::signal(libc::SIGCHLD, libc::SIG_DFL); libc::signal(libc::SIGHUP, libc::SIG_DFL); @@ -137,20 +190,15 @@ impl Pty { }); } - let ts_on_exit: ThreadsafeFunction = on_exit - .create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?; - let mut child = cmd .spawn() - .map_err(|err| NAPI_ERROR::new(GenericFailure, err))?; + .map_err(|err| napi::Error::new(GenericFailure, err))?; + // We are marking the pty fd as non-blocking, despite Node's docs suggesting that the fd passed + // to `createReadStream`/`createWriteStream` should be blocking. + set_nonblocking(controller_fd.as_raw_fd())?; let pid = child.id(); - set_nonblocking(fd_controller)?; - - let file = File::from(pty_pair.controller); - let fd = file.as_raw_fd(); - // We're creating a new thread for every child, this uses a bit more system resources compared // to alternatives (below), trading off simplicity of implementation. // @@ -164,44 +212,45 @@ impl Pty { // they are ready to be `wait`'ed. This has the inconvenience that it consumes one FD per child. // // For discussion check out: https://github.com/replit/ruspty/pull/1#discussion_r1463672548 - thread::spawn(move || { - match child.wait() { - Ok(status) => { - if status.success() { - ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking); - } else { - ts_on_exit.call( - Ok(status.code().unwrap_or(-1)), - ThreadsafeFunctionCallMode::Blocking, - ); - } - } - Err(err) => { + let ts_on_exit: ThreadsafeFunction = on_exit + .create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?; + thread::spawn(move || match child.wait() { + Ok(status) => { + if status.success() { + ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking); + } else { ts_on_exit.call( - Err(NAPI_ERROR::new( - GenericFailure, - format!( - "OS error when waiting for child process to exit: {}", - err.raw_os_error().unwrap_or(-1) - ), - )), + Ok(status.code().unwrap_or(-1)), ThreadsafeFunctionCallMode::Blocking, ); } } - - // Close the fd once we return from `child.wait()`. - unsafe { - rustix::io::close(fd); + Err(err) => { + ts_on_exit.call( + Err(napi::Error::new( + GenericFailure, + format!( + "OS error when waiting for child process to exit: {}", + err.raw_os_error().unwrap_or(-1) + ), + )), + ThreadsafeFunctionCallMode::Blocking, + ); } }); - Ok(Pty { file, fd, pid }) + Ok(Pty { + controller_fd: Some(controller_fd), + user_fd: Some(user_fd), + should_dup_fds, + pid, + }) } + /// Resize the terminal. #[napi] #[allow(dead_code)] - pub fn resize(&mut self, size: Size) -> Result<(), NAPI_ERROR> { + pub fn resize(&mut self, size: Size) -> Result<(), napi::Error> { let window_size = Winsize { ws_col: size.cols, ws_row: size.rows, @@ -209,14 +258,72 @@ impl Pty { ws_ypixel: 0, }; - let res = unsafe { libc::ioctl(self.fd, libc::TIOCSWINSZ, &window_size as *const _) }; + if let Some(fd) = &self.controller_fd { + let res = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &window_size as *const _) }; + + if res != 0 { + return Err(napi::Error::new( + napi::Status::GenericFailure, + format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()), + )); + } - if res != 0 { - return Err(NAPI_ERROR::new( + Ok(()) + } else { + Err(napi::Error::new( + napi::Status::GenericFailure, + "ioctl TIOCSWINSZ failed: bad file descriptor (os error 9)", + )) + } + } + + /// Returns a file descriptor for the PTY controller. If running under node, it will dup the file + /// descriptor, but under bun it will return the same file desciptor, since bun does not close + /// the streams by itself. Maybe that is a bug in bun, so we should confirm the new behavior + /// after we upgrade. + /// + /// See the docstring of the class for an usage example. + #[napi] + #[allow(dead_code)] + pub fn fd(&mut self) -> Result { + if let Some(fd) = &self.controller_fd { + if !self.should_dup_fds { + return Ok(fd.as_raw_fd()); + } + let res = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_DUPFD_CLOEXEC, 3) }; + if res < 0 { + return Err(napi::Error::new( + napi::Status::GenericFailure, + format!("fcntl F_DUPFD_CLOEXEC failed: {}", Error::last_os_error()), + )); + } + Ok(res) + } else { + Err(napi::Error::new( + napi::Status::GenericFailure, + "fcntl F_DUPFD_CLOEXEC failed: bad file descriptor (os error 9)", + )) + } + } + + /// Close the PTY file descriptor. This must be called when the readers / writers of the PTY have + /// been closed, otherwise we will leak file descriptors! + /// + /// In an ideal world, this would be automatically called after the wait loop is done, but Node + /// doesn't like that one bit, since it implies that the file is closed outside of the main + /// event loop. + #[napi] + #[allow(dead_code)] + pub fn close(&mut self) -> Result<(), napi::Error> { + let controller_fd = self.controller_fd.take(); + let user_fd = self.user_fd.take(); + if controller_fd.is_none() { + return Err(napi::Error::new( napi::Status::GenericFailure, - format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()), + format!("close failed: {}", libc::EBADF), )); } + drop(user_fd); Ok(()) }