From f6c0eb9669f73ce0a1733dbc4ae677636fab2ff2 Mon Sep 17 00:00:00 2001 From: dholms Date: Fri, 29 Sep 2023 19:36:51 -0500 Subject: [PATCH] rework logging --- packages/common/package.json | 1 - packages/common/src/index.ts | 1 - packages/common/src/logger.ts | 36 -------- packages/pds/src/context.ts | 6 ++ packages/pds/src/index.ts | 45 ++++------ packages/pds/src/logger.ts | 105 +++++++++++++--------- packages/repo/src/logger.ts | 6 -- packages/repo/src/readable-repo.ts | 2 - packages/repo/src/repo.ts | 2 - packages/xrpc-server/package.json | 1 + packages/xrpc-server/src/index.ts | 1 + packages/xrpc-server/src/logger.ts | 38 +++++++- packages/xrpc-server/src/rate-limiter.ts | 7 +- packages/xrpc-server/src/server.ts | 98 ++++++++++++-------- packages/xrpc-server/src/stream/logger.ts | 6 -- packages/xrpc-server/src/stream/server.ts | 12 +-- packages/xrpc-server/src/types.ts | 8 +- pnpm-lock.yaml | 6 +- 18 files changed, 203 insertions(+), 178 deletions(-) delete mode 100644 packages/common/src/logger.ts delete mode 100644 packages/repo/src/logger.ts delete mode 100644 packages/xrpc-server/src/stream/logger.ts diff --git a/packages/common/package.json b/packages/common/package.json index 4803b9d0d19..4224785bb16 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -29,7 +29,6 @@ "cbor-x": "^1.5.1", "iso-datestring-validator": "^2.2.2", "multiformats": "^9.9.0", - "pino": "^8.15.0", "zod": "3.21.4" } } diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 524a090c5ab..01f3554a30b 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -3,6 +3,5 @@ export * from './dates' export * from './fs' export * from './ipld' export * from './ipld-multi' -export * from './logger' export * from './streams' export * from './buffers' diff --git a/packages/common/src/logger.ts b/packages/common/src/logger.ts deleted file mode 100644 index 857d32ee9f2..00000000000 --- a/packages/common/src/logger.ts +++ /dev/null @@ -1,36 +0,0 @@ -import pino from 'pino' - -const allSystemsEnabled = !process.env.LOG_SYSTEMS -const enabledSystems = (process.env.LOG_SYSTEMS || '') - .replace(',', ' ') - .split(' ') - -const enabledEnv = process.env.LOG_ENABLED -const enabled = - enabledEnv === 'true' || enabledEnv === 't' || enabledEnv === '1' - -const level = process.env.LOG_LEVEL || 'info' - -const config = { - enabled, - level, -} - -const rootLogger = process.env.LOG_DESTINATION - ? pino(config, pino.destination(process.env.LOG_DESTINATION)) - : pino(config) - -const subsystems: Record = {} - -export const subsystemLogger = (name: string): pino.Logger => { - if (subsystems[name]) return subsystems[name] - const subsystemEnabled = - enabled && (allSystemsEnabled || enabledSystems.indexOf(name) > -1) - - // can't disable child loggers, so we just set their level to fatal to effectively turn them off - subsystems[name] = rootLogger.child( - { name }, - { level: subsystemEnabled ? level : 'silent' }, - ) - return subsystems[name] -} diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 791100c492b..ac9f35b461d 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -16,6 +16,7 @@ import { BackgroundQueue } from './background' import DidSqlCache from './did-cache' import { Crawlers } from './crawlers' import { RuntimeFlags } from './runtime-flags' +import { Logger } from './logger' export class AppContext { constructor( @@ -38,6 +39,7 @@ export class AppContext { backgroundQueue: BackgroundQueue appviewAgent: AtpAgent crawlers: Crawlers + logger: Logger }, ) {} @@ -129,6 +131,10 @@ export class AppContext { return this.opts.crawlers } + get log(): Logger { + return this.opts.logger + } + get plcClient(): plc.Client { return new plc.Client(this.cfg.didPlcUrl) } diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts index 687ec9699af..297e6c77912 100644 --- a/packages/pds/src/index.ts +++ b/packages/pds/src/index.ts @@ -14,12 +14,7 @@ import { AtpAgent } from '@atproto/api' import * as crypto from '@atproto/crypto' import { BlobStore } from '@atproto/repo' import { IdResolver } from '@atproto/identity' -import { - RateLimiter, - RateLimiterCreator, - RateLimiterOpts, - Options as XrpcServerOptions, -} from '@atproto/xrpc-server' +import { Options as XrpcServerOptions } from '@atproto/xrpc-server' import { DAY, HOUR, MINUTE } from '@atproto/common' import API from './api' import * as basicRoutes from './basic-routes' @@ -28,7 +23,6 @@ import Database from './db' import { ServerAuth } from './auth' import * as error from './error' import compression from './util/compression' -import { dbLogger, loggerMiddleware, seqLogger } from './logger' import { ServerConfig } from './config' import { ServerMailer } from './mailer' import { ModerationMailer } from './mailer/moderation' @@ -42,6 +36,7 @@ import DidSqlCache from './did-cache' import { Crawlers } from './crawlers' import { getRedisClient } from './redis' import { RuntimeFlags } from './runtime-flags' +import { createLogger } from './logger' export type { ServerConfigValues } from './config' export { ServerConfig } from './config' @@ -111,10 +106,13 @@ export class PDS { config, ) + // @TODO add config + const { logger, logMiddleware } = createLogger() + const app = express() app.set('trust proxy', true) app.use(cors()) - app.use(loggerMiddleware) + app.use(logMiddleware) app.use(compression()) const backgroundQueue = new BackgroundQueue(db) @@ -164,9 +162,11 @@ export class PDS { backgroundQueue, appviewAgent, crawlers, + logger, }) const xrpcOpts: XrpcServerOptions = { + logger, validateResponse: config.debugMode, payload: { jsonLimit: 100 * 1024, // 100kb @@ -175,24 +175,11 @@ export class PDS { }, } if (config.rateLimitsEnabled) { - let rlCreator: RateLimiterCreator - if (redisScratch) { - rlCreator = (opts: RateLimiterOpts) => - RateLimiter.redis(redisScratch, { - bypassSecret: config.rateLimitBypassKey, - bypassIps: config.rateLimitBypassIps, - ...opts, - }) - } else { - rlCreator = (opts: RateLimiterOpts) => - RateLimiter.memory({ - bypassSecret: config.rateLimitBypassKey, - bypassIps: config.rateLimitBypassIps, - ...opts, - }) - } xrpcOpts['rateLimits'] = { - creator: rlCreator, + enabled: true, + bypassSecret: config.rateLimitBypassKey, + bypassIps: config.rateLimitBypassIps, + redisClient: redisScratch, global: [ { name: 'global-ip', @@ -232,7 +219,7 @@ export class PDS { if (db.cfg.dialect === 'pg') { const { pool } = db.cfg this.dbStatsInterval = setInterval(() => { - dbLogger.info( + this.ctx.log.db.info( { idleCount: pool.idleCount, totalCount: pool.totalCount, @@ -240,7 +227,7 @@ export class PDS { }, 'db pool stats', ) - dbLogger.info( + this.ctx.log.db.info( { runningCount: backgroundQueue.queue.pending, waitingCount: backgroundQueue.queue.size, @@ -253,9 +240,9 @@ export class PDS { if (this.ctx.sequencerLeader?.isLeader) { try { const seq = await this.ctx.sequencerLeader.lastSeq() - seqLogger.info({ seq }, 'sequencer leader stats') + this.ctx.log.seq.info({ seq }, 'sequencer leader stats') } catch (err) { - seqLogger.error({ err }, 'error getting last seq') + this.ctx.log.seq.error({ err }, 'error getting last seq') } } }, 500) diff --git a/packages/pds/src/logger.ts b/packages/pds/src/logger.ts index a4277e41669..dd75aa4c310 100644 --- a/packages/pds/src/logger.ts +++ b/packages/pds/src/logger.ts @@ -1,49 +1,70 @@ -import pino from 'pino' -import pinoHttp from 'pino-http' -import { subsystemLogger } from '@atproto/common' +import * as pino from 'pino' +import * as pinoHttp from 'pino-http' import * as jwt from 'jsonwebtoken' import { parseBasicAuth } from './auth' +import * as xrpc from '@atproto/xrpc-server' -export const dbLogger = subsystemLogger('pds:db') -export const readStickyLogger = subsystemLogger('pds:read-sticky') -export const redisLogger = subsystemLogger('pds:redis') -export const seqLogger = subsystemLogger('pds:sequencer') -export const mailerLogger = subsystemLogger('pds:mailer') -export const labelerLogger = subsystemLogger('pds:labler') -export const crawlerLogger = subsystemLogger('pds:crawler') -export const httpLogger = subsystemLogger('pds') +export type Logger = xrpc.Logger & { + http: pino.Logger + db: pino.Logger + seq: pino.Logger + redis: pino.Logger + mailer: pino.Logger + labeler: pino.Logger + crawler: pino.Logger + munge: pino.Logger +} -export const loggerMiddleware = pinoHttp({ - logger: httpLogger, - serializers: { - req: (req) => { - const serialized = pino.stdSerializers.req(req) - const authHeader = serialized.headers.authorization || '' - let auth: string | undefined = undefined - if (authHeader.startsWith('Bearer ')) { - const token = authHeader.slice('Bearer '.length) - const sub = jwt.decode(token)?.sub - if (sub) { - auth = 'Bearer ' + sub - } else { - auth = 'Bearer Invalid' +export const createLogger = ( + opts: Partial = {}, +): { logger: Logger; logMiddleware: pinoHttp.HttpLogger } => { + const logger = xrpc.createLogger(opts) + const http = logger.child({ name: 'pds' }) + logger['http'] = http + logger['db'] = logger.child({ name: 'pds:db' }) + logger['seq'] = logger.child({ name: 'pds:sequencer' }) + logger['redis'] = logger.child({ name: 'pds:redis' }) + logger['mailer'] = logger.child({ name: 'pds:mailer' }) + logger['labeler'] = logger.child({ name: 'pds:labeler' }) + logger['crawler'] = logger.child({ name: 'pds:crawler' }) + logger['munge'] = logger.child({ name: 'pds:munge' }) + const logMiddleware = createLoggerMiddleware(http) + return { logger: logger as Logger, logMiddleware } +} + +export const createLoggerMiddleware = (logger: pino.Logger) => { + return pinoHttp.default({ + logger, + serializers: { + req: (req) => { + const serialized = pino.stdSerializers.req(req) + const authHeader = serialized.headers.authorization || '' + let auth: string | undefined = undefined + if (authHeader.startsWith('Bearer ')) { + const token = authHeader.slice('Bearer '.length) + const sub = jwt.decode(token)?.sub + if (sub) { + auth = 'Bearer ' + sub + } else { + auth = 'Bearer Invalid' + } + } + if (authHeader.startsWith('Basic ')) { + const parsed = parseBasicAuth(authHeader) + if (!parsed) { + auth = 'Basic Invalid' + } else { + auth = 'Basic ' + parsed.username + } } - } - if (authHeader.startsWith('Basic ')) { - const parsed = parseBasicAuth(authHeader) - if (!parsed) { - auth = 'Basic Invalid' - } else { - auth = 'Basic ' + parsed.username + return { + ...serialized, + headers: { + ...serialized.headers, + authorization: auth, + }, } - } - return { - ...serialized, - headers: { - ...serialized.headers, - authorization: auth, - }, - } + }, }, - }, -}) + }) +} diff --git a/packages/repo/src/logger.ts b/packages/repo/src/logger.ts deleted file mode 100644 index d4d1fdb9936..00000000000 --- a/packages/repo/src/logger.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { subsystemLogger } from '@atproto/common' - -export const logger: ReturnType = - subsystemLogger('repo') - -export default logger diff --git a/packages/repo/src/readable-repo.ts b/packages/repo/src/readable-repo.ts index 4381bd5e0e4..04ea6d8ac46 100644 --- a/packages/repo/src/readable-repo.ts +++ b/packages/repo/src/readable-repo.ts @@ -2,7 +2,6 @@ import { CID } from 'multiformats/cid' import { def, RepoContents, Commit } from './types' import { ReadableBlockstore } from './storage' import { MST } from './mst' -import log from './logger' import * as util from './util' import * as parse from './parse' import { MissingBlocksError } from './error' @@ -30,7 +29,6 @@ export class ReadableRepo { static async load(storage: ReadableBlockstore, commitCid: CID) { const commit = await storage.readObj(commitCid, def.versionedCommit) const data = await MST.load(storage, commit.data) - log.info({ did: commit.did }, 'loaded repo for') return new ReadableRepo({ storage, data, diff --git a/packages/repo/src/repo.ts b/packages/repo/src/repo.ts index 49e8ef24810..3e7b80dea6e 100644 --- a/packages/repo/src/repo.ts +++ b/packages/repo/src/repo.ts @@ -12,7 +12,6 @@ import { import { RepoStorage } from './storage' import { MST } from './mst' import DataDiff from './data-diff' -import log from './logger' import BlockMap from './block-map' import { ReadableRepo } from './readable-repo' import * as util from './util' @@ -102,7 +101,6 @@ export class Repo extends ReadableRepo { } const commit = await storage.readObj(commitCid, def.versionedCommit) const data = await MST.load(storage, commit.data) - log.info({ did: commit.did }, 'loaded repo for') return new Repo({ storage, data, diff --git a/packages/xrpc-server/package.json b/packages/xrpc-server/package.json index ef3c56b3bc0..cb90446cbf4 100644 --- a/packages/xrpc-server/package.json +++ b/packages/xrpc-server/package.json @@ -32,6 +32,7 @@ "express": "^4.17.2", "http-errors": "^2.0.0", "mime-types": "^2.1.35", + "pino": "^8.15.0", "rate-limiter-flexible": "^2.4.1", "uint8arrays": "3.0.0", "ws": "^8.12.0", diff --git a/packages/xrpc-server/src/index.ts b/packages/xrpc-server/src/index.ts index 1458d2ba070..0226942649a 100644 --- a/packages/xrpc-server/src/index.ts +++ b/packages/xrpc-server/src/index.ts @@ -1,5 +1,6 @@ export * from './types' export * from './auth' +export * from './logger' export * from './server' export * from './stream' export * from './rate-limiter' diff --git a/packages/xrpc-server/src/logger.ts b/packages/xrpc-server/src/logger.ts index 1e8599637e0..ba2d6053270 100644 --- a/packages/xrpc-server/src/logger.ts +++ b/packages/xrpc-server/src/logger.ts @@ -1,6 +1,36 @@ -import { subsystemLogger } from '@atproto/common' +import * as pino from 'pino' -export const logger: ReturnType = - subsystemLogger('xrpc-server') +export type Logger = pino.Logger & { + xrpcServer: pino.Logger + xrpcStream: pino.Logger +} -export default logger +export type LoggerOpts = { + enabled: boolean + level: string + destination: string +} + +export const isLogger = (obj: unknown): obj is Logger => { + return ( + obj && typeof obj === 'object' && obj['xrpcServer'] && obj['xrpcStream'] + ) +} + +export const createLogger = ( + opts: Logger | Partial = {}, +): Logger => { + if (isLogger(opts)) { + return opts + } + const cfg = { + enabled: opts.enabled ?? true, + level: opts.level ?? 'info', + } + const root = opts.destination + ? pino.default(cfg, pino.destination(opts.destination)) + : pino.default(cfg) + root['xrpcServer'] = root.child({ name: 'xrpc-server' }) + root['xrpcStream'] = root.child({ name: 'xrpc-stream' }) + return root as Logger +} diff --git a/packages/xrpc-server/src/rate-limiter.ts b/packages/xrpc-server/src/rate-limiter.ts index e9bf8a40e22..fa10304508c 100644 --- a/packages/xrpc-server/src/rate-limiter.ts +++ b/packages/xrpc-server/src/rate-limiter.ts @@ -4,7 +4,6 @@ import { RateLimiterRedis, RateLimiterRes, } from 'rate-limiter-flexible' -import { logger } from './logger' import { CalcKeyFn, CalcPointsFn, @@ -14,11 +13,13 @@ import { RateLimiterStatus, XRPCReqContext, } from './types' +import { Logger } from './logger' export type RateLimiterOpts = { keyPrefix: string durationMs: number points: number + logger?: Logger bypassSecret?: string bypassIps?: string[] calcKey?: CalcKeyFn @@ -28,6 +29,7 @@ export type RateLimiterOpts = { export class RateLimiter implements RateLimiterI { public limiter: RateLimiterAbstract + public logger?: Logger private bypassSecret?: string private bypassIps?: string[] private failClosed?: boolean @@ -38,6 +40,7 @@ export class RateLimiter implements RateLimiterI { this.limiter = limiter this.bypassSecret = opts.bypassSecret this.bypassIps = opts.bypassIps + this.logger = opts.logger this.calcKey = opts.calcKey ?? defaultKey this.calcPoints = opts.calcPoints ?? defaultPoints } @@ -93,7 +96,7 @@ export class RateLimiter implements RateLimiterI { if (this.failClosed) { throw err } - logger.error( + this.logger?.xrpcServer.error( { err, keyPrefix: this.limiter.keyPrefix, diff --git a/packages/xrpc-server/src/server.ts b/packages/xrpc-server/src/server.ts index 4e0a84ce4b7..54588209d27 100644 --- a/packages/xrpc-server/src/server.ts +++ b/packages/xrpc-server/src/server.ts @@ -35,6 +35,7 @@ import { RateLimiterConsume, isShared, RateLimitExceededError, + RateLimiterCreator, } from './types' import { decodeQueryParams, @@ -42,8 +43,8 @@ import { validateInput, validateOutput, } from './util' -import log from './logger' -import { consumeMany } from './rate-limiter' +import { RateLimiter, RateLimiterOpts, consumeMany } from './rate-limiter' +import { Logger, createLogger } from './logger' export function createServer(lexicons?: unknown[], options?: Options) { return new Server(lexicons, options) @@ -54,8 +55,10 @@ export class Server { routes = express.Router() subscriptions = new Map() lex = new Lexicons() + logger: Logger options: Options middleware: Record<'json' | 'text', RequestHandler> + rlCreator?: RateLimiterCreator globalRateLimiters: RateLimiterI[] sharedRateLimiters: Record routeRateLimiterFns: Record @@ -64,9 +67,10 @@ export class Server { if (lexicons) { this.addLexicons(lexicons) } + this.logger = createLogger(opts?.logger) this.router.use(this.routes) this.router.use('/xrpc/:methodId', this.catchall.bind(this)) - this.router.use(errorMiddleware) + this.router.use(createErrorMiddleware(this.logger)) this.router.once('mount', (app: express.Application) => { this.enableStreamingOnListen(app) }) @@ -75,21 +79,36 @@ export class Server { json: express.json({ limit: opts?.payload?.jsonLimit }), text: express.text({ limit: opts?.payload?.textLimit }), } + if (opts?.rateLimits?.enabled) { + this.rlCreator = (rlOpts: RateLimiterOpts) => { + const rlCfg = { + logger: this.logger, + bypassSecret: rlOpts.bypassSecret ?? opts.rateLimits?.bypassSecret, + bypassIps: rlOpts.bypassIps ?? opts.rateLimits?.bypassIps, + ...rlOpts, + } + if (opts.rateLimits?.redisClient) { + return RateLimiter.redis(opts.rateLimits.redisClient, rlCfg) + } else { + return RateLimiter.memory(rlCfg) + } + } + } this.globalRateLimiters = [] this.sharedRateLimiters = {} this.routeRateLimiterFns = {} - if (opts?.rateLimits?.global) { + if (opts?.rateLimits?.global && this.rlCreator) { for (const limit of opts.rateLimits.global) { - const rateLimiter = opts.rateLimits.creator({ + const rateLimiter = this.rlCreator({ ...limit, keyPrefix: `rl-${limit.name}`, }) this.globalRateLimiters.push(rateLimiter) } } - if (opts?.rateLimits?.shared) { + if (opts?.rateLimits?.shared && this.rlCreator) { for (const limit of opts.rateLimits.shared) { - const rateLimiter = opts.rateLimits.creator({ + const rateLimiter = this.rlCreator({ ...limit, keyPrefix: `rl-${limit.name}`, }) @@ -320,6 +339,7 @@ export class Server { nsid, new XrpcStreamServer({ noServer: true, + logger: this.logger, handler: async function* (req, signal) { try { // authenticate request @@ -419,15 +439,15 @@ export class Server { this.routeRateLimiterFns[nsid].push(consumeFn) } } else { - const { durationMs, points } = limit - const rateLimiter = this.options.rateLimits?.creator({ - keyPrefix: `nsid-${i}`, - durationMs, - points, - calcKey, - calcPoints, - }) - if (rateLimiter) { + if (this.rlCreator) { + const { durationMs, points } = limit + const rateLimiter = this.rlCreator({ + keyPrefix: `nsid-${i}`, + durationMs, + points, + calcKey, + calcPoints, + }) this.sharedRateLimiters[nsid] = rateLimiter const consumeFn = (ctx: XRPCReqContext) => rateLimiter.consume(ctx, { @@ -477,26 +497,28 @@ function createAuthMiddleware(verifier: AuthVerifier): RequestHandler { } } -const errorMiddleware: ErrorRequestHandler = function (err, req, res, next) { - const locals: RequestLocals | undefined = req[kRequestLocals] - const methodSuffix = locals ? ` method ${locals.nsid}` : '' - const xrpcError = XRPCError.fromError(err) - if (xrpcError instanceof InternalServerError) { - // log trace for unhandled exceptions - log.error(err, `unhandled exception in xrpc${methodSuffix}`) - } else { - // do not log trace for known xrpc errors - log.error( - { - status: xrpcError.type, - message: xrpcError.message, - name: xrpcError.customErrorName, - }, - `error in xrpc${methodSuffix}`, - ) - } - if (res.headersSent) { - return next(err) +const createErrorMiddleware = + (logger: Logger): ErrorRequestHandler => + (err, req, res, next) => { + const locals: RequestLocals | undefined = req[kRequestLocals] + const methodSuffix = locals ? ` method ${locals.nsid}` : '' + const xrpcError = XRPCError.fromError(err) + if (xrpcError instanceof InternalServerError) { + // log trace for unhandled exceptions + logger.xrpcServer.error(err, `unhandled exception in xrpc${methodSuffix}`) + } else { + // do not log trace for known xrpc errors + logger.xrpcServer.error( + { + status: xrpcError.type, + message: xrpcError.message, + name: xrpcError.customErrorName, + }, + `error in xrpc${methodSuffix}`, + ) + } + if (res.headersSent) { + return next(err) + } + return res.status(xrpcError.type).json(xrpcError.payload) } - return res.status(xrpcError.type).json(xrpcError.payload) -} diff --git a/packages/xrpc-server/src/stream/logger.ts b/packages/xrpc-server/src/stream/logger.ts deleted file mode 100644 index 7cff29b9ade..00000000000 --- a/packages/xrpc-server/src/stream/logger.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { subsystemLogger } from '@atproto/common' - -export const logger: ReturnType = - subsystemLogger('xrpc-stream') - -export default logger diff --git a/packages/xrpc-server/src/stream/server.ts b/packages/xrpc-server/src/stream/server.ts index f52730dff4b..2cede4908a3 100644 --- a/packages/xrpc-server/src/stream/server.ts +++ b/packages/xrpc-server/src/stream/server.ts @@ -1,16 +1,18 @@ import { IncomingMessage } from 'http' import { WebSocketServer, ServerOptions, WebSocket } from 'ws' import { ErrorFrame, Frame } from './frames' -import logger from './logger' import { CloseCode, DisconnectError } from './types' +import { Logger } from '../logger' export class XrpcStreamServer { wss: WebSocketServer - constructor(opts: ServerOptions & { handler: Handler }) { - const { handler, ...serverOpts } = opts + constructor(opts: ServerOptions & { handler: Handler; logger: Logger }) { + const { handler, logger, ...serverOpts } = opts this.wss = new WebSocketServer(serverOpts) this.wss.on('connection', async (socket, req) => { - socket.on('error', (err) => logger.error(err, 'websocket error')) + socket.on('error', (err) => + logger.xrpcStream.error(err, 'websocket error'), + ) try { const ac = new AbortController() const iterator = unwrapIterator(handler(req, ac.signal, socket, this)) @@ -36,7 +38,7 @@ export class XrpcStreamServer { if (err instanceof DisconnectError) { return socket.close(err.wsCode, err.xrpcCode) } else { - logger.error(err, 'websocket server error') + logger.xrpcStream.error(err, 'websocket server error') return socket.terminate() } } diff --git a/packages/xrpc-server/src/types.ts b/packages/xrpc-server/src/types.ts index dc75627a95f..aea3105fabc 100644 --- a/packages/xrpc-server/src/types.ts +++ b/packages/xrpc-server/src/types.ts @@ -2,13 +2,16 @@ import { IncomingMessage } from 'http' import express from 'express' import { isHttpError } from 'http-errors' import zod from 'zod' +import * as pino from 'pino' import { ResponseType, ResponseTypeStrings, ResponseTypeNames, } from '@atproto/xrpc' +import { LoggerOpts } from './logger' export type Options = { + logger?: LoggerOpts | pino.Logger validateResponse?: boolean payload?: { jsonLimit?: number @@ -16,9 +19,12 @@ export type Options = { textLimit?: number } rateLimits?: { - creator: RateLimiterCreator + enabled: true global?: ServerRateLimitDescription[] shared?: ServerRateLimitDescription[] + bypassSecret?: string + bypassIps?: string[] + redisClient?: unknown } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 80681385373..6e9e08b4c49 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -295,9 +295,6 @@ importers: multiformats: specifier: ^9.9.0 version: 9.9.0 - pino: - specifier: ^8.15.0 - version: 8.15.0 zod: specifier: 3.21.4 version: 3.21.4 @@ -691,6 +688,9 @@ importers: mime-types: specifier: ^2.1.35 version: 2.1.35 + pino: + specifier: ^8.15.0 + version: 8.15.0 rate-limiter-flexible: specifier: ^2.4.1 version: 2.4.1