diff --git a/packages/bsky/src/auth-verifier.ts b/packages/bsky/src/auth-verifier.ts index 95513ea9059..310c4648343 100644 --- a/packages/bsky/src/auth-verifier.ts +++ b/packages/bsky/src/auth-verifier.ts @@ -284,21 +284,6 @@ export const parseBasicAuth = ( return { username, password } } -export const ensureValidAdminAud = ( - auth: RoleOutput | AdminServiceOutput, - subjectDid: string, -) => { - if ( - auth.credentials.type === 'admin_service' && - auth.credentials.aud !== subjectDid - ) { - throw new AuthRequiredError( - 'jwt audience does not match account did', - 'BadJwtAudience', - ) - } -} - export const buildBasicAuth = (username: string, password: string): string => { return ( BASIC + diff --git a/packages/bsky/src/auto-moderator/index.ts b/packages/bsky/src/auto-moderator/index.ts index 507156b2612..851d3289e89 100644 --- a/packages/bsky/src/auto-moderator/index.ts +++ b/packages/bsky/src/auto-moderator/index.ts @@ -53,14 +53,12 @@ export class AutoModerator { ) } - if (ctx.cfg.moderationPushUrl) { - const url = new URL(ctx.cfg.moderationPushUrl) - this.pushAgent = new AtpAgent({ service: url.origin }) - this.pushAgent.api.setHeader( - 'authorization', - buildBasicAuth(url.username, url.password), - ) - } + const url = new URL(ctx.cfg.moderationPushUrl) + this.pushAgent = new AtpAgent({ service: url.origin }) + this.pushAgent.api.setHeader( + 'authorization', + buildBasicAuth(url.username, url.password), + ) } processRecord(uri: AtUri, cid: CID, obj: unknown) { diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index f99a6fcfca4..1e8c79ec0fe 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -35,7 +35,8 @@ export interface ServerConfigValues { adminPassword: string moderatorPassword: string triagePassword: string - moderationPushUrl?: string + modServiceDid: string + modServiceUrl: string rateLimitsEnabled: boolean rateLimitBypassKey?: string rateLimitBypassIps?: string[] @@ -117,10 +118,12 @@ export class ServerConfig { const triagePassword = process.env.TRIAGE_PASSWORD || undefined assert(triagePassword) const labelerDid = process.env.LABELER_DID || 'did:example:labeler' - const moderationPushUrl = - overrides?.moderationPushUrl || - process.env.MODERATION_PUSH_URL || - undefined + const modServiceUrl = + overrides?.modServiceUrl || process.env.MODERATION_PUSH_URL || undefined + const modServiceDid = + overrides?.modServiceDid || process.env.MODERATION_PUSH_DID || undefined + assert(modServiceUrl) + assert(modServiceDid) const rateLimitsEnabled = process.env.RATE_LIMITS_ENABLED === 'true' const rateLimitBypassKey = process.env.RATE_LIMIT_BYPASS_KEY const rateLimitBypassIps = process.env.RATE_LIMIT_BYPASS_IPS @@ -157,7 +160,8 @@ export class ServerConfig { adminPassword, moderatorPassword, triagePassword, - moderationPushUrl, + modServiceUrl, + modServiceDid, rateLimitsEnabled, rateLimitBypassKey, rateLimitBypassIps, @@ -286,8 +290,12 @@ export class ServerConfig { return this.cfg.triagePassword } - get moderationPushUrl() { - return this.cfg.moderationPushUrl + get modServiceUrl() { + return this.cfg.modServiceUrl + } + + get modServiceDid() { + return this.cfg.modServiceDid } get rateLimitsEnabled() { diff --git a/packages/bsky/src/context.ts b/packages/bsky/src/context.ts index 2bb6cac1ecf..3cb837616f1 100644 --- a/packages/bsky/src/context.ts +++ b/packages/bsky/src/context.ts @@ -12,7 +12,7 @@ import { BackgroundQueue } from './background' import { MountedAlgos } from './feed-gen/types' import { NotificationServer } from './notifications' import { Redis } from './redis' -import { AuthVerifier, buildBasicAuth } from './auth-verifier' +import { AuthVerifier } from './auth-verifier' export class AppContext { public moderationPushAgent: AtpAgent | undefined @@ -32,16 +32,7 @@ export class AppContext { notifServer: NotificationServer authVerifier: AuthVerifier }, - ) { - if (opts.cfg.moderationPushUrl) { - const url = new URL(opts.cfg.moderationPushUrl) - this.moderationPushAgent = new AtpAgent({ service: url.origin }) - this.moderationPushAgent.api.setHeader( - 'authorization', - buildBasicAuth(url.username, url.password), - ) - } - } + ) {} get db(): DatabaseCoordinator { return this.opts.db diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index b23c984599c..fd4013b0afc 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -43,6 +43,7 @@ export { PeriodicModerationEventReversal } from './db/periodic-moderation-event- export { Redis } from './redis' export { ViewMaintainer } from './db/views' export { AppContext } from './context' +export type { ImageInvalidator } from './image/invalidator' export { makeAlgos } from './feed-gen' export * from './daemon' export * from './indexer' @@ -130,7 +131,7 @@ export class BskyAppView { const authVerifier = new AuthVerifier(idResolver, { ownDid: config.serverDid, - adminDid: 'did:example:admin', + adminDid: config.modServiceDid, adminPass: config.adminPassword, moderatorPass: config.moderatorPassword, triagePass: config.triagePassword, diff --git a/packages/bsky/src/indexer/config.ts b/packages/bsky/src/indexer/config.ts index dd8b9ab89d5..9dfb74e11a3 100644 --- a/packages/bsky/src/indexer/config.ts +++ b/packages/bsky/src/indexer/config.ts @@ -21,7 +21,7 @@ export interface IndexerConfigValues { fuzzyMatchB64?: string fuzzyFalsePositiveB64?: string labelerKeywords: Record - moderationPushUrl?: string + moderationPushUrl: string indexerConcurrency?: number indexerPartitionIds: number[] indexerPartitionBatchSize?: number @@ -71,6 +71,7 @@ export class IndexerConfig { overrides?.moderationPushUrl || process.env.MODERATION_PUSH_URL || undefined + assert(moderationPushUrl) const hiveApiKey = process.env.HIVE_API_KEY || undefined const abyssEndpoint = process.env.ABYSS_ENDPOINT const abyssPassword = process.env.ABYSS_PASSWORD diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 36ef5b0c8fc..37dcdf01dc7 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -44,6 +44,8 @@ export class TestBsky { didCacheMaxTTL: DAY, labelCacheStaleTTL: 30 * SECOND, labelCacheMaxTTL: MINUTE, + modServiceUrl: cfg.modServiceUrl ?? 'https://modservice.handle', + modServiceDid: cfg.modServiceDid ?? 'did:example:invalidMod', ...cfg, // Each test suite gets its own lock id for the repo subscription adminPassword: ADMIN_PASSWORD, @@ -109,7 +111,8 @@ export class TestBsky { abyssEndpoint: '', abyssPassword: '', imgUriEndpoint: 'img.example.com', - moderationPushUrl: cfg.moderationPushUrl, + moderationPushUrl: + cfg.indexer?.moderationPushUrl ?? 'https://modservice.invalid', indexerPartitionIds: [0], indexerNamespace: `ns${ns}`, indexerSubLockId: uniqueLockId(), @@ -269,6 +272,7 @@ export async function getIndexers( indexerPartitionIds: [0], indexerNamespace: `ns${ns}`, ingesterPartitionCount: config.ingesterPartitionCount ?? 1, + moderationPushUrl: config.moderationPushUrl ?? 'https://modservice.invalid', ...config, } const db = new bsky.PrimaryDatabase({ diff --git a/packages/dev-env/src/network.ts b/packages/dev-env/src/network.ts index a842e492756..080099175ae 100644 --- a/packages/dev-env/src/network.ts +++ b/packages/dev-env/src/network.ts @@ -3,6 +3,7 @@ import * as uint8arrays from 'uint8arrays' import getPort from 'get-port' import { wait } from '@atproto/common-web' import { createServiceJwt } from '@atproto/xrpc-server' +import { Client as PlcClient } from '@did-plc/lib' import { TestServerParams } from './types' import { TestPlc } from './plc' import { TestPds } from './pds' @@ -10,6 +11,7 @@ import { TestBsky } from './bsky' import { TestOzone } from './ozone' import { mockNetworkUtilities } from './util' import { TestNetworkNoAppView } from './network-no-appview' +import { Secp256k1Keypair } from '@atproto/crypto' const ADMIN_USERNAME = 'admin' const ADMIN_PASSWORD = 'admin-pass' @@ -39,6 +41,16 @@ export class TestNetwork extends TestNetworkNoAppView { const bskyPort = params.bsky?.port ?? (await getPort()) const pdsPort = params.pds?.port ?? (await getPort()) const ozonePort = params.ozone?.port ?? (await getPort()) + + const ozoneKey = await Secp256k1Keypair.create() + const ozoneDid = await new PlcClient(plc.url).createDid({ + signingKey: ozoneKey.did(), + rotationKeys: [ozoneKey.did()], + handle: 'ozone.test', + pds: `http://pds.invalid`, + signer: ozoneKey, + }) + const bsky = await TestBsky.create({ port: bskyPort, plcUrl: plc.url, @@ -48,30 +60,38 @@ export class TestNetwork extends TestNetworkNoAppView { dbPostgresSchema: `appview_${dbPostgresSchema}`, dbPrimaryPostgresUrl: dbPostgresUrl, redisHost, - moderationPushUrl: `http://admin:${ADMIN_PASSWORD}@localhost:${ozonePort}`, + modServiceUrl: `http://localhost:${ozonePort}`, + modServiceDid: ozoneDid, + indexer: { + moderationPushUrl: `http://admin:${ADMIN_PASSWORD}@localhost:${ozonePort}`, + }, ...params.bsky, }) + const pds = await TestPds.create({ + port: pdsPort, + didPlcUrl: plc.url, + bskyAppViewUrl: bsky.url, + bskyAppViewDid: bsky.ctx.cfg.serverDid, + modServiceUrl: `http://localhost:${ozonePort}`, + modServiceDid: ozoneDid, + ...params.pds, + }) + const ozone = await TestOzone.create({ port: ozonePort, plcUrl: plc.url, + signingKey: ozoneKey, + serverDid: ozoneDid, dbPostgresSchema: `ozone_${dbPostgresSchema}`, dbPostgresUrl, appviewUrl: bsky.url, - moderationPushUrl: `http://admin:${ADMIN_PASSWORD}@localhost:${pdsPort}`, // @TODO fix this + appviewDid: bsky.ctx.cfg.serverDid, + pdsUrl: pds.url, + pdsDid: pds.ctx.cfg.service.did, ...params.ozone, }) - const pds = await TestPds.create({ - port: pdsPort, - didPlcUrl: plc.url, - bskyAppViewUrl: bsky.url, - bskyAppViewDid: bsky.ctx.cfg.serverDid, - modServiceUrl: ozone.url, - modServiceDid: ozone.ctx.cfg.serverDid, - ...params.pds, - }) - mockNetworkUtilities(pds, bsky) return new TestNetwork(plc, pds, bsky, ozone) diff --git a/packages/dev-env/src/ozone.ts b/packages/dev-env/src/ozone.ts index 546643d7f8a..d39f06cd816 100644 --- a/packages/dev-env/src/ozone.ts +++ b/packages/dev-env/src/ozone.ts @@ -15,19 +15,21 @@ export class TestOzone { ) {} static async create(cfg: OzoneConfig): Promise { - const serviceKeypair = await Secp256k1Keypair.create() - const plcClient = new PlcClient(cfg.plcUrl) + const serviceKeypair = cfg.signingKey ?? (await Secp256k1Keypair.create()) + let serverDid = cfg.serverDid + if (!serverDid) { + const plcClient = new PlcClient(cfg.plcUrl) + serverDid = await plcClient.createDid({ + signingKey: serviceKeypair.did(), + rotationKeys: [serviceKeypair.did()], + handle: 'ozone.test', + pds: `https://pds.invalid`, + signer: serviceKeypair, + }) + } const port = cfg.port || (await getPort()) const url = `http://localhost:${port}` - const serverDid = await plcClient.createDid({ - signingKey: serviceKeypair.did(), - rotationKeys: [serviceKeypair.did()], - handle: 'bsky.test', - pds: `http://localhost:${port}`, - signer: serviceKeypair, - }) - const config = new ozone.ServerConfig({ version: '0.0.0', port, diff --git a/packages/dev-env/src/types.ts b/packages/dev-env/src/types.ts index c5aa66caf73..e0294262354 100644 --- a/packages/dev-env/src/types.ts +++ b/packages/dev-env/src/types.ts @@ -1,7 +1,8 @@ import * as pds from '@atproto/pds' import * as bsky from '@atproto/bsky' import * as ozone from '@atproto/ozone' -import { ImageInvalidator } from '@atproto/bsky/src/image/invalidator' +import { ImageInvalidator } from '@atproto/bsky' +import { Keypair } from '@atproto/crypto' export type PlcConfig = { port?: number @@ -32,6 +33,7 @@ export type OzoneConfig = Partial & { appviewUrl: string dbPostgresUrl: string migration?: string + signingKey?: Keypair } export type TestServerParams = { diff --git a/packages/ozone/src/api/admin/searchRepos.ts b/packages/ozone/src/api/admin/searchRepos.ts index 333daf0e06b..67cdf4c27fc 100644 --- a/packages/ozone/src/api/admin/searchRepos.ts +++ b/packages/ozone/src/api/admin/searchRepos.ts @@ -7,6 +7,7 @@ export default function (server: Server, ctx: AppContext) { handler: async ({ params }) => { const res = await ctx.appviewAgent.api.com.atproto.admin.searchRepos( params, + await ctx.appviewAuth(), ) const db = ctx.db const modService = ctx.modService(db) diff --git a/packages/ozone/src/api/admin/util.ts b/packages/ozone/src/api/admin/util.ts index f27f718dc95..06b64e45862 100644 --- a/packages/ozone/src/api/admin/util.ts +++ b/packages/ozone/src/api/admin/util.ts @@ -9,12 +9,15 @@ export const getPdsAccountInfo = async ( ctx: AppContext, did: string, ): Promise => { - const agent = ctx.moderationPushAgent + const agent = ctx.pdsAgent if (!agent) return null + const auth = await ctx.pdsAuth() + if (!auth) return null try { - const res = await agent.api.com.atproto.admin.getAccountInfo({ did }) + const res = await agent.api.com.atproto.admin.getAccountInfo({ did }, auth) return res.data } catch (err) { + console.log('ERR: ', err) return null } } diff --git a/packages/ozone/src/config.ts b/packages/ozone/src/config.ts index 232675528c1..2acf4b7b735 100644 --- a/packages/ozone/src/config.ts +++ b/packages/ozone/src/config.ts @@ -7,6 +7,9 @@ export interface ServerConfigValues { publicUrl?: string serverDid: string appviewUrl: string + appviewDid?: string + pdsUrl?: string + pdsDid?: string dbPostgresUrl: string dbPostgresSchema?: string didPlcUrl: string @@ -14,7 +17,6 @@ export interface ServerConfigValues { adminPassword: string moderatorPassword?: string triagePassword?: string - moderationPushUrl?: string } export class ServerConfig { @@ -30,6 +32,9 @@ export class ServerConfig { const port = isNaN(envPort) ? 2584 : envPort const appviewUrl = process.env.APPVIEW_URL assert(appviewUrl) + const appviewDid = process.env.APPVIEW_DID + const pdsUrl = process.env.PDS_URL + const pdsDid = process.env.PDS_DID const dbPostgresUrl = overrides?.dbPostgresUrl || process.env.DB_POSTGRES_URL assert(dbPostgresUrl) @@ -39,10 +44,6 @@ export class ServerConfig { const moderatorPassword = process.env.MODERATOR_PASSWORD || undefined const triagePassword = process.env.TRIAGE_PASSWORD || undefined const labelerDid = process.env.LABELER_DID || 'did:example:labeler' - const moderationPushUrl = - overrides?.moderationPushUrl || - process.env.MODERATION_PUSH_URL || - undefined return new ServerConfig({ version, @@ -51,6 +52,9 @@ export class ServerConfig { publicUrl, serverDid, appviewUrl, + appviewDid, + pdsUrl, + pdsDid, dbPostgresUrl, dbPostgresSchema, didPlcUrl, @@ -58,7 +62,6 @@ export class ServerConfig { adminPassword, moderatorPassword, triagePassword, - moderationPushUrl, ...stripUndefineds(overrides ?? {}), }) } @@ -100,6 +103,18 @@ export class ServerConfig { return this.cfg.appviewUrl } + get appviewDid() { + return this.cfg.appviewDid + } + + get pdsUrl() { + return this.cfg.pdsUrl + } + + get pdsDid() { + return this.cfg.pdsDid + } + get dbPostgresUrl() { return this.cfg.dbPostgresUrl } @@ -127,10 +142,6 @@ export class ServerConfig { get triagePassword() { return this.cfg.triagePassword } - - get moderationPushUrl() { - return this.cfg.moderationPushUrl - } } function stripUndefineds( diff --git a/packages/ozone/src/context.ts b/packages/ozone/src/context.ts index 201260a48ed..5a3bad5603d 100644 --- a/packages/ozone/src/context.ts +++ b/packages/ozone/src/context.ts @@ -2,7 +2,7 @@ import * as plc from '@did-plc/lib' import { IdResolver } from '@atproto/identity' import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' -import { createServiceJwt } from '@atproto/xrpc-server' +import { createServiceAuthHeaders } from '@atproto/xrpc-server' import { Database } from './db' import { ServerConfig } from './config' import { ModerationServiceCreator } from './mod-service' @@ -11,28 +11,19 @@ import { BackgroundQueue } from './background' import { OzoneDaemon } from './daemon' export class AppContext { - public moderationPushAgent: AtpAgent | undefined constructor( private opts: { db: Database - appviewAgent: AtpAgent cfg: ServerConfig modService: ModerationServiceCreator + appviewAgent: AtpAgent + pdsAgent: AtpAgent | undefined signingKey: Keypair idResolver: IdResolver backgroundQueue: BackgroundQueue daemon?: OzoneDaemon }, - ) { - if (opts.cfg.moderationPushUrl) { - const url = new URL(opts.cfg.moderationPushUrl) - this.moderationPushAgent = new AtpAgent({ service: url.origin }) - this.moderationPushAgent.api.setHeader( - 'authorization', - auth.buildBasicAuth(url.username, url.password), - ) - } - } + ) {} get db(): Database { return this.opts.db @@ -50,6 +41,10 @@ export class AppContext { return this.opts.appviewAgent } + get pdsAgent(): AtpAgent | undefined { + return this.opts.pdsAgent + } + get signingKey(): Keypair { return this.opts.signingKey } @@ -96,14 +91,28 @@ export class AppContext { return auth.roleVerifier(this.cfg) } - async serviceAuthJwt(aud: string) { + async serviceAuthHeaders(aud: string) { const iss = this.cfg.serverDid - return createServiceJwt({ + return createServiceAuthHeaders({ iss, aud, keypair: this.signingKey, }) } + + async pdsAuth() { + if (!this.cfg.pdsDid) { + return undefined + } + return this.serviceAuthHeaders(this.cfg.pdsDid) + } + + async appviewAuth() { + if (!this.cfg.appviewDid) { + return undefined + } + return this.serviceAuthHeaders(this.cfg.appviewDid) + } } export default AppContext diff --git a/packages/ozone/src/daemon/config.ts b/packages/ozone/src/daemon/config.ts index 6a042e10aa6..e30f0f8ee44 100644 --- a/packages/ozone/src/daemon/config.ts +++ b/packages/ozone/src/daemon/config.ts @@ -4,9 +4,11 @@ export interface DaemonConfigValues { version: string dbPostgresUrl: string dbPostgresSchema?: string - moderationPushUrl: string + serverDid: string appviewUrl: string - adminPassword: string + appviewDid?: string + pdsUrl?: string + pdsDid?: string } export class DaemonConfig { @@ -19,21 +21,23 @@ export class DaemonConfig { const dbPostgresSchema = overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA assert(dbPostgresUrl) - const moderationPushUrl = - overrides?.moderationPushUrl || process.env.MODERATION_PUSH_URL - assert(moderationPushUrl) + const serverDid = overrides?.serverDid || process.env.SERVER_DID + assert(serverDid) const appviewUrl = overrides?.appviewUrl || process.env.APPVIEW_URL assert(appviewUrl) - const adminPassword = overrides?.adminPassword || process.env.ADMIN_PASSWORD - assert(adminPassword) + const appviewDid = process.env.APPVIEW_DID + const pdsUrl = process.env.PDS_URL + const pdsDid = process.env.PDS_DID return new DaemonConfig({ version, dbPostgresUrl, dbPostgresSchema, - moderationPushUrl, + serverDid, appviewUrl, - adminPassword, + appviewDid, + pdsUrl, + pdsDid, ...stripUndefineds(overrides ?? {}), }) } @@ -50,16 +54,24 @@ export class DaemonConfig { return this.cfg.dbPostgresSchema } - get moderationPushUrl() { - return this.cfg.moderationPushUrl + get serverDid() { + return this.cfg.serverDid } get appviewUrl() { return this.cfg.appviewUrl } - get adminPassword() { - return this.cfg.adminPassword + get appviewDid() { + return this.cfg.appviewDid + } + + get pdsUrl() { + return this.cfg.pdsUrl + } + + get pdsDid() { + return this.cfg.pdsDid } } diff --git a/packages/ozone/src/daemon/event-pusher.ts b/packages/ozone/src/daemon/event-pusher.ts index 91e312eb702..a907480e4e2 100644 --- a/packages/ozone/src/daemon/event-pusher.ts +++ b/packages/ozone/src/daemon/event-pusher.ts @@ -6,6 +6,9 @@ import { RepoPushEvent } from '../db/schema/repo_push_event' import { RecordPushEvent } from '../db/schema/record_push_event' import { BlobPushEvent } from '../db/schema/blob_push_event' import { dbLogger } from '../logger' +import { InputSchema } from '../lexicon/types/com/atproto/admin/updateSubjectStatus' + +type EventSubject = InputSchema['subject'] type PollState = { timer?: NodeJS.Timer @@ -13,6 +16,17 @@ type PollState = { tries: number } +type AuthHeaders = { + headers: { + authorization: string + } +} + +type Service = { + agent: AtpAgent + did: string +} + export class EventPusher { destroyed = false @@ -29,11 +43,36 @@ export class EventPusher { tries: 0, } + appview: Service | undefined + pds: Service | undefined + constructor( public db: Database, - public appviewAgent: AtpAgent, - public moderationPushAgent: AtpAgent, - ) {} + public createAuthHeaders: (aud: string) => Promise, + services: { + appview?: { + url: string + did: string + } + pds?: { + url: string + did: string + } + }, + ) { + if (services.appview) { + this.appview = { + agent: new AtpAgent({ service: services.appview.url }), + did: services.appview.did, + } + } + if (services.pds) { + this.pds = { + agent: new AtpAgent({ service: services.pds.url }), + did: services.pds.did, + } + } + } start() { this.poll(this.repoPollState, () => this.pushRepoEvents()) @@ -133,122 +172,119 @@ export class EventPusher { }) } - private async pushToBoth( - fn: (agent: AtpAgent) => Promise, + private async updateSubjectOnAll( + subject: EventSubject, + takedownId: number | null, ): Promise { try { await Promise.all([ - retryHttp(() => fn(this.appviewAgent)), - retryHttp(() => fn(this.moderationPushAgent)), + this.appview + ? this.updateSubjectOnService(this.appview, subject, takedownId) + : Promise.resolve(), + this.pds + ? this.updateSubjectOnService(this.pds, subject, takedownId) + : Promise.resolve(), ]) return true } catch (err) { - console.log(err) + console.log('ERR: ', err) return false } } - async attemptRepoEvent(txn: Database, evt: RepoPushEvent) { - const succeeded = await this.pushToBoth((agent) => - agent.com.atproto.admin.updateSubjectStatus({ - subject: { - $type: 'com.atproto.admin.defs#repoRef', - did: evt.subjectDid, + private async updateSubjectOnService( + service: Service, + subject: EventSubject, + takedownId: number | null, + ) { + const auth = await this.createAuthHeaders(service.did) + return retryHttp(() => + service.agent.com.atproto.admin.updateSubjectStatus( + { + subject, + takedown: { + applied: !!takedownId, + ref: takedownId?.toString(), + }, }, - takedown: { - applied: !!evt.takedownId, - ref: evt.takedownId?.toString(), + { + ...auth, + encoding: 'application/json', }, - }), + ), ) - if (succeeded) { - await txn.db - .updateTable('repo_push_event') - .set({ confirmedAt: new Date() }) - .where('subjectDid', '=', evt.subjectDid) - .where('eventType', '=', evt.eventType) - .execute() - } else { - await txn.db - .updateTable('repo_push_event') - .set({ - lastAttempted: new Date(), - attempts: evt.attempts ?? 0 + 1, - }) - .where('subjectDid', '=', evt.subjectDid) - .where('eventType', '=', evt.eventType) - .execute() - } + } + + async attemptRepoEvent(txn: Database, evt: RepoPushEvent) { + const succeeded = await this.updateSubjectOnAll( + { + $type: 'com.atproto.admin.defs#repoRef', + did: evt.subjectDid, + }, + evt.takedownId, + ) + await txn.db + .updateTable('repo_push_event') + .set( + succeeded + ? { confirmedAt: new Date() } + : { + lastAttempted: new Date(), + attempts: evt.attempts ?? 0 + 1, + }, + ) + .where('subjectDid', '=', evt.subjectDid) + .where('eventType', '=', evt.eventType) + .execute() } async attemptRecordEvent(txn: Database, evt: RecordPushEvent) { - const succeeded = await this.pushToBoth((agent) => - agent.com.atproto.admin.updateSubjectStatus({ - subject: { - $type: 'com.atproto.repo.strongRef', - uri: evt.subjectUri, - cid: evt.subjectCid, - }, - takedown: { - applied: !!evt.takedownId, - ref: evt.takedownId?.toString(), - }, - }), + const succeeded = await this.updateSubjectOnAll( + { + $type: 'com.atproto.repo.strongRef', + uri: evt.subjectUri, + cid: evt.subjectCid, + }, + evt.takedownId, ) - if (succeeded) { - await txn.db - .updateTable('record_push_event') - .set({ confirmedAt: new Date() }) - .where('subjectUri', '=', evt.subjectUri) - .where('eventType', '=', evt.eventType) - .execute() - } else { - await txn.db - .updateTable('record_push_event') - .set({ - lastAttempted: new Date(), - attempts: evt.attempts ?? 0 + 1, - }) - .where('subjectUri', '=', evt.subjectUri) - .where('eventType', '=', evt.eventType) - .execute() - } + await txn.db + .updateTable('record_push_event') + .set( + succeeded + ? { confirmedAt: new Date() } + : { + lastAttempted: new Date(), + attempts: evt.attempts ?? 0 + 1, + }, + ) + .where('subjectUri', '=', evt.subjectUri) + .where('eventType', '=', evt.eventType) + .execute() } async attemptBlobEvent(txn: Database, evt: BlobPushEvent) { - const succeeded = await this.pushToBoth((agent) => - agent.com.atproto.admin.updateSubjectStatus({ - subject: { - $type: 'com.atproto.admin.defs#repoBlobRef', - did: evt.subjectDid, - cid: evt.subjectBlobCid, - }, - takedown: { - applied: !!evt.takedownId, - ref: evt.takedownId?.toString(), - }, - }), + const succeeded = await this.updateSubjectOnAll( + { + $type: 'com.atproto.admin.defs#repoBlobRef', + did: evt.subjectDid, + cid: evt.subjectBlobCid, + }, + evt.takedownId, ) - if (succeeded) { - await txn.db - .updateTable('blob_push_event') - .set({ confirmedAt: new Date() }) - .where('subjectDid', '=', evt.subjectDid) - .where('subjectBlobCid', '=', evt.subjectBlobCid) - .where('eventType', '=', evt.eventType) - .execute() - } else { - await txn.db - .updateTable('blob_push_event') - .set({ - lastAttempted: new Date(), - attempts: evt.attempts ?? 0 + 1, - }) - .where('subjectDid', '=', evt.subjectDid) - .where('subjectBlobCid', '=', evt.subjectBlobCid) - .where('eventType', '=', evt.eventType) - .execute() - } + await txn.db + .updateTable('blob_push_event') + .set( + succeeded + ? { confirmedAt: new Date() } + : { + lastAttempted: new Date(), + attempts: evt.attempts ?? 0 + 1, + }, + ) + .where('subjectDid', '=', evt.subjectDid) + .where('subjectBlobCid', '=', evt.subjectBlobCid) + .where('eventType', '=', evt.eventType) + .execute() } } diff --git a/packages/ozone/src/daemon/index.ts b/packages/ozone/src/daemon/index.ts index 53f20f5677d..3a6200f2bcd 100644 --- a/packages/ozone/src/daemon/index.ts +++ b/packages/ozone/src/daemon/index.ts @@ -2,32 +2,47 @@ import AtpAgent from '@atproto/api' import Database from '../db' import { DaemonConfig } from './config' import DaemonContext from './context' -import * as auth from '../auth' import { EventPusher } from './event-pusher' import { EventReverser } from './event-reverser' import { ModerationService } from '../mod-service' +import { Keypair } from '@atproto/crypto' +import { createServiceAuthHeaders } from '@atproto/xrpc-server' export { EventPusher } from './event-pusher' export { EventReverser } from './event-reverser' export class OzoneDaemon { constructor(public ctx: DaemonContext) {} - static create(opts: { db: Database; cfg: DaemonConfig }): OzoneDaemon { - const { db, cfg } = opts + static create(opts: { + db: Database + signingKey: Keypair + cfg: DaemonConfig + }): OzoneDaemon { + const { db, signingKey, cfg } = opts const appviewAgent = new AtpAgent({ service: cfg.appviewUrl }) - appviewAgent.api.setHeader( - 'authorization', - auth.buildBasicAuth('admin', cfg.adminPassword), - ) - const url = new URL(opts.cfg.moderationPushUrl) - const moderationPushAgent = new AtpAgent({ service: url.origin }) - moderationPushAgent.api.setHeader( - 'authorization', - auth.buildBasicAuth(url.username, url.password), - ) + const createAuthHeaders = (aud: string) => + createServiceAuthHeaders({ iss: cfg.serverDid, aud, keypair: signingKey }) - const modService = ModerationService.creator(appviewAgent) - const eventPusher = new EventPusher(db, appviewAgent, moderationPushAgent) + const appviewAuth = async () => + cfg.appviewDid ? createAuthHeaders(cfg.appviewDid) : undefined + + const modService = ModerationService.creator(appviewAgent, appviewAuth) + const eventPusher = new EventPusher(db, createAuthHeaders, { + appview: + cfg.appviewUrl && cfg.appviewDid + ? { + url: cfg.appviewUrl, + did: cfg.appviewDid, + } + : undefined, + pds: + cfg.pdsUrl && cfg.pdsDid + ? { + url: cfg.pdsUrl, + did: cfg.pdsDid, + } + : undefined, + }) const eventReverser = new EventReverser(db, modService) const ctx = new DaemonContext({ db, diff --git a/packages/ozone/src/index.ts b/packages/ozone/src/index.ts index cec20cbd870..a907f7133d9 100644 --- a/packages/ozone/src/index.ts +++ b/packages/ozone/src/index.ts @@ -17,9 +17,9 @@ import { BackgroundQueue } from './background' import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' import Database from './db' -import * as auth from './auth' import { OzoneDaemon } from './daemon' import { DaemonConfig } from './daemon/config' +import { createServiceAuthHeaders } from '@atproto/xrpc-server' export type { ServerConfigValues } from './config' export { ServerConfig } from './config' @@ -57,30 +57,42 @@ export class OzoneService { const backgroundQueue = new BackgroundQueue(db) const appviewAgent = new AtpAgent({ service: config.appviewUrl }) - appviewAgent.api.setHeader( - 'authorization', - auth.buildBasicAuth('admin', config.adminPassword), - ) + const pdsAgent = config.pdsUrl + ? new AtpAgent({ service: config.pdsUrl }) + : undefined - const modService = ModerationService.creator(appviewAgent) + const appviewAuth = async () => { + if (!config.appviewDid) return undefined + return createServiceAuthHeaders({ + iss: config.serverDid, + aud: config.appviewDid, + keypair: signingKey, + }) + } + + const modService = ModerationService.creator(appviewAgent, appviewAuth) const daemon = OzoneDaemon.create({ db, + signingKey, cfg: new DaemonConfig({ version: config.version, dbPostgresUrl: config.dbPostgresUrl, dbPostgresSchema: config.dbPostgresSchema, - moderationPushUrl: config.moderationPushUrl ?? '', + serverDid: config.serverDid, appviewUrl: config.appviewUrl, - adminPassword: config.adminPassword, + appviewDid: config.appviewDid, + pdsUrl: config.pdsUrl, + pdsDid: config.pdsDid, }), }) const ctx = new AppContext({ db, cfg: config, - appviewAgent, modService, + appviewAgent, + pdsAgent, signingKey, idResolver, backgroundQueue, diff --git a/packages/ozone/src/mod-service/index.ts b/packages/ozone/src/mod-service/index.ts index ccdc0094cfc..c6b6aeb6d5c 100644 --- a/packages/ozone/src/mod-service/index.ts +++ b/packages/ozone/src/mod-service/index.ts @@ -3,7 +3,7 @@ import { AtUri, INVALID_HANDLE } from '@atproto/syntax' import { InvalidRequestError } from '@atproto/xrpc-server' import { addHoursToDate } from '@atproto/common' import { Database } from '../db' -import { ModerationViews } from './views' +import { AppviewAuth, ModerationViews } from './views' import { Main as StrongRef } from '../lexicon/types/com/atproto/repo/strongRef' import { isModEventComment, @@ -40,13 +40,18 @@ import { export type ModerationServiceCreator = (db: Database) => ModerationService export class ModerationService { - constructor(public db: Database, public appviewAgent: AtpAgent) {} - - static creator(appviewAgent: AtpAgent) { - return (db: Database) => new ModerationService(db, appviewAgent) + constructor( + public db: Database, + public appviewAgent: AtpAgent, + private appviewAuth: AppviewAuth, + ) {} + + static creator(appviewAgent: AtpAgent, appviewAuth: AppviewAuth) { + return (db: Database) => + new ModerationService(db, appviewAgent, appviewAuth) } - views = new ModerationViews(this.db, this.appviewAgent) + views = new ModerationViews(this.db, this.appviewAgent, this.appviewAuth) async getEvent(id: number): Promise { return await this.db.db diff --git a/packages/ozone/src/mod-service/views.ts b/packages/ozone/src/mod-service/views.ts index 6a86723d955..a84162b6d5c 100644 --- a/packages/ozone/src/mod-service/views.ts +++ b/packages/ozone/src/mod-service/views.ts @@ -25,14 +25,32 @@ import { import { REASONOTHER } from '../lexicon/types/com/atproto/moderation/defs' import { subjectFromEventRow, subjectFromStatusRow } from './subject' +export type AppviewAuth = () => Promise< + | { + headers: { + authorization: string + } + } + | undefined +> + export class ModerationViews { - constructor(private db: Database, private appviewAgent: AtpAgent) {} + constructor( + private db: Database, + private appviewAgent: AtpAgent, + private appviewAuth: AppviewAuth, + ) {} async getAccoutInfosByDid(dids: string[]): Promise> { if (dids.length === 0) return new Map() - const res = await this.appviewAgent.api.com.atproto.admin.getAccountInfos({ - dids: dedupeStrs(dids), - }) + const auth = await this.appviewAuth() + if (!auth) return new Map() + const res = await this.appviewAgent.api.com.atproto.admin.getAccountInfos( + { + dids: dedupeStrs(dids), + }, + auth, + ) return res.data.infos.reduce((acc, cur) => { return acc.set(cur.did, cur) }, new Map()) @@ -201,16 +219,21 @@ export class ModerationViews { } > > { + const auth = await this.appviewAuth() + if (!auth) return new Map() const fetched = await Promise.all( subjects.map(async (subject) => { const uri = new AtUri(subject.uri) try { - return await this.appviewAgent.api.com.atproto.repo.getRecord({ - repo: uri.hostname, - collection: uri.collection, - rkey: uri.rkey, - cid: subject.cid, - }) + return await this.appviewAgent.api.com.atproto.repo.getRecord( + { + repo: uri.hostname, + collection: uri.collection, + rkey: uri.rkey, + cid: subject.cid, + }, + auth, + ) } catch { return null } @@ -425,17 +448,11 @@ export class ModerationViews { }) .selectAll() - const [statusRes, accountsRes] = await Promise.all([ + const [statusRes, accountsByDid] = await Promise.all([ builder.execute(), - this.appviewAgent.api.com.atproto.admin.getAccountInfos({ - dids: parsedSubjects.map((s) => s.did), - }), + this.getAccoutInfosByDid(parsedSubjects.map((s) => s.did)), ]) - const accountsByDid = accountsRes.data.infos.reduce((acc, cur) => { - return acc.set(cur.did, cur) - }, new Map()) - return statusRes.reduce((acc, cur) => { const subject = cur.recordPath ? formatSubjectId(cur.did, cur.recordPath) diff --git a/packages/pds/src/api/com/atproto/admin/getAccountInfo.ts b/packages/pds/src/api/com/atproto/admin/getAccountInfo.ts index 9953dc9d56b..7b94a8c57d8 100644 --- a/packages/pds/src/api/com/atproto/admin/getAccountInfo.ts +++ b/packages/pds/src/api/com/atproto/admin/getAccountInfo.ts @@ -1,15 +1,12 @@ import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import { InvalidRequestError } from '@atproto/xrpc-server' -import { ensureValidAdminAud } from '../../../../auth-verifier' import { INVALID_HANDLE } from '@atproto/syntax' export default function (server: Server, ctx: AppContext) { server.com.atproto.admin.getAccountInfo({ auth: ctx.authVerifier.roleOrAdminService, - handler: async ({ params, auth }) => { - // any admin role auth can get account info, but verify aud on service jwt - ensureValidAdminAud(auth, params.did) + handler: async ({ params }) => { const [account, invites, invitedBy] = await Promise.all([ ctx.accountManager.getAccount(params.did, true), ctx.accountManager.getAccountInvitesCodes(params.did), diff --git a/packages/pds/src/api/com/atproto/admin/getSubjectStatus.ts b/packages/pds/src/api/com/atproto/admin/getSubjectStatus.ts index ad212391c58..767714cec36 100644 --- a/packages/pds/src/api/com/atproto/admin/getSubjectStatus.ts +++ b/packages/pds/src/api/com/atproto/admin/getSubjectStatus.ts @@ -4,12 +4,11 @@ import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import { OutputSchema } from '../../../../lexicon/types/com/atproto/admin/getSubjectStatus' -import { ensureValidAdminAud } from '../../../../auth-verifier' export default function (server: Server, ctx: AppContext) { server.com.atproto.admin.getSubjectStatus({ auth: ctx.authVerifier.roleOrAdminService, - handler: async ({ params, auth }) => { + handler: async ({ params }) => { const { did, uri, blob } = params let body: OutputSchema | null = null if (blob) { @@ -18,7 +17,6 @@ export default function (server: Server, ctx: AppContext) { 'Must provide a did to request blob state', ) } - ensureValidAdminAud(auth, did) const takedown = await ctx.actorStore.read(did, (store) => store.repo.blob.getBlobTakedownStatus(CID.parse(blob)), ) @@ -34,7 +32,6 @@ export default function (server: Server, ctx: AppContext) { } } else if (uri) { const parsedUri = new AtUri(uri) - ensureValidAdminAud(auth, parsedUri.hostname) const [takedown, cid] = await ctx.actorStore.read( parsedUri.hostname, (store) => @@ -54,7 +51,6 @@ export default function (server: Server, ctx: AppContext) { } } } else if (did) { - ensureValidAdminAud(auth, did) const takedown = await ctx.accountManager.getAccountTakedownStatus(did) if (takedown) { body = { diff --git a/packages/pds/src/api/com/atproto/admin/updateSubjectStatus.ts b/packages/pds/src/api/com/atproto/admin/updateSubjectStatus.ts index 649f906cd7e..29991da2b2c 100644 --- a/packages/pds/src/api/com/atproto/admin/updateSubjectStatus.ts +++ b/packages/pds/src/api/com/atproto/admin/updateSubjectStatus.ts @@ -8,7 +8,6 @@ import { } from '../../../../lexicon/types/com/atproto/admin/defs' import { isMain as isStrongRef } from '../../../../lexicon/types/com/atproto/repo/strongRef' import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server' -import { ensureValidAdminAud } from '../../../../auth-verifier' export default function (server: Server, ctx: AppContext) { server.com.atproto.admin.updateSubjectStatus({ @@ -24,16 +23,13 @@ export default function (server: Server, ctx: AppContext) { const { subject, takedown } = input.body if (takedown) { if (isRepoRef(subject)) { - ensureValidAdminAud(auth, subject.did) await ctx.accountManager.takedownAccount(subject.did, takedown) } else if (isStrongRef(subject)) { const uri = new AtUri(subject.uri) - ensureValidAdminAud(auth, uri.hostname) await ctx.actorStore.transact(uri.hostname, (store) => store.record.updateRecordTakedownStatus(uri, takedown), ) } else if (isRepoBlobRef(subject)) { - ensureValidAdminAud(auth, subject.did) await ctx.actorStore.transact(subject.did, (store) => store.repo.blob.updateBlobTakedownStatus( CID.parse(subject.cid), diff --git a/packages/pds/src/auth-verifier.ts b/packages/pds/src/auth-verifier.ts index 7a09135a72c..dc5f0bc29d5 100644 --- a/packages/pds/src/auth-verifier.ts +++ b/packages/pds/src/auth-verifier.ts @@ -217,7 +217,7 @@ export class AuthVerifier { } const payload = await verifyServiceJwt( jwtStr, - null, + this.dids.entryway ?? this.dids.pds, async (did, forceRefresh) => { if (did !== this.dids.admin) { throw new AuthRequiredError( @@ -381,21 +381,6 @@ export const parseBasicAuth = ( return { username, password } } -export const ensureValidAdminAud = ( - auth: RoleOutput | AdminServiceOutput, - subjectDid: string, -) => { - if ( - auth.credentials.type === 'service' && - auth.credentials.aud !== subjectDid - ) { - throw new AuthRequiredError( - 'jwt audience does not match account did', - 'BadJwtAudience', - ) - } -} - const authScopes = new Set(Object.values(AuthScope)) const isAuthScope = (val: unknown): val is AuthScope => { return authScopes.has(val as any) diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 874ee276272..6a5b2927df1 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -189,7 +189,7 @@ export class AppContext { dids: { pds: cfg.service.did, entryway: cfg.entryway?.did, - admin: cfg.bskyAppView.did, + admin: cfg.modService.did, }, })