Skip to content

Commit

Permalink
fix(xrpc-server): properly parse & process content-encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Feb 19, 2024
1 parent e710c21 commit 2b1c66e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 22 deletions.
14 changes: 14 additions & 0 deletions packages/common/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,22 @@ import {
PassThrough,
Transform,
TransformCallback,
Duplex,
} from 'stream'

// TODO: Replace this with Node.JS's compose once it is stable
export const compose = (
readable: Readable,
...transforms: Duplex[]
): Readable => {
let stream: Readable = readable
for (const transform of transforms) {
forwardStreamErrors(stream, transform)
stream = stream.pipe(transform)
}
return stream
}

export const forwardStreamErrors = (...streams: Stream[]) => {
for (let i = 0; i < streams.length; ++i) {
const stream = streams[i]
Expand Down
48 changes: 33 additions & 15 deletions packages/xrpc-server/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import assert from 'assert'
import { Readable } from 'stream'
import { createBrotliDecompress, createDeflate, createGunzip } from 'zlib'
import { Readable, Transform } from 'stream'
import { createBrotliDecompress, createInflate, createGunzip } from 'zlib'
import express from 'express'
import mime from 'mime-types'
import {
Expand All @@ -10,7 +10,7 @@ import {
LexXrpcQuery,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { MaxSizeChecker } from '@atproto/common'
import { compose, MaxSizeChecker } from '@atproto/common'
import {
UndecodedParams,
Params,
Expand Down Expand Up @@ -232,35 +232,53 @@ function decodeBodyStream(
req: express.Request,
maxSize: number | undefined,
): Readable {
let stream: Readable = req
const contentEncoding = req.headers['content-encoding']
const transforms: Transform[] = []

const contentEncoding = req.headers['content-encoding']?.trim()
const contentLength = req.headers['content-length']

const contentLengthParsed = contentLength
? parseInt(contentLength, 10)
: undefined

if (Number.isNaN(contentLengthParsed)) {
throw new XRPCError(400, 'invalid content-length')
}

if (
maxSize !== undefined &&
contentLength &&
parseInt(contentLength, 10) > maxSize
contentLengthParsed !== undefined &&
contentLengthParsed > maxSize
) {
throw new XRPCError(413, 'request entity too large')
}

if (contentEncoding === 'gzip') {
stream = stream.compose(createGunzip())
} else if (contentEncoding === 'deflate') {
stream = stream.compose(createDeflate())
} else if (contentEncoding === 'br') {
stream = stream.compose(createBrotliDecompress())
if (contentEncoding) {
for (const enc of contentEncoding.split(',').reverse()) {
const currentEncoding = enc.trim()
if (currentEncoding === 'identity') continue

if (currentEncoding === 'gzip') {
transforms.push(createGunzip())
} else if (currentEncoding === 'deflate') {
transforms.push(createInflate())
} else if (currentEncoding === 'br') {
transforms.push(createBrotliDecompress())
} else {
throw new XRPCError(415, 'unsupported content-encoding')
}
}
}

if (maxSize !== undefined) {
const maxSizeChecker = new MaxSizeChecker(
maxSize,
() => new XRPCError(413, 'request entity too large'),
)
stream = stream.compose(maxSizeChecker)
transforms.push(maxSizeChecker)
}

return stream
return compose(req, ...transforms)
}

export function serverTimingHeader(timings: ServerTiming[]) {
Expand Down
21 changes: 14 additions & 7 deletions packages/xrpc-server/tests/bodies.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as http from 'http'
import { Readable } from 'stream'
import { gzipSync } from 'zlib'
import { brotliCompressSync, deflateSync, gzipSync } from 'zlib'
import getPort from 'get-port'
import { LexiconDoc } from '@atproto/lexicon'
import xrpc, { ServiceClient } from '@atproto/xrpc'
Expand Down Expand Up @@ -97,10 +97,14 @@ describe('Bodies', () => {
})
server.method(
'io.example.validationTest',
(ctx: { params: xrpcServer.Params; input?: xrpcServer.HandlerInput }) => ({
encoding: 'json',
body: ctx.input?.body,
}),
(ctx: { params: xrpcServer.Params; input?: xrpcServer.HandlerInput }) => {
if (ctx.input?.body instanceof Readable)
throw new Error('Input is readable')
return {
encoding: 'json',
body: ctx.input?.body ?? null,
}
},
)
server.method('io.example.validationTestTwo', () => ({
encoding: 'json',
Expand Down Expand Up @@ -192,18 +196,21 @@ describe('Bodies', () => {
bytes,
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'identity',
},
},
)
expect(uncompressed.cid).toEqual(expectedCid.toString())

const { data: compressed } = await client.call(
'io.example.blobTest',
{},
gzipSync(bytes),
brotliCompressSync(deflateSync(gzipSync(bytes))),
{
encoding: 'application/octet-stream',
headers: {
'content-encoding': 'gzip',
'content-encoding': 'gzip, deflate, br, identity',
},
},
)
Expand Down
1 change: 1 addition & 0 deletions packages/xrpc/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export enum ResponseType {
Forbidden = 403,
XRPCNotSupported = 404,
PayloadTooLarge = 413,
UnsupportedMediaType = 415,
RateLimitExceeded = 429,
InternalServerError = 500,
MethodNotImplemented = 501,
Expand Down

0 comments on commit 2b1c66e

Please sign in to comment.