From 4834ea914b481e074f43985059b183b67383c0be Mon Sep 17 00:00:00 2001 From: dholms Date: Wed, 20 Dec 2023 16:19:37 -0600 Subject: [PATCH] pds fanout working --- packages/dev-env/src/ozone.ts | 3 +- packages/ozone/src/context.ts | 19 +-- packages/ozone/src/daemon/config.ts | 10 ++ packages/ozone/src/daemon/context.ts | 6 + packages/ozone/src/daemon/event-pusher.ts | 133 ++++++++++-------- packages/ozone/src/daemon/index.ts | 24 +++- .../db/migrations/20231219T205730722Z-init.ts | 12 +- packages/ozone/src/index.ts | 18 ++- .../ozone/src/services/moderation/index.ts | 1 - packages/ozone/tests/moderation.test.ts | 10 ++ 10 files changed, 161 insertions(+), 75 deletions(-) diff --git a/packages/dev-env/src/ozone.ts b/packages/dev-env/src/ozone.ts index 4bcf70386c5..4ab3d745339 100644 --- a/packages/dev-env/src/ozone.ts +++ b/packages/dev-env/src/ozone.ts @@ -106,7 +106,8 @@ export class TestOzone { } async processAll() { - await Promise.all([this.ctx.backgroundQueue.processAll()]) + await this.ctx.backgroundQueue.processAll() + await this.ctx.daemon?.processAll() } async close() { diff --git a/packages/ozone/src/context.ts b/packages/ozone/src/context.ts index 3a21758bba7..67906df1b4e 100644 --- a/packages/ozone/src/context.ts +++ b/packages/ozone/src/context.ts @@ -8,6 +8,7 @@ import { ServerConfig } from './config' import { Services } from './services' import * as auth from './auth' import { BackgroundQueue } from './background' +import { OzoneDaemon } from './daemon' export class AppContext { public moderationPushAgent: AtpAgent | undefined @@ -15,12 +16,12 @@ export class AppContext { private opts: { db: Database appviewAgent: AtpAgent - searchAgent: AtpAgent cfg: ServerConfig services: Services signingKey: Keypair idResolver: IdResolver backgroundQueue: BackgroundQueue + daemon?: OzoneDaemon }, ) { if (opts.cfg.moderationPushUrl) { @@ -49,10 +50,6 @@ export class AppContext { return this.opts.appviewAgent } - get searchAgent(): AtpAgent { - return this.opts.searchAgent - } - get signingKey(): Keypair { return this.opts.signingKey } @@ -65,6 +62,14 @@ export class AppContext { return this.opts.idResolver } + get backgroundQueue(): BackgroundQueue { + return this.opts.backgroundQueue + } + + get daemon(): OzoneDaemon | undefined { + return this.opts.daemon + } + get authVerifier() { return auth.authVerifier(this.idResolver, { aud: this.cfg.serverDid }) } @@ -99,10 +104,6 @@ export class AppContext { keypair: this.signingKey, }) } - - get backgroundQueue(): BackgroundQueue { - return this.opts.backgroundQueue - } } export default AppContext diff --git a/packages/ozone/src/daemon/config.ts b/packages/ozone/src/daemon/config.ts index b0db59d04fb..6a042e10aa6 100644 --- a/packages/ozone/src/daemon/config.ts +++ b/packages/ozone/src/daemon/config.ts @@ -4,6 +4,7 @@ export interface DaemonConfigValues { version: string dbPostgresUrl: string dbPostgresSchema?: string + moderationPushUrl: string appviewUrl: string adminPassword: string } @@ -18,14 +19,19 @@ export class DaemonConfig { const dbPostgresSchema = overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA assert(dbPostgresUrl) + const moderationPushUrl = + overrides?.moderationPushUrl || process.env.MODERATION_PUSH_URL + assert(moderationPushUrl) const appviewUrl = overrides?.appviewUrl || process.env.APPVIEW_URL assert(appviewUrl) const adminPassword = overrides?.adminPassword || process.env.ADMIN_PASSWORD assert(adminPassword) + return new DaemonConfig({ version, dbPostgresUrl, dbPostgresSchema, + moderationPushUrl, appviewUrl, adminPassword, ...stripUndefineds(overrides ?? {}), @@ -44,6 +50,10 @@ export class DaemonConfig { return this.cfg.dbPostgresSchema } + get moderationPushUrl() { + return this.cfg.moderationPushUrl + } + get appviewUrl() { return this.cfg.appviewUrl } diff --git a/packages/ozone/src/daemon/context.ts b/packages/ozone/src/daemon/context.ts index 4206a15399d..003825f11fe 100644 --- a/packages/ozone/src/daemon/context.ts +++ b/packages/ozone/src/daemon/context.ts @@ -1,6 +1,7 @@ import { DaemonConfig } from './config' import { Database } from '../db' import { Services } from '../services' +import { EventPusher } from './event-pusher' export class DaemonContext { constructor( @@ -8,6 +9,7 @@ export class DaemonContext { db: Database cfg: DaemonConfig services: Services + eventPusher: EventPusher }, ) {} @@ -22,6 +24,10 @@ export class DaemonContext { get services(): Services { return this.opts.services } + + get eventPusher(): EventPusher { + return this.opts.eventPusher + } } export default DaemonContext diff --git a/packages/ozone/src/daemon/event-pusher.ts b/packages/ozone/src/daemon/event-pusher.ts index 89faca071ed..fda09c70cb9 100644 --- a/packages/ozone/src/daemon/event-pusher.ts +++ b/packages/ozone/src/daemon/event-pusher.ts @@ -1,10 +1,10 @@ import AtpAgent from '@atproto/api' +import { SECOND, wait } from '@atproto/common' import Database from '../db' import { retryHttp } from '../util/retry' import { RepoPushEvent } from '../db/schema/repo_push_event' import { RecordPushEvent } from '../db/schema/record_push_event' import { BlobPushEvent } from '../db/schema/blob_push_event' -import { SECOND, wait } from '@atproto/common' type PollState = { promise: Promise @@ -27,7 +27,11 @@ export class EventPusher { tries: 0, } - constructor(public db: Database, public appviewAgent: AtpAgent) {} + constructor( + public db: Database, + public appviewAgent: AtpAgent, + public moderationPushAgent: AtpAgent, + ) {} start() { this.repoPollState.promise = this.poll(this.repoPollState, () => @@ -41,6 +45,23 @@ export class EventPusher { ) } + async processAll() { + await Promise.all([ + this.pushRepoEvents(), + this.pushRecordEvents(), + this.pushBlobEvents(), + ]) + } + + async destroy() { + this.destroyed = true + await Promise.all([ + this.repoPollState.promise, + this.recordPollState.promise, + this.blobPollState.promise, + ]) + } + async poll(state: PollState, fn: () => Promise) { if (this.destroyed) return let hadEvts: boolean @@ -105,25 +126,34 @@ export class EventPusher { }) } - async attemptRepoEvent(txn: Database, evt: RepoPushEvent) { - let succeeded: boolean + private async pushToBoth( + fn: (agent: AtpAgent) => Promise, + ): Promise { try { - await retryHttp(() => - this.appviewAgent.com.atproto.admin.updateSubjectStatus({ - subject: { - $type: 'com.atproto.admin.defs#repoRef', - did: evt.subjectDid, - }, - takedown: { - applied: !!evt.takedownId, - ref: evt.takedownId?.toString(), - }, - }), - ) - succeeded = true - } catch { - succeeded = false + await Promise.all([ + // retryHttp(() => fn(this.appviewAgent)), + retryHttp(() => fn(this.moderationPushAgent)), + ]) + return true + } catch (err) { + console.log(err) + return false } + } + + async attemptRepoEvent(txn: Database, evt: RepoPushEvent) { + const succeeded = await this.pushToBoth((agent) => + agent.com.atproto.admin.updateSubjectStatus({ + subject: { + $type: 'com.atproto.admin.defs#repoRef', + did: evt.subjectDid, + }, + takedown: { + applied: !!evt.takedownId, + ref: evt.takedownId?.toString(), + }, + }), + ) if (succeeded) { await txn.db .updateTable('repo_push_event') @@ -145,25 +175,19 @@ export class EventPusher { } async attemptRecordEvent(txn: Database, evt: RecordPushEvent) { - let succeeded: boolean - try { - await retryHttp(() => - this.appviewAgent.com.atproto.admin.updateSubjectStatus({ - subject: { - $type: 'com.atproto.repo.strongRef', - uri: evt.subjectUri, - cid: evt.subjectCid, - }, - takedown: { - applied: !!evt.takedownId, - ref: evt.takedownId?.toString(), - }, - }), - ) - succeeded = true - } catch { - succeeded = false - } + const succeeded = await this.pushToBoth((agent) => + agent.com.atproto.admin.updateSubjectStatus({ + subject: { + $type: 'com.atproto.repo.strongRef', + uri: evt.subjectUri, + cid: evt.subjectCid, + }, + takedown: { + applied: !!evt.takedownId, + ref: evt.takedownId?.toString(), + }, + }), + ) if (succeeded) { await txn.db .updateTable('record_push_event') @@ -185,26 +209,19 @@ export class EventPusher { } async attemptBlobEvent(txn: Database, evt: BlobPushEvent) { - let succeeded: boolean - try { - await retryHttp(() => - this.appviewAgent.com.atproto.admin.updateSubjectStatus({ - subject: { - $type: 'com.atproto.admin.defs#repoBlobRef', - did: evt.subjectDid, - cid: evt.subjectBlobCid, - recordUri: evt.subjectUri, - }, - takedown: { - applied: !!evt.takedownId, - ref: evt.takedownId?.toString(), - }, - }), - ) - succeeded = true - } catch { - succeeded = false - } + const succeeded = await this.pushToBoth((agent) => + agent.com.atproto.admin.updateSubjectStatus({ + subject: { + $type: 'com.atproto.admin.defs#repoBlobRef', + did: evt.subjectDid, + cid: evt.subjectBlobCid, + }, + takedown: { + applied: !!evt.takedownId, + ref: evt.takedownId?.toString(), + }, + }), + ) if (succeeded) { await txn.db .updateTable('blob_push_event') diff --git a/packages/ozone/src/daemon/index.ts b/packages/ozone/src/daemon/index.ts index dd0320b703a..783aab3cfc8 100644 --- a/packages/ozone/src/daemon/index.ts +++ b/packages/ozone/src/daemon/index.ts @@ -4,6 +4,7 @@ import { createServices } from '../services' import { DaemonConfig } from './config' import DaemonContext from './context' import * as auth from '../auth' +import { EventPusher } from './event-pusher' export class OzoneDaemon { constructor(private ctx: DaemonContext) {} @@ -14,15 +15,34 @@ export class OzoneDaemon { 'authorization', auth.buildBasicAuth('admin', cfg.adminPassword), ) + const url = new URL(opts.cfg.moderationPushUrl) + const moderationPushAgent = new AtpAgent({ service: url.origin }) + moderationPushAgent.api.setHeader( + 'authorization', + auth.buildBasicAuth(url.username, url.password), + ) const services = createServices(appviewAgent) + const eventPusher = new EventPusher(db, appviewAgent, moderationPushAgent) const ctx = new DaemonContext({ db, cfg, services, + eventPusher, }) return new OzoneDaemon(ctx) } - async start() {} - async destroy() {} + + async start() { + this.ctx.eventPusher.start() + } + + async processAll() { + await this.ctx.eventPusher.processAll() + } + + async destroy() { + await this.ctx.eventPusher.destroy() + await this.ctx.db.close() + } } diff --git a/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts b/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts index f719e284241..ec9cb974287 100644 --- a/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts +++ b/packages/ozone/src/db/migrations/20231219T205730722Z-init.ts @@ -86,7 +86,9 @@ export async function up(db: Kysely): Promise { .addColumn('eventType', 'varchar', (col) => col.notNull()) .addColumn('subjectDid', 'varchar', (col) => col.notNull()) .addColumn('takedownId', 'integer') - .addColumn('confirmedAt', 'varchar') + .addColumn('confirmedAt', 'timestamptz') + .addColumn('lastAttempted', 'timestamptz') + .addColumn('attempts', 'integer') .addPrimaryKeyConstraint('repo_push_event_pkey', [ 'subjectDid', 'eventType', @@ -100,7 +102,9 @@ export async function up(db: Kysely): Promise { .addColumn('subjectUri', 'varchar', (col) => col.notNull()) .addColumn('subjectCid', 'varchar') .addColumn('takedownId', 'integer') - .addColumn('confirmedAt', 'varchar') + .addColumn('confirmedAt', 'timestamptz') + .addColumn('lastAttempted', 'timestamptz') + .addColumn('attempts', 'integer') .addPrimaryKeyConstraint('record_push_event_pkey', [ 'subjectUri', 'eventType', @@ -119,7 +123,9 @@ export async function up(db: Kysely): Promise { .addColumn('subjectBlobCid', 'varchar', (col) => col.notNull()) .addColumn('subjectUri', 'varchar') .addColumn('takedownId', 'integer') - .addColumn('confirmedAt', 'varchar') + .addColumn('confirmedAt', 'timestamptz') + .addColumn('lastAttempted', 'timestamptz') + .addColumn('attempts', 'integer') .addPrimaryKeyConstraint('blob_push_event_pkey', [ 'subjectDid', 'subjectBlobCid', diff --git a/packages/ozone/src/index.ts b/packages/ozone/src/index.ts index d52beceb1f8..8720aee6b53 100644 --- a/packages/ozone/src/index.ts +++ b/packages/ozone/src/index.ts @@ -18,6 +18,8 @@ import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' import Database from './db' import * as auth from './auth' +import { OzoneDaemon } from './daemon' +import { DaemonConfig } from './daemon/config' export type { ServerConfigValues } from './config' export { ServerConfig } from './config' @@ -56,7 +58,6 @@ export class OzoneService { const backgroundQueue = new BackgroundQueue(db) const appviewAgent = new AtpAgent({ service: config.appviewUrl }) - appviewAgent.api.setHeader( 'authorization', auth.buildBasicAuth('admin', config.adminPassword), @@ -64,6 +65,18 @@ export class OzoneService { const services = createServices(appviewAgent) + const daemon = OzoneDaemon.create({ + db, + cfg: new DaemonConfig({ + version: config.version, + dbPostgresUrl: config.dbPrimaryPostgresUrl, + dbPostgresSchema: config.dbPostgresSchema, + moderationPushUrl: config.moderationPushUrl ?? '', + appviewUrl: config.appviewUrl, + adminPassword: config.adminPassword, + }), + }) + const ctx = new AppContext({ db, cfg: config, @@ -72,6 +85,7 @@ export class OzoneService { signingKey, idResolver, backgroundQueue, + daemon, }) let server = createServer({ @@ -112,6 +126,7 @@ export class OzoneService { 'background queue stats', ) }, 10000) + await this.ctx.daemon?.start() const server = this.app.listen(this.ctx.cfg.port) this.server = server server.keepAliveTimeout = 90000 @@ -125,6 +140,7 @@ export class OzoneService { async destroy(): Promise { await this.terminator?.terminate() await this.ctx.backgroundQueue.destroy() + await this.ctx.daemon?.destroy() await this.ctx.db.close() clearInterval(this.dbStatsInterval) } diff --git a/packages/ozone/src/services/moderation/index.ts b/packages/ozone/src/services/moderation/index.ts index 0e9e1a9a190..4cc5b802213 100644 --- a/packages/ozone/src/services/moderation/index.ts +++ b/packages/ozone/src/services/moderation/index.ts @@ -13,7 +13,6 @@ import { isModEventEmail, RepoRef, RepoBlobRef, - AccountView, } from '../../lexicon/types/com/atproto/admin/defs' import { addHoursToDate } from '../../util/date' import { diff --git a/packages/ozone/tests/moderation.test.ts b/packages/ozone/tests/moderation.test.ts index 69ecd141925..c5a7492f433 100644 --- a/packages/ozone/tests/moderation.test.ts +++ b/packages/ozone/tests/moderation.test.ts @@ -629,10 +629,12 @@ describe('moderation', () => { }), ).rejects.toThrow('Subject is not taken down') }) + it('fans out repo takedowns to pds', async () => { await performTakedown({ account: sc.dids.bob, }) + await ozone.processAll() const res1 = await pdsAgent.api.com.atproto.admin.getSubjectStatus( { @@ -644,6 +646,7 @@ describe('moderation', () => { // cleanup await performReverseTakedown({ account: sc.dids.bob }) + await ozone.processAll() const res2 = await pdsAgent.api.com.atproto.admin.getSubjectStatus( { @@ -661,6 +664,7 @@ describe('moderation', () => { await performTakedown({ content: { uri, cid }, }) + await ozone.processAll() const res1 = await pdsAgent.api.com.atproto.admin.getSubjectStatus( { uri }, { headers: network.pds.adminAuthHeaders() }, @@ -669,6 +673,7 @@ describe('moderation', () => { // cleanup await performReverseTakedown({ content: { uri, cid } }) + await ozone.processAll() const res2 = await pdsAgent.api.com.atproto.admin.getSubjectStatus( { uri }, @@ -737,6 +742,7 @@ describe('moderation', () => { // right away without having to wait n number of hours for a successful assertion durationInHours: -1, }) + await ozone.processAll() const { data: statusesAfterTakedown } = await agent.api.com.atproto.admin.queryModerationStatuses( @@ -754,6 +760,7 @@ describe('moderation', () => { network.ozone.ctx, ) await periodicReversal.findAndRevertDueActions() + await ozone.processAll() const [{ data: eventList }, { data: statuses }] = await Promise.all([ agent.api.com.atproto.admin.queryModerationEvents( @@ -875,6 +882,7 @@ describe('moderation', () => { }, subjectBlobCids: [blob.image.ref.toString()], }) + await ozone.processAll() }) it('sets blobCids in moderation status', async () => { @@ -923,6 +931,8 @@ describe('moderation', () => { subjectBlobCids: [blob.image.ref.toString()], }) + await ozone.processAll() + // Can resolve blob const blobPath = `/blob/${sc.dids.carol}/${blob.image.ref.toString()}` const resolveBlob = await fetch(`${network.bsky.url}${blobPath}`)