Skip to content

Commit

Permalink
♻️ Refactor to use event_pusher table instead of new table
Browse files Browse the repository at this point in the history
  • Loading branch information
foysalit committed Mar 6, 2024
1 parent 071712c commit be01cf0
Show file tree
Hide file tree
Showing 13 changed files with 198 additions and 289 deletions.
3 changes: 1 addition & 2 deletions packages/ozone/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
"@types/express-serve-static-core": "^4.17.36",
"@types/pg": "^8.6.6",
"@types/qs": "^6.9.7",
"axios": "^0.27.2",
"nock": "14.0.0-beta.4"
"axios": "^0.27.2"
}
}
5 changes: 5 additions & 0 deletions packages/ozone/src/api/admin/emitModerationEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../lexicon'
import AppContext from '../../context'
import {
isModEventDivert,
isModEventLabel,
isModEventReverseTakedown,
isModEventTakedown,
Expand Down Expand Up @@ -91,6 +92,10 @@ export default function (server: Server, ctx: AppContext) {
subjectStatus: result.subjectStatus,
})

if (isModEventDivert(event) && subject.isRecord()) {
await moderationTxn.divertBlobs(subject)
}

if (subject.isRepo()) {
if (isTakedownEvent) {
const isSuspend = !!result.event.durationInHours
Expand Down
17 changes: 10 additions & 7 deletions packages/ozone/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
plcUrl: env.didPlcUrl,
}

const blobReportServiceCfg = {
url: env.blobReportServiceUrl,
authToken: env.blobReportServiceAuthToken,
}
const blobReportServiceCfg =
env.blobReportServiceUrl && env.blobReportServiceAuthToken
? {
url: env.blobReportServiceUrl,
authToken: env.blobReportServiceAuthToken,
}
: undefined

return {
service: serviceCfg,
Expand All @@ -64,7 +67,7 @@ export type OzoneConfig = {
appview: AppviewConfig
pds: PdsConfig | null
identity: IdentityConfig
blobReportService: BlobReportServiceConfig
blobReportService?: BlobReportServiceConfig
}

export type ServiceConfig = {
Expand All @@ -75,8 +78,8 @@ export type ServiceConfig = {
}

export type BlobReportServiceConfig = {
url?: string
authToken?: string
url: string
authToken: string
}

export type DatabaseConfig = {
Expand Down
13 changes: 7 additions & 6 deletions packages/ozone/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,20 @@ export class AppContext {
cfg.appview.did ? createAuthHeaders(cfg.appview.did) : undefined

const backgroundQueue = new BackgroundQueue(db)
const blobDiverter = cfg.blobReportService
? new BlobDiverter(db, {
idResolver,
serviceConfig: cfg.blobReportService,
})
: undefined
const eventPusher = new EventPusher(db, createAuthHeaders, {
appview: cfg.appview,
pds: cfg.pds ?? undefined,
blobDiverter,
})
const blobDiverter = new BlobDiverter(db, {
idResolver,
serviceConfig: cfg.blobReportService,
})

const modService = ModerationService.creator(
backgroundQueue,
eventPusher,
blobDiverter,
appviewAgent,
appviewAuth,
cfg.service.did,
Expand Down
171 changes: 43 additions & 128 deletions packages/ozone/src/daemon/blob-diverter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {
SECOND,
VerifyCidTransform,
forwardStreamErrors,
getPdsEndpoint,
Expand All @@ -14,18 +13,7 @@ import { retryHttp } from '../util'
import { dbLogger } from '../logger'
import { BlobReportServiceConfig } from '../config'

type PollState = {
timer?: NodeJS.Timer
promise: Promise<void>
}

export class BlobDiverter {
destroyed = false

pollState: PollState = {
promise: Promise.resolve(),
}

serviceConfig: BlobReportServiceConfig
idResolver: IdResolver

Expand All @@ -40,48 +28,6 @@ export class BlobDiverter {
this.idResolver = services.idResolver
}

start() {
this.poll(this.pollState, () => this.divertBlob())
}

poll(state: PollState, fn: () => Promise<void>) {
if (this.destroyed) return
state.promise = fn()
.catch((err) => {
dbLogger.error({ err }, 'blob divert failed')
})
.finally(() => {
state.timer = setTimeout(() => this.poll(state, fn), 30 * SECOND)
})
}

async processAll() {
await Promise.all([this.divertBlob(), this.pollState.promise])
}

async destroy() {
this.destroyed = true
const destroyState = (state: PollState) => {
if (state.timer) {
clearTimeout(state.timer)
}
return state.promise
}
await destroyState(this.pollState)
}

async divertBlob() {
const toPush = await this.db.db
.selectFrom('blob_divert_event')
.select('id')
.forUpdate()
.skipLocked()
.where('divertedAt', 'is', null)
.where('attempts', '<', 10)
.execute()
await Promise.all(toPush.map((evt) => this.attemptBlobDivert(evt.id)))
}

private async getBlob({
pds,
did,
Expand Down Expand Up @@ -111,18 +57,15 @@ export class BlobDiverter {
}
}

private async uploadBlob(
{
imageStream,
contentType,
}: { imageStream: Readable; contentType: string },
{ subjectDid, subjectUri }: { subjectDid: string; subjectUri: string },
) {
if (!this.serviceConfig.authToken || !this.serviceConfig.url) {
return false
}

const url = `${this.serviceConfig.url}?did=${subjectDid}&uri=${subjectUri}`
async sendImage({
url,
imageStream,
contentType,
}: {
url: string
imageStream: Readable
contentType: string
}) {
const result = await axios(url, {
method: 'POST',
data: imageStream,
Expand All @@ -135,20 +78,38 @@ export class BlobDiverter {
return result.status === 200
}

private async uploadBlobOnService({
private async uploadBlob(
{
imageStream,
contentType,
}: { imageStream: Readable; contentType: string },
{
subjectDid,
subjectUri,
}: { subjectDid: string; subjectUri: string | null },
) {
const url = new URL(this.serviceConfig.url)
url.searchParams.set('did', subjectDid)
if (subjectUri) url.searchParams.set('uri', subjectUri)
const result = await this.sendImage({
url: url.toString(),
imageStream,
contentType,
})

return result
}

async uploadBlobOnService({
subjectDid,
subjectUri,
subjectBlobCid,
}: {
subjectDid: string
subjectUri: string
subjectUri: string | null
subjectBlobCid: string
}): Promise<boolean> {
try {
if (!this.serviceConfig.authToken || !this.serviceConfig.url) {
throw new Error('Blob divert service not configured')
}

const didDoc = await this.idResolver.did.resolve(subjectDid)

if (!didDoc) {
Expand All @@ -161,69 +122,23 @@ export class BlobDiverter {
throw new Error('Error resolving PDS')
}

const { imageStream, contentType } = await retryHttp(() =>
this.getBlob({ pds, did: subjectDid, cid: subjectBlobCid }),
)

const uploadResult = await retryHttp(() =>
this.uploadBlob(
// attempt to download and upload within the same retry block since the imageStream is not reusable
const uploadResult = await retryHttp(async () => {
const { imageStream, contentType } = await this.getBlob({
pds,
did: subjectDid,
cid: subjectBlobCid,
})
return this.uploadBlob(
{ imageStream, contentType },
{ subjectDid, subjectUri },
),
)
)
})

return uploadResult
} catch (err) {
dbLogger.error({ err }, 'failed to upload diverted blob')
return false
}
}

async attemptBlobDivert(id: number) {
await this.db.transaction(async (dbTxn) => {
const evt = await dbTxn.db
.selectFrom('blob_divert_event')
.selectAll()
.forUpdate()
.skipLocked()
.where('id', '=', id)
.where('divertedAt', 'is', null)
.executeTakeFirst()
if (!evt) return

const succeeded = await this.uploadBlobOnService(evt)
await dbTxn.db
.updateTable('blob_divert_event')
.set(
succeeded
? { divertedAt: new Date() }
: {
lastAttempted: new Date(),
attempts: (evt.attempts ?? 0) + 1,
},
)
.where('subjectDid', '=', evt.subjectDid)
.where('subjectBlobCid', '=', evt.subjectBlobCid)
.execute()
})
}

async logDivertEvent(values: {
subjectDid: string
subjectUri: string
subjectBlobCid: string
}) {
return this.db.db
.insertInto('blob_divert_event')
.values(values)
.onConflict((oc) =>
oc.columns(['subjectDid', 'subjectBlobCid']).doUpdateSet({
divertedAt: null,
attempts: 0,
lastAttempted: null,
}),
)
.returning('id')
.execute()
}
}
16 changes: 9 additions & 7 deletions packages/ozone/src/daemon/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export type DaemonContextOptions = {
modService: ModerationServiceCreator
signingKey: Keypair
eventPusher: EventPusher
blobDiverter: BlobDiverter
blobDiverter?: BlobDiverter
eventReverser: EventReverser
}

Expand Down Expand Up @@ -49,19 +49,21 @@ export class DaemonContext {
const appviewAuth = async () =>
cfg.appview.did ? createAuthHeaders(cfg.appview.did) : undefined

const blobDiverter = cfg.blobReportService
? new BlobDiverter(db, {
idResolver,
serviceConfig: cfg.blobReportService,
})
: undefined
const eventPusher = new EventPusher(db, createAuthHeaders, {
appview: cfg.appview,
pds: cfg.pds ?? undefined,
})
const blobDiverter = new BlobDiverter(db, {
idResolver,
serviceConfig: cfg.blobReportService,
blobDiverter,
})
const backgroundQueue = new BackgroundQueue(db)
const modService = ModerationService.creator(
backgroundQueue,
eventPusher,
blobDiverter,
appviewAgent,
appviewAuth,
cfg.service.did,
Expand Down Expand Up @@ -96,7 +98,7 @@ export class DaemonContext {
return this.opts.eventPusher
}

get blobDiverter(): BlobDiverter {
get blobDiverter(): BlobDiverter | undefined {
return this.opts.blobDiverter
}

Expand Down
Loading

0 comments on commit be01cf0

Please sign in to comment.