Skip to content

Commit

Permalink
pds fanout working
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Dec 20, 2023
1 parent 5ecbe43 commit 4834ea9
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 75 deletions.
3 changes: 2 additions & 1 deletion packages/dev-env/src/ozone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ export class TestOzone {
}

async processAll() {
await Promise.all([this.ctx.backgroundQueue.processAll()])
await this.ctx.backgroundQueue.processAll()
await this.ctx.daemon?.processAll()
}

async close() {
Expand Down
19 changes: 10 additions & 9 deletions packages/ozone/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import { ServerConfig } from './config'
import { Services } from './services'
import * as auth from './auth'
import { BackgroundQueue } from './background'
import { OzoneDaemon } from './daemon'

export class AppContext {
public moderationPushAgent: AtpAgent | undefined
constructor(
private opts: {
db: Database
appviewAgent: AtpAgent
searchAgent: AtpAgent
cfg: ServerConfig
services: Services
signingKey: Keypair
idResolver: IdResolver
backgroundQueue: BackgroundQueue
daemon?: OzoneDaemon
},
) {
if (opts.cfg.moderationPushUrl) {
Expand Down Expand Up @@ -49,10 +50,6 @@ export class AppContext {
return this.opts.appviewAgent
}

get searchAgent(): AtpAgent {
return this.opts.searchAgent
}

get signingKey(): Keypair {
return this.opts.signingKey
}
Expand All @@ -65,6 +62,14 @@ export class AppContext {
return this.opts.idResolver
}

get backgroundQueue(): BackgroundQueue {
return this.opts.backgroundQueue
}

get daemon(): OzoneDaemon | undefined {
return this.opts.daemon
}

get authVerifier() {
return auth.authVerifier(this.idResolver, { aud: this.cfg.serverDid })
}
Expand Down Expand Up @@ -99,10 +104,6 @@ export class AppContext {
keypair: this.signingKey,
})
}

get backgroundQueue(): BackgroundQueue {
return this.opts.backgroundQueue
}
}

export default AppContext
10 changes: 10 additions & 0 deletions packages/ozone/src/daemon/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export interface DaemonConfigValues {
version: string
dbPostgresUrl: string
dbPostgresSchema?: string
moderationPushUrl: string
appviewUrl: string
adminPassword: string
}
Expand All @@ -18,14 +19,19 @@ export class DaemonConfig {
const dbPostgresSchema =
overrides?.dbPostgresSchema || process.env.DB_POSTGRES_SCHEMA
assert(dbPostgresUrl)
const moderationPushUrl =
overrides?.moderationPushUrl || process.env.MODERATION_PUSH_URL
assert(moderationPushUrl)
const appviewUrl = overrides?.appviewUrl || process.env.APPVIEW_URL
assert(appviewUrl)
const adminPassword = overrides?.adminPassword || process.env.ADMIN_PASSWORD
assert(adminPassword)

return new DaemonConfig({
version,
dbPostgresUrl,
dbPostgresSchema,
moderationPushUrl,
appviewUrl,
adminPassword,
...stripUndefineds(overrides ?? {}),
Expand All @@ -44,6 +50,10 @@ export class DaemonConfig {
return this.cfg.dbPostgresSchema
}

get moderationPushUrl() {
return this.cfg.moderationPushUrl
}

get appviewUrl() {
return this.cfg.appviewUrl
}
Expand Down
6 changes: 6 additions & 0 deletions packages/ozone/src/daemon/context.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { DaemonConfig } from './config'
import { Database } from '../db'
import { Services } from '../services'
import { EventPusher } from './event-pusher'

export class DaemonContext {
constructor(
private opts: {
db: Database
cfg: DaemonConfig
services: Services
eventPusher: EventPusher
},
) {}

Expand All @@ -22,6 +24,10 @@ export class DaemonContext {
get services(): Services {
return this.opts.services
}

get eventPusher(): EventPusher {
return this.opts.eventPusher
}
}

export default DaemonContext
133 changes: 75 additions & 58 deletions packages/ozone/src/daemon/event-pusher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import AtpAgent from '@atproto/api'
import { SECOND, wait } from '@atproto/common'
import Database from '../db'
import { retryHttp } from '../util/retry'
import { RepoPushEvent } from '../db/schema/repo_push_event'
import { RecordPushEvent } from '../db/schema/record_push_event'
import { BlobPushEvent } from '../db/schema/blob_push_event'
import { SECOND, wait } from '@atproto/common'

type PollState = {
promise: Promise<void>
Expand All @@ -27,7 +27,11 @@ export class EventPusher {
tries: 0,
}

constructor(public db: Database, public appviewAgent: AtpAgent) {}
constructor(
public db: Database,
public appviewAgent: AtpAgent,
public moderationPushAgent: AtpAgent,
) {}

start() {
this.repoPollState.promise = this.poll(this.repoPollState, () =>
Expand All @@ -41,6 +45,23 @@ export class EventPusher {
)
}

async processAll() {
await Promise.all([
this.pushRepoEvents(),
this.pushRecordEvents(),
this.pushBlobEvents(),
])
}

async destroy() {
this.destroyed = true
await Promise.all([
this.repoPollState.promise,
this.recordPollState.promise,
this.blobPollState.promise,
])
}

async poll(state: PollState, fn: () => Promise<boolean>) {
if (this.destroyed) return
let hadEvts: boolean
Expand Down Expand Up @@ -105,25 +126,34 @@ export class EventPusher {
})
}

async attemptRepoEvent(txn: Database, evt: RepoPushEvent) {
let succeeded: boolean
private async pushToBoth(
fn: (agent: AtpAgent) => Promise<unknown>,
): Promise<boolean> {
try {
await retryHttp(() =>
this.appviewAgent.com.atproto.admin.updateSubjectStatus({
subject: {
$type: 'com.atproto.admin.defs#repoRef',
did: evt.subjectDid,
},
takedown: {
applied: !!evt.takedownId,
ref: evt.takedownId?.toString(),
},
}),
)
succeeded = true
} catch {
succeeded = false
await Promise.all([
// retryHttp(() => fn(this.appviewAgent)),
retryHttp(() => fn(this.moderationPushAgent)),
])
return true
} catch (err) {
console.log(err)
return false
}
}

async attemptRepoEvent(txn: Database, evt: RepoPushEvent) {
const succeeded = await this.pushToBoth((agent) =>
agent.com.atproto.admin.updateSubjectStatus({
subject: {
$type: 'com.atproto.admin.defs#repoRef',
did: evt.subjectDid,
},
takedown: {
applied: !!evt.takedownId,
ref: evt.takedownId?.toString(),
},
}),
)
if (succeeded) {
await txn.db
.updateTable('repo_push_event')
Expand All @@ -145,25 +175,19 @@ export class EventPusher {
}

async attemptRecordEvent(txn: Database, evt: RecordPushEvent) {
let succeeded: boolean
try {
await retryHttp(() =>
this.appviewAgent.com.atproto.admin.updateSubjectStatus({
subject: {
$type: 'com.atproto.repo.strongRef',
uri: evt.subjectUri,
cid: evt.subjectCid,
},
takedown: {
applied: !!evt.takedownId,
ref: evt.takedownId?.toString(),
},
}),
)
succeeded = true
} catch {
succeeded = false
}
const succeeded = await this.pushToBoth((agent) =>
agent.com.atproto.admin.updateSubjectStatus({
subject: {
$type: 'com.atproto.repo.strongRef',
uri: evt.subjectUri,
cid: evt.subjectCid,
},
takedown: {
applied: !!evt.takedownId,
ref: evt.takedownId?.toString(),
},
}),
)
if (succeeded) {
await txn.db
.updateTable('record_push_event')
Expand All @@ -185,26 +209,19 @@ export class EventPusher {
}

async attemptBlobEvent(txn: Database, evt: BlobPushEvent) {
let succeeded: boolean
try {
await retryHttp(() =>
this.appviewAgent.com.atproto.admin.updateSubjectStatus({
subject: {
$type: 'com.atproto.admin.defs#repoBlobRef',
did: evt.subjectDid,
cid: evt.subjectBlobCid,
recordUri: evt.subjectUri,
},
takedown: {
applied: !!evt.takedownId,
ref: evt.takedownId?.toString(),
},
}),
)
succeeded = true
} catch {
succeeded = false
}
const succeeded = await this.pushToBoth((agent) =>
agent.com.atproto.admin.updateSubjectStatus({
subject: {
$type: 'com.atproto.admin.defs#repoBlobRef',
did: evt.subjectDid,
cid: evt.subjectBlobCid,
},
takedown: {
applied: !!evt.takedownId,
ref: evt.takedownId?.toString(),
},
}),
)
if (succeeded) {
await txn.db
.updateTable('blob_push_event')
Expand Down
24 changes: 22 additions & 2 deletions packages/ozone/src/daemon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createServices } from '../services'
import { DaemonConfig } from './config'
import DaemonContext from './context'
import * as auth from '../auth'
import { EventPusher } from './event-pusher'

export class OzoneDaemon {
constructor(private ctx: DaemonContext) {}
Expand All @@ -14,15 +15,34 @@ export class OzoneDaemon {
'authorization',
auth.buildBasicAuth('admin', cfg.adminPassword),
)
const url = new URL(opts.cfg.moderationPushUrl)
const moderationPushAgent = new AtpAgent({ service: url.origin })
moderationPushAgent.api.setHeader(
'authorization',
auth.buildBasicAuth(url.username, url.password),
)

const services = createServices(appviewAgent)
const eventPusher = new EventPusher(db, appviewAgent, moderationPushAgent)
const ctx = new DaemonContext({
db,
cfg,
services,
eventPusher,
})
return new OzoneDaemon(ctx)
}
async start() {}
async destroy() {}

async start() {
this.ctx.eventPusher.start()
}

async processAll() {
await this.ctx.eventPusher.processAll()
}

async destroy() {
await this.ctx.eventPusher.destroy()
await this.ctx.db.close()
}
}
12 changes: 9 additions & 3 deletions packages/ozone/src/db/migrations/20231219T205730722Z-init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ export async function up(db: Kysely<unknown>): Promise<void> {
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('subjectDid', 'varchar', (col) => col.notNull())
.addColumn('takedownId', 'integer')
.addColumn('confirmedAt', 'varchar')
.addColumn('confirmedAt', 'timestamptz')
.addColumn('lastAttempted', 'timestamptz')
.addColumn('attempts', 'integer')
.addPrimaryKeyConstraint('repo_push_event_pkey', [
'subjectDid',
'eventType',
Expand All @@ -100,7 +102,9 @@ export async function up(db: Kysely<unknown>): Promise<void> {
.addColumn('subjectUri', 'varchar', (col) => col.notNull())
.addColumn('subjectCid', 'varchar')
.addColumn('takedownId', 'integer')
.addColumn('confirmedAt', 'varchar')
.addColumn('confirmedAt', 'timestamptz')
.addColumn('lastAttempted', 'timestamptz')
.addColumn('attempts', 'integer')
.addPrimaryKeyConstraint('record_push_event_pkey', [
'subjectUri',
'eventType',
Expand All @@ -119,7 +123,9 @@ export async function up(db: Kysely<unknown>): Promise<void> {
.addColumn('subjectBlobCid', 'varchar', (col) => col.notNull())
.addColumn('subjectUri', 'varchar')
.addColumn('takedownId', 'integer')
.addColumn('confirmedAt', 'varchar')
.addColumn('confirmedAt', 'timestamptz')
.addColumn('lastAttempted', 'timestamptz')
.addColumn('attempts', 'integer')
.addPrimaryKeyConstraint('blob_push_event_pkey', [
'subjectDid',
'subjectBlobCid',
Expand Down
Loading

0 comments on commit 4834ea9

Please sign in to comment.