diff --git a/packages/xrpc-server/package.json b/packages/xrpc-server/package.json index d9a57af7254..7df538dbe1f 100644 --- a/packages/xrpc-server/package.json +++ b/packages/xrpc-server/package.json @@ -28,13 +28,13 @@ "@atproto/common": "workspace:^", "@atproto/crypto": "workspace:^", "@atproto/lexicon": "workspace:^", - "axios": "^0.27.2", "cbor-x": "^1.5.1", "express": "^4.17.2", "http-errors": "^2.0.0", "mime-types": "^2.1.35", "rate-limiter-flexible": "^2.4.1", "uint8arrays": "3.0.0", + "undici": "^5.27.2", "ws": "^8.12.0", "zod": "^3.21.4" }, diff --git a/packages/xrpc-server/src/server.ts b/packages/xrpc-server/src/server.ts index 9c0cd2dd9c1..aa677437180 100644 --- a/packages/xrpc-server/src/server.ts +++ b/packages/xrpc-server/src/server.ts @@ -264,12 +264,7 @@ export class Server { if (isHandlerPassthru(outputUnvalidated)) { const { passthru } = outputUnvalidated - assert.ok( - passthru.statusCode && passthru.statusMessage, - 'Missing status: passthru response must originate from a ClientRequest.', - ) res.statusCode = passthru.statusCode - res.statusMessage = passthru.statusMessage if (!res.headersSent) { const hopByHop = getHopByHopHeaders(passthru.headers.connection) for (const [name, value] of Object.entries(passthru.headers)) { @@ -278,7 +273,7 @@ export class Server { } } } - return pipePassthru(passthru, req, res, next) + return pipePassthru(passthru.body, req, res, next) } if (isHandlerError(outputUnvalidated)) { diff --git a/packages/xrpc-server/src/types.ts b/packages/xrpc-server/src/types.ts index adcd0eff267..f1255c92d5e 100644 --- a/packages/xrpc-server/src/types.ts +++ b/packages/xrpc-server/src/types.ts @@ -1,3 +1,4 @@ +import { Readable } from 'node:stream' import { IncomingMessage } from 'http' import express from 'express' import { isHttpError } from 'http-errors' @@ -53,7 +54,13 @@ export const handlerError = zod.object({ }) export type HandlerError = zod.infer -export type HandlerPassthru = { passthru: IncomingMessage } +export type HandlerPassthru = { + passthru: { + body: Readable + statusCode: number + headers: Record + } +} export type HandlerOutput = HandlerSuccess | HandlerError | HandlerPassthru @@ -214,9 +221,11 @@ export function isHandlerError(v: unknown): v is HandlerError { export function isHandlerPassthru(v: unknown): v is HandlerPassthru { return ( - typeof v === 'object' && - v !== null && - v['passthru'] instanceof IncomingMessage + !!v?.['passthru'] && + v['passthru']['body'] instanceof Readable && + typeof v['passthru']['statusCode'] === 'number' && + typeof v['passthru']['headers'] === 'object' && + v['passthru']['headers'] !== null ) } diff --git a/packages/xrpc-server/src/util.ts b/packages/xrpc-server/src/util.ts index 963d1177e5a..dfcaa170052 100644 --- a/packages/xrpc-server/src/util.ts +++ b/packages/xrpc-server/src/util.ts @@ -1,5 +1,5 @@ import assert from 'assert' -import axios, { AxiosError } from 'axios' +import undici, { Dispatcher } from 'undici' import { IncomingMessage } from 'http' import { Readable, Transform } from 'stream' import { createDeflate, createGunzip } from 'zlib' @@ -328,31 +328,40 @@ export async function proxy( } } try { - const result = await axios.request({ - responseType: 'stream', - method: ctx.req.method, - baseURL: host, - url: ctx.req.url, + const url = new URL(ctx.req.url ?? '', host) + const result = await undici.request(url, { + method: ctx.req.method?.toUpperCase() as + | Dispatcher.HttpMethod + | undefined, headers, - data: payload, - validateStatus: (status) => status < 500, - timeout: opts?.timeout, - decompress: false, + body: payload, + bodyTimeout: opts?.timeout, + headersTimeout: opts?.timeout, }) - return { passthru: result.data } + if (result.statusCode >= 500) { + throw new UpstreamFailureError() + } + return { passthru: result } } catch (err) { - if (err instanceof AxiosError) { - if (err.code === 'ECONNABORTED') { + if (err instanceof undici.errors.UndiciError) { + if ( + err?.['code'] === 'UND_ERR_CONNECT_TIMEOUT' || + err?.['code'] === 'UND_ERR_HEADERS_TIMEOUT' || + err?.['code'] === 'UND_ERR_BODY_TIMEOUT' + ) { throw new UpstreamTimeoutError() } else { throw new UpstreamFailureError() } + } else if (err?.['code'] === 'ECONNREFUSED') { + throw new UpstreamFailureError() + } else { + throw err } - throw err } } -export function getHopByHopHeaders(connectionHeader?: string) { +export function getHopByHopHeaders(connectionHeader: string | string[] = '') { const hopByHop = new Set([ 'connection', 'keep-alive', @@ -363,7 +372,11 @@ export function getHopByHopHeaders(connectionHeader?: string) { 'transfer-encoding', 'upgrade', ]) - const additional = (connectionHeader ?? '').split(/\s*,\s*/) + const connectionHeaderStr = + typeof connectionHeader === 'string' + ? connectionHeader + : connectionHeader.join(',') + const additional = connectionHeaderStr.split(/\s*,\s*/) additional.forEach((header) => hopByHop.add(header.toLowerCase())) return hopByHop } diff --git a/packages/xrpc-server/tests/proxy.test.ts b/packages/xrpc-server/tests/proxy.test.ts index a5c7d9d15ac..5c1d136f319 100644 --- a/packages/xrpc-server/tests/proxy.test.ts +++ b/packages/xrpc-server/tests/proxy.test.ts @@ -1,17 +1,18 @@ +import assert from 'node:assert' import * as http from 'http' import getPort from 'get-port' import { wait } from '@atproto/common' import xrpc, { ServiceClient } from '@atproto/xrpc' +import { cidForCbor } from '@atproto/common' +import { XRPCError } from '@atproto/xrpc' +import { LexiconDoc } from '@atproto/lexicon' import { createServer, closeServer } from './_util' import * as xrpcServer from '../src' import { Readable } from 'stream' import { randomBytes } from 'crypto' import { gzipSync } from 'zlib' -import { cidForCbor } from '@atproto/common' -import { XRPCError } from '@atproto/xrpc' -import assert from 'assert' -const LEXICONS = [ +const LEXICONS: LexiconDoc[] = [ { lexicon: 1, id: 'io.example.params', diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1add2e92ee5..326668cfcd1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -691,9 +691,6 @@ importers: '@atproto/lexicon': specifier: workspace:^ version: link:../lexicon - axios: - specifier: ^0.27.2 - version: 0.27.2 cbor-x: specifier: ^1.5.1 version: 1.5.1 @@ -712,6 +709,9 @@ importers: uint8arrays: specifier: 3.0.0 version: 3.0.0 + undici: + specifier: ^5.27.2 + version: 5.27.2 ws: specifier: ^8.12.0 version: 8.12.0 @@ -4629,6 +4629,11 @@ packages: - supports-color dev: true + /@fastify/busboy@2.0.0: + resolution: {integrity: sha512-JUFJad5lv7jxj926GPgymrWQxxjPYuJNiNjNMzqT+HiuP6Vl3dk5xzG+8sTX96np0ZAluvaMzPsjhHZ5rNuNQQ==} + engines: {node: '>=14'} + dev: false + /@fastify/deepmerge@1.3.0: resolution: {integrity: sha512-J8TOSBq3SoZbDhM9+R/u77hP93gz/rajSA+K2kGyijPpORPWUXHUpTaleoj+92As0S9uPRP7Oi8IqMf0u+ro6A==} @@ -10989,6 +10994,13 @@ packages: which-boxed-primitive: 1.0.2 dev: true + /undici@5.27.2: + resolution: {integrity: sha512-iS857PdOEy/y3wlM3yRp+6SNQQ6xU0mmZcwRSriqk+et/cwWAtwmIGf6WkoDN2EK/AMdCO/dfXzIwi+rFMrjjQ==} + engines: {node: '>=14.0'} + dependencies: + '@fastify/busboy': 2.0.0 + dev: false + /unicode-canonical-property-names-ecmascript@2.0.0: resolution: {integrity: sha512-yY5PpDlfVIU5+y/BSCxAJRBIS1Zc2dDG3Ujq+sR0U+JjUevW2JhocOF+soROYDSaAezOzOKuyyixhD6mBknSmQ==} engines: {node: '>=4'}