Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework logging #1695

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"cbor-x": "^1.5.1",
"iso-datestring-validator": "^2.2.2",
"multiformats": "^9.9.0",
"pino": "^8.15.0",
"zod": "3.21.4"
}
}
1 change: 0 additions & 1 deletion packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@ export * from './dates'
export * from './fs'
export * from './ipld'
export * from './ipld-multi'
export * from './logger'
export * from './streams'
export * from './buffers'
36 changes: 0 additions & 36 deletions packages/common/src/logger.ts

This file was deleted.

6 changes: 6 additions & 0 deletions packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { BackgroundQueue } from './background'
import DidSqlCache from './did-cache'
import { Crawlers } from './crawlers'
import { RuntimeFlags } from './runtime-flags'
import { Logger } from './logger'

export class AppContext {
constructor(
Expand All @@ -38,6 +39,7 @@ export class AppContext {
backgroundQueue: BackgroundQueue
appviewAgent: AtpAgent
crawlers: Crawlers
logger: Logger
},
) {}

Expand Down Expand Up @@ -129,6 +131,10 @@ export class AppContext {
return this.opts.crawlers
}

get log(): Logger {
return this.opts.logger
}

get plcClient(): plc.Client {
return new plc.Client(this.cfg.didPlcUrl)
}
Expand Down
45 changes: 16 additions & 29 deletions packages/pds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@ import { AtpAgent } from '@atproto/api'
import * as crypto from '@atproto/crypto'
import { BlobStore } from '@atproto/repo'
import { IdResolver } from '@atproto/identity'
import {
RateLimiter,
RateLimiterCreator,
RateLimiterOpts,
Options as XrpcServerOptions,
} from '@atproto/xrpc-server'
import { Options as XrpcServerOptions } from '@atproto/xrpc-server'
import { DAY, HOUR, MINUTE } from '@atproto/common'
import API from './api'
import * as basicRoutes from './basic-routes'
Expand All @@ -28,7 +23,6 @@ import Database from './db'
import { ServerAuth } from './auth'
import * as error from './error'
import compression from './util/compression'
import { dbLogger, loggerMiddleware, seqLogger } from './logger'
import { ServerConfig } from './config'
import { ServerMailer } from './mailer'
import { ModerationMailer } from './mailer/moderation'
Expand All @@ -42,6 +36,7 @@ import DidSqlCache from './did-cache'
import { Crawlers } from './crawlers'
import { getRedisClient } from './redis'
import { RuntimeFlags } from './runtime-flags'
import { createLogger } from './logger'

export type { ServerConfigValues } from './config'
export { ServerConfig } from './config'
Expand Down Expand Up @@ -111,10 +106,13 @@ export class PDS {
config,
)

// @TODO add config
const { logger, logMiddleware } = createLogger()

const app = express()
app.set('trust proxy', true)
app.use(cors())
app.use(loggerMiddleware)
app.use(logMiddleware)
app.use(compression())

const backgroundQueue = new BackgroundQueue(db)
Expand Down Expand Up @@ -164,9 +162,11 @@ export class PDS {
backgroundQueue,
appviewAgent,
crawlers,
logger,
})

const xrpcOpts: XrpcServerOptions = {
logger,
validateResponse: config.debugMode,
payload: {
jsonLimit: 100 * 1024, // 100kb
Expand All @@ -175,24 +175,11 @@ export class PDS {
},
}
if (config.rateLimitsEnabled) {
let rlCreator: RateLimiterCreator
if (redisScratch) {
rlCreator = (opts: RateLimiterOpts) =>
RateLimiter.redis(redisScratch, {
bypassSecret: config.rateLimitBypassKey,
bypassIps: config.rateLimitBypassIps,
...opts,
})
} else {
rlCreator = (opts: RateLimiterOpts) =>
RateLimiter.memory({
bypassSecret: config.rateLimitBypassKey,
bypassIps: config.rateLimitBypassIps,
...opts,
})
}
xrpcOpts['rateLimits'] = {
creator: rlCreator,
enabled: true,
bypassSecret: config.rateLimitBypassKey,
bypassIps: config.rateLimitBypassIps,
redisClient: redisScratch,
global: [
{
name: 'global-ip',
Expand Down Expand Up @@ -232,15 +219,15 @@ export class PDS {
if (db.cfg.dialect === 'pg') {
const { pool } = db.cfg
this.dbStatsInterval = setInterval(() => {
dbLogger.info(
this.ctx.log.db.info(
{
idleCount: pool.idleCount,
totalCount: pool.totalCount,
waitingCount: pool.waitingCount,
},
'db pool stats',
)
dbLogger.info(
this.ctx.log.db.info(
{
runningCount: backgroundQueue.queue.pending,
waitingCount: backgroundQueue.queue.size,
Expand All @@ -253,9 +240,9 @@ export class PDS {
if (this.ctx.sequencerLeader?.isLeader) {
try {
const seq = await this.ctx.sequencerLeader.lastSeq()
seqLogger.info({ seq }, 'sequencer leader stats')
this.ctx.log.seq.info({ seq }, 'sequencer leader stats')
} catch (err) {
seqLogger.error({ err }, 'error getting last seq')
this.ctx.log.seq.error({ err }, 'error getting last seq')
}
}
}, 500)
Expand Down
105 changes: 63 additions & 42 deletions packages/pds/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,70 @@
import pino from 'pino'
import pinoHttp from 'pino-http'
import { subsystemLogger } from '@atproto/common'
import * as pino from 'pino'
import * as pinoHttp from 'pino-http'
import * as jwt from 'jsonwebtoken'
import { parseBasicAuth } from './auth'
import * as xrpc from '@atproto/xrpc-server'
Comment on lines 4 to +5
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
import { parseBasicAuth } from './auth'
import * as xrpc from '@atproto/xrpc-server'
import * as xrpc from '@atproto/xrpc-server'
import { parseBasicAuth } from './auth'


export const dbLogger = subsystemLogger('pds:db')
export const readStickyLogger = subsystemLogger('pds:read-sticky')
export const redisLogger = subsystemLogger('pds:redis')
export const seqLogger = subsystemLogger('pds:sequencer')
export const mailerLogger = subsystemLogger('pds:mailer')
export const labelerLogger = subsystemLogger('pds:labler')
export const crawlerLogger = subsystemLogger('pds:crawler')
export const httpLogger = subsystemLogger('pds')
export type Logger = xrpc.Logger & {
http: pino.Logger
db: pino.Logger
seq: pino.Logger
redis: pino.Logger
mailer: pino.Logger
labeler: pino.Logger
crawler: pino.Logger
munge: pino.Logger
}

export const loggerMiddleware = pinoHttp({
logger: httpLogger,
serializers: {
req: (req) => {
const serialized = pino.stdSerializers.req(req)
const authHeader = serialized.headers.authorization || ''
let auth: string | undefined = undefined
if (authHeader.startsWith('Bearer ')) {
const token = authHeader.slice('Bearer '.length)
const sub = jwt.decode(token)?.sub
if (sub) {
auth = 'Bearer ' + sub
} else {
auth = 'Bearer Invalid'
export const createLogger = (
opts: Partial<xrpc.LoggerOpts> = {},
): { logger: Logger; logMiddleware: pinoHttp.HttpLogger } => {
const logger = xrpc.createLogger(opts)
const http = logger.child({ name: 'pds' })
logger['http'] = http
logger['db'] = logger.child({ name: 'pds:db' })
logger['seq'] = logger.child({ name: 'pds:sequencer' })
logger['redis'] = logger.child({ name: 'pds:redis' })
logger['mailer'] = logger.child({ name: 'pds:mailer' })
logger['labeler'] = logger.child({ name: 'pds:labeler' })
logger['crawler'] = logger.child({ name: 'pds:crawler' })
logger['munge'] = logger.child({ name: 'pds:munge' })
const logMiddleware = createLoggerMiddleware(http)
return { logger: logger as Logger, logMiddleware }
}

export const createLoggerMiddleware = (logger: pino.Logger) => {
return pinoHttp.default({
logger,
serializers: {
req: (req) => {
const serialized = pino.stdSerializers.req(req)
const authHeader = serialized.headers.authorization || ''
let auth: string | undefined = undefined
if (authHeader.startsWith('Bearer ')) {
const token = authHeader.slice('Bearer '.length)
const sub = jwt.decode(token)?.sub
if (sub) {
auth = 'Bearer ' + sub
} else {
auth = 'Bearer Invalid'
}
}
if (authHeader.startsWith('Basic ')) {
const parsed = parseBasicAuth(authHeader)
if (!parsed) {
auth = 'Basic Invalid'
} else {
auth = 'Basic ' + parsed.username
}
}
}
if (authHeader.startsWith('Basic ')) {
const parsed = parseBasicAuth(authHeader)
if (!parsed) {
auth = 'Basic Invalid'
} else {
auth = 'Basic ' + parsed.username
return {
...serialized,
headers: {
...serialized.headers,
authorization: auth,
},
}
}
return {
...serialized,
headers: {
...serialized.headers,
authorization: auth,
},
}
},
},
},
})
})
}
6 changes: 0 additions & 6 deletions packages/repo/src/logger.ts

This file was deleted.

2 changes: 0 additions & 2 deletions packages/repo/src/readable-repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { CID } from 'multiformats/cid'
import { def, RepoContents, Commit } from './types'
import { ReadableBlockstore } from './storage'
import { MST } from './mst'
import log from './logger'
import * as util from './util'
import * as parse from './parse'
import { MissingBlocksError } from './error'
Expand Down Expand Up @@ -30,7 +29,6 @@ export class ReadableRepo {
static async load(storage: ReadableBlockstore, commitCid: CID) {
const commit = await storage.readObj(commitCid, def.versionedCommit)
const data = await MST.load(storage, commit.data)
log.info({ did: commit.did }, 'loaded repo for')
return new ReadableRepo({
storage,
data,
Expand Down
2 changes: 0 additions & 2 deletions packages/repo/src/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
import { RepoStorage } from './storage'
import { MST } from './mst'
import DataDiff from './data-diff'
import log from './logger'
import BlockMap from './block-map'
import { ReadableRepo } from './readable-repo'
import * as util from './util'
Expand Down Expand Up @@ -102,7 +101,6 @@ export class Repo extends ReadableRepo {
}
const commit = await storage.readObj(commitCid, def.versionedCommit)
const data = await MST.load(storage, commit.data)
log.info({ did: commit.did }, 'loaded repo for')
return new Repo({
storage,
data,
Expand Down
1 change: 1 addition & 0 deletions packages/xrpc-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"express": "^4.17.2",
"http-errors": "^2.0.0",
"mime-types": "^2.1.35",
"pino": "^8.15.0",
"rate-limiter-flexible": "^2.4.1",
"uint8arrays": "3.0.0",
"ws": "^8.12.0",
Expand Down
1 change: 1 addition & 0 deletions packages/xrpc-server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './types'
export * from './auth'
export * from './logger'
export * from './server'
export * from './stream'
export * from './rate-limiter'
Expand Down
Loading