Skip to content

Commit

Permalink
refactor event reversal
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Dec 21, 2023
1 parent c1df5af commit c880002
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 235 deletions.
3 changes: 3 additions & 0 deletions packages/dev-env/src/ozone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ export class TestOzone {

await server.start()

// don't do event reversal in dev-env
await server.ctx.daemon?.ctx.eventReverser.destroy()

return new TestOzone(url, port, server)
}

Expand Down
6 changes: 6 additions & 0 deletions packages/ozone/src/daemon/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { DaemonConfig } from './config'
import { Database } from '../db'
import { Services } from '../services'
import { EventPusher } from './event-pusher'
import { EventReverser } from './event-reverser'

export class DaemonContext {
constructor(
Expand All @@ -10,6 +11,7 @@ export class DaemonContext {
cfg: DaemonConfig
services: Services
eventPusher: EventPusher
eventReverser: EventReverser
},
) {}

Expand All @@ -28,6 +30,10 @@ export class DaemonContext {
get eventPusher(): EventPusher {
return this.opts.eventPusher
}

get eventReverser(): EventReverser {
return this.opts.eventReverser
}
}

export default DaemonContext
69 changes: 69 additions & 0 deletions packages/ozone/src/daemon/event-reverser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { MINUTE, wait } from '@atproto/common'
import { dbLogger } from '../logger'
import { ReversalSubject } from '../services/moderation'
import Database from '../db'
import { Services } from '../services'

export class EventReverser {
destroyed = false
reversalPromise: Promise<void> = Promise.resolve()

constructor(private db: Database, private services: Services) {}

start() {
this.reversalPromise = this.poll()
}

async poll() {
if (this.destroyed) return
try {
await this.findAndRevertDueActions()
} catch (err) {
dbLogger.error({ err }, 'moderation action reversal errored')
}
await waitForInterval()
this.reversalPromise = this.poll()
}

async destroy() {
this.destroyed = true
await this.reversalPromise
}

async revertState(subject: ReversalSubject) {
await this.db.transaction(async (dbTxn) => {
const moderationTxn = this.services.moderation(dbTxn)
const originalEvent =
await moderationTxn.getLastReversibleEventForSubject(subject)
if (originalEvent) {
await moderationTxn.revertState({
action: originalEvent.action,
createdBy: originalEvent.createdBy,
comment:
'[SCHEDULED_REVERSAL] Reverting action as originally scheduled',
subject: subject.subject,
createdAt: new Date(),
})
}
})
}

async findAndRevertDueActions() {
const moderationService = this.services.moderation(this.db)
const subjectsDueForReversal =
await moderationService.getSubjectsDueForReversal()

// We shouldn't have too many actions due for reversal at any given time, so running in parallel is probably fine
// Internally, each reversal runs within its own transaction
await Promise.all(subjectsDueForReversal.map(this.revertState.bind(this)))
}
}

const waitForInterval = async () => {
// super basic synchronization by agreeing when the intervals land relative to unix timestamp
const now = Date.now()
const intervalMs = MINUTE
const nextIteration = Math.ceil(now / intervalMs)
const nextInMs = nextIteration * intervalMs - now
await wait(nextInMs)
}
10 changes: 9 additions & 1 deletion packages/ozone/src/daemon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ import { DaemonConfig } from './config'
import DaemonContext from './context'
import * as auth from '../auth'
import { EventPusher } from './event-pusher'
import { EventReverser } from './event-reverser'

export { EventPusher } from './event-pusher'
export { EventReverser } from './event-reverser'

export class OzoneDaemon {
constructor(private ctx: DaemonContext) {}
constructor(public ctx: DaemonContext) {}
static create(opts: { db: Database; cfg: DaemonConfig }): OzoneDaemon {
const { db, cfg } = opts
const appviewAgent = new AtpAgent({ service: cfg.appviewUrl })
Expand All @@ -24,24 +28,28 @@ export class OzoneDaemon {

const services = createServices(appviewAgent)
const eventPusher = new EventPusher(db, appviewAgent, moderationPushAgent)
const eventReverser = new EventReverser(db, services)
const ctx = new DaemonContext({
db,
cfg,
services,
eventPusher,
eventReverser,
})
return new OzoneDaemon(ctx)
}

async start() {
this.ctx.eventPusher.start()
this.ctx.eventReverser.start()
}

async processAll() {
await this.ctx.eventPusher.processAll()
}

async destroy() {
await this.ctx.eventReverser.destroy()
await this.ctx.eventPusher.destroy()
await this.ctx.db.close()
}
Expand Down
63 changes: 0 additions & 63 deletions packages/ozone/src/db/leader.ts

This file was deleted.

86 changes: 0 additions & 86 deletions packages/ozone/src/db/periodic-moderation-event-reversal.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/ozone/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { DaemonConfig } from './daemon/config'
export type { ServerConfigValues } from './config'
export { ServerConfig } from './config'
export { Database } from './db'
export { PeriodicModerationEventReversal } from './db/periodic-moderation-event-reversal'
export { OzoneDaemon, EventPusher, EventReverser } from './daemon'
export { AppContext } from './context'

export class OzoneService {
Expand Down
4 changes: 2 additions & 2 deletions packages/ozone/src/services/moderation/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CID } from 'multiformats/cid'
import { AtUri } from '@atproto/syntax'
import { AtUri, INVALID_HANDLE } from '@atproto/syntax'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Database } from '../../db'
import { ModerationViews } from './views'
Expand Down Expand Up @@ -549,7 +549,7 @@ export class ModerationService {
)
const resultsWithHandles = results.map((r) => ({
...r,
handle: infos.get(r.did)?.handle,
handle: infos.get(r.did)?.handle ?? INVALID_HANDLE,
}))

return {
Expand Down
Loading

0 comments on commit c880002

Please sign in to comment.