From be01cf0291121d6445789b0e8cbb29d236b14c5d Mon Sep 17 00:00:00 2001 From: Foysal Ahamed Date: Wed, 6 Mar 2024 00:13:54 +0000 Subject: [PATCH] :recycle: Refactor to use event_pusher table instead of new table --- packages/ozone/package.json | 3 +- .../src/api/admin/emitModerationEvent.ts | 5 + packages/ozone/src/config/config.ts | 17 +- packages/ozone/src/context.ts | 13 +- packages/ozone/src/daemon/blob-diverter.ts | 171 +++++------------- packages/ozone/src/daemon/context.ts | 16 +- packages/ozone/src/daemon/event-pusher.ts | 91 +++++++--- packages/ozone/src/daemon/index.ts | 3 - .../20240228T153256972Z-blob-divert-event.ts | 27 --- packages/ozone/src/db/migrations/index.ts | 1 - .../ozone/src/db/schema/blob_push_event.ts | 5 +- packages/ozone/src/mod-service/index.ts | 77 +++----- packages/ozone/tests/blob-divert.test.ts | 58 +++--- 13 files changed, 198 insertions(+), 289 deletions(-) delete mode 100644 packages/ozone/src/db/migrations/20240228T153256972Z-blob-divert-event.ts diff --git a/packages/ozone/package.json b/packages/ozone/package.json index 433f95eeb46..e76dc644f10 100644 --- a/packages/ozone/package.json +++ b/packages/ozone/package.json @@ -65,7 +65,6 @@ "@types/express-serve-static-core": "^4.17.36", "@types/pg": "^8.6.6", "@types/qs": "^6.9.7", - "axios": "^0.27.2", - "nock": "14.0.0-beta.4" + "axios": "^0.27.2" } } diff --git a/packages/ozone/src/api/admin/emitModerationEvent.ts b/packages/ozone/src/api/admin/emitModerationEvent.ts index ef4c5fd2822..748707eb47c 100644 --- a/packages/ozone/src/api/admin/emitModerationEvent.ts +++ b/packages/ozone/src/api/admin/emitModerationEvent.ts @@ -2,6 +2,7 @@ import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../lexicon' import AppContext from '../../context' import { + isModEventDivert, isModEventLabel, isModEventReverseTakedown, isModEventTakedown, @@ -91,6 +92,10 @@ export default function (server: Server, ctx: AppContext) { subjectStatus: result.subjectStatus, }) + if (isModEventDivert(event) && subject.isRecord()) { + await moderationTxn.divertBlobs(subject) + } + if (subject.isRepo()) { if (isTakedownEvent) { const isSuspend = !!result.event.durationInHours diff --git a/packages/ozone/src/config/config.ts b/packages/ozone/src/config/config.ts index 60f103fb6d7..490ce398b8b 100644 --- a/packages/ozone/src/config/config.ts +++ b/packages/ozone/src/config/config.ts @@ -43,10 +43,13 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => { plcUrl: env.didPlcUrl, } - const blobReportServiceCfg = { - url: env.blobReportServiceUrl, - authToken: env.blobReportServiceAuthToken, - } + const blobReportServiceCfg = + env.blobReportServiceUrl && env.blobReportServiceAuthToken + ? { + url: env.blobReportServiceUrl, + authToken: env.blobReportServiceAuthToken, + } + : undefined return { service: serviceCfg, @@ -64,7 +67,7 @@ export type OzoneConfig = { appview: AppviewConfig pds: PdsConfig | null identity: IdentityConfig - blobReportService: BlobReportServiceConfig + blobReportService?: BlobReportServiceConfig } export type ServiceConfig = { @@ -75,8 +78,8 @@ export type ServiceConfig = { } export type BlobReportServiceConfig = { - url?: string - authToken?: string + url: string + authToken: string } export type DatabaseConfig = { diff --git a/packages/ozone/src/context.ts b/packages/ozone/src/context.ts index 9018935d222..2d451a8a2fa 100644 --- a/packages/ozone/src/context.ts +++ b/packages/ozone/src/context.ts @@ -65,19 +65,20 @@ export class AppContext { cfg.appview.did ? createAuthHeaders(cfg.appview.did) : undefined const backgroundQueue = new BackgroundQueue(db) + const blobDiverter = cfg.blobReportService + ? new BlobDiverter(db, { + idResolver, + serviceConfig: cfg.blobReportService, + }) + : undefined const eventPusher = new EventPusher(db, createAuthHeaders, { appview: cfg.appview, pds: cfg.pds ?? undefined, + blobDiverter, }) - const blobDiverter = new BlobDiverter(db, { - idResolver, - serviceConfig: cfg.blobReportService, - }) - const modService = ModerationService.creator( backgroundQueue, eventPusher, - blobDiverter, appviewAgent, appviewAuth, cfg.service.did, diff --git a/packages/ozone/src/daemon/blob-diverter.ts b/packages/ozone/src/daemon/blob-diverter.ts index d8be3fcddf3..b49c1fe1796 100644 --- a/packages/ozone/src/daemon/blob-diverter.ts +++ b/packages/ozone/src/daemon/blob-diverter.ts @@ -1,5 +1,4 @@ import { - SECOND, VerifyCidTransform, forwardStreamErrors, getPdsEndpoint, @@ -14,18 +13,7 @@ import { retryHttp } from '../util' import { dbLogger } from '../logger' import { BlobReportServiceConfig } from '../config' -type PollState = { - timer?: NodeJS.Timer - promise: Promise -} - export class BlobDiverter { - destroyed = false - - pollState: PollState = { - promise: Promise.resolve(), - } - serviceConfig: BlobReportServiceConfig idResolver: IdResolver @@ -40,48 +28,6 @@ export class BlobDiverter { this.idResolver = services.idResolver } - start() { - this.poll(this.pollState, () => this.divertBlob()) - } - - poll(state: PollState, fn: () => Promise) { - if (this.destroyed) return - state.promise = fn() - .catch((err) => { - dbLogger.error({ err }, 'blob divert failed') - }) - .finally(() => { - state.timer = setTimeout(() => this.poll(state, fn), 30 * SECOND) - }) - } - - async processAll() { - await Promise.all([this.divertBlob(), this.pollState.promise]) - } - - async destroy() { - this.destroyed = true - const destroyState = (state: PollState) => { - if (state.timer) { - clearTimeout(state.timer) - } - return state.promise - } - await destroyState(this.pollState) - } - - async divertBlob() { - const toPush = await this.db.db - .selectFrom('blob_divert_event') - .select('id') - .forUpdate() - .skipLocked() - .where('divertedAt', 'is', null) - .where('attempts', '<', 10) - .execute() - await Promise.all(toPush.map((evt) => this.attemptBlobDivert(evt.id))) - } - private async getBlob({ pds, did, @@ -111,18 +57,15 @@ export class BlobDiverter { } } - private async uploadBlob( - { - imageStream, - contentType, - }: { imageStream: Readable; contentType: string }, - { subjectDid, subjectUri }: { subjectDid: string; subjectUri: string }, - ) { - if (!this.serviceConfig.authToken || !this.serviceConfig.url) { - return false - } - - const url = `${this.serviceConfig.url}?did=${subjectDid}&uri=${subjectUri}` + async sendImage({ + url, + imageStream, + contentType, + }: { + url: string + imageStream: Readable + contentType: string + }) { const result = await axios(url, { method: 'POST', data: imageStream, @@ -135,20 +78,38 @@ export class BlobDiverter { return result.status === 200 } - private async uploadBlobOnService({ + private async uploadBlob( + { + imageStream, + contentType, + }: { imageStream: Readable; contentType: string }, + { + subjectDid, + subjectUri, + }: { subjectDid: string; subjectUri: string | null }, + ) { + const url = new URL(this.serviceConfig.url) + url.searchParams.set('did', subjectDid) + if (subjectUri) url.searchParams.set('uri', subjectUri) + const result = await this.sendImage({ + url: url.toString(), + imageStream, + contentType, + }) + + return result + } + + async uploadBlobOnService({ subjectDid, subjectUri, subjectBlobCid, }: { subjectDid: string - subjectUri: string + subjectUri: string | null subjectBlobCid: string }): Promise { try { - if (!this.serviceConfig.authToken || !this.serviceConfig.url) { - throw new Error('Blob divert service not configured') - } - const didDoc = await this.idResolver.did.resolve(subjectDid) if (!didDoc) { @@ -161,16 +122,18 @@ export class BlobDiverter { throw new Error('Error resolving PDS') } - const { imageStream, contentType } = await retryHttp(() => - this.getBlob({ pds, did: subjectDid, cid: subjectBlobCid }), - ) - - const uploadResult = await retryHttp(() => - this.uploadBlob( + // attempt to download and upload within the same retry block since the imageStream is not reusable + const uploadResult = await retryHttp(async () => { + const { imageStream, contentType } = await this.getBlob({ + pds, + did: subjectDid, + cid: subjectBlobCid, + }) + return this.uploadBlob( { imageStream, contentType }, { subjectDid, subjectUri }, - ), - ) + ) + }) return uploadResult } catch (err) { @@ -178,52 +141,4 @@ export class BlobDiverter { return false } } - - async attemptBlobDivert(id: number) { - await this.db.transaction(async (dbTxn) => { - const evt = await dbTxn.db - .selectFrom('blob_divert_event') - .selectAll() - .forUpdate() - .skipLocked() - .where('id', '=', id) - .where('divertedAt', 'is', null) - .executeTakeFirst() - if (!evt) return - - const succeeded = await this.uploadBlobOnService(evt) - await dbTxn.db - .updateTable('blob_divert_event') - .set( - succeeded - ? { divertedAt: new Date() } - : { - lastAttempted: new Date(), - attempts: (evt.attempts ?? 0) + 1, - }, - ) - .where('subjectDid', '=', evt.subjectDid) - .where('subjectBlobCid', '=', evt.subjectBlobCid) - .execute() - }) - } - - async logDivertEvent(values: { - subjectDid: string - subjectUri: string - subjectBlobCid: string - }) { - return this.db.db - .insertInto('blob_divert_event') - .values(values) - .onConflict((oc) => - oc.columns(['subjectDid', 'subjectBlobCid']).doUpdateSet({ - divertedAt: null, - attempts: 0, - lastAttempted: null, - }), - ) - .returning('id') - .execute() - } } diff --git a/packages/ozone/src/daemon/context.ts b/packages/ozone/src/daemon/context.ts index 4ad53812388..266e7d4143a 100644 --- a/packages/ozone/src/daemon/context.ts +++ b/packages/ozone/src/daemon/context.ts @@ -16,7 +16,7 @@ export type DaemonContextOptions = { modService: ModerationServiceCreator signingKey: Keypair eventPusher: EventPusher - blobDiverter: BlobDiverter + blobDiverter?: BlobDiverter eventReverser: EventReverser } @@ -49,19 +49,21 @@ export class DaemonContext { const appviewAuth = async () => cfg.appview.did ? createAuthHeaders(cfg.appview.did) : undefined + const blobDiverter = cfg.blobReportService + ? new BlobDiverter(db, { + idResolver, + serviceConfig: cfg.blobReportService, + }) + : undefined const eventPusher = new EventPusher(db, createAuthHeaders, { appview: cfg.appview, pds: cfg.pds ?? undefined, - }) - const blobDiverter = new BlobDiverter(db, { - idResolver, - serviceConfig: cfg.blobReportService, + blobDiverter, }) const backgroundQueue = new BackgroundQueue(db) const modService = ModerationService.creator( backgroundQueue, eventPusher, - blobDiverter, appviewAgent, appviewAuth, cfg.service.did, @@ -96,7 +98,7 @@ export class DaemonContext { return this.opts.eventPusher } - get blobDiverter(): BlobDiverter { + get blobDiverter(): BlobDiverter | undefined { return this.opts.blobDiverter } diff --git a/packages/ozone/src/daemon/event-pusher.ts b/packages/ozone/src/daemon/event-pusher.ts index faaee4529ed..c09b9be1e0f 100644 --- a/packages/ozone/src/daemon/event-pusher.ts +++ b/packages/ozone/src/daemon/event-pusher.ts @@ -5,6 +5,9 @@ import { retryHttp } from '../util' import { dbLogger } from '../logger' import { InputSchema } from '../lexicon/types/com/atproto/admin/updateSubjectStatus' import assert from 'assert' +import { BlobPushEvent } from '../db/schema/blob_push_event' +import { Insertable, Selectable } from 'kysely' +import { BlobDiverter } from './blob-diverter' type EventSubject = InputSchema['subject'] @@ -39,6 +42,7 @@ export class EventPusher { appview: Service | undefined pds: Service | undefined + blobDiverter: BlobDiverter | undefined constructor( public db: Database, @@ -52,8 +56,10 @@ export class EventPusher { url: string did: string } + blobDiverter }, ) { + this.blobDiverter = services.blobDiverter if (services.appview) { this.appview = { agent: new AtpAgent({ service: services.appview.url }), @@ -265,32 +271,67 @@ export class EventPusher { .executeTakeFirst() if (!evt) return - const service = evt.eventType === 'pds_takedown' ? this.pds : this.appview - assert(service) - const subject = { - $type: 'com.atproto.admin.defs#repoBlobRef', - did: evt.subjectDid, - cid: evt.subjectBlobCid, - } - const succeeded = await this.updateSubjectOnService( - service, - subject, - evt.takedownRef, - ) - await dbTxn.db - .updateTable('blob_push_event') - .set( - succeeded - ? { confirmedAt: new Date() } - : { - lastAttempted: new Date(), - attempts: evt.attempts ?? 0 + 1, - }, + let succeeded = false + if (evt.eventType === 'blob_divert') { + succeeded = await (this.blobDiverter + ? this.blobDiverter.uploadBlobOnService(evt) + : Promise.resolve(false)) + } else { + const service = + evt.eventType === 'pds_takedown' ? this.pds : this.appview + assert(service) + const subject = { + $type: 'com.atproto.admin.defs#repoBlobRef', + did: evt.subjectDid, + cid: evt.subjectBlobCid, + } + succeeded = await this.updateSubjectOnService( + service, + subject, + evt.takedownRef, ) - .where('subjectDid', '=', evt.subjectDid) - .where('subjectBlobCid', '=', evt.subjectBlobCid) - .where('eventType', '=', evt.eventType) - .execute() + } + await this.markBlobEventAttempt(dbTxn, evt, succeeded) }) } + + async markBlobEventAttempt( + dbTxn: Database, + event: Selectable, + succeeded: boolean, + ) { + await dbTxn.db + .updateTable('blob_push_event') + .set( + succeeded + ? { confirmedAt: new Date() } + : { + lastAttempted: new Date(), + attempts: (event.attempts ?? 0) + 1, + }, + ) + .where('subjectDid', '=', event.subjectDid) + .where('subjectBlobCid', '=', event.subjectBlobCid) + .where('eventType', '=', event.eventType) + .execute() + } + + async logBlobPushEvent( + blobValues: Insertable[], + takedownRef?: string | null, + ) { + return this.db.db + .insertInto('blob_push_event') + .values(blobValues) + .onConflict((oc) => + oc.columns(['subjectDid', 'subjectBlobCid', 'eventType']).doUpdateSet({ + takedownRef, + confirmedAt: null, + attempts: 0, + lastAttempted: null, + }), + ) + .returning('id') + .execute() + } } diff --git a/packages/ozone/src/daemon/index.ts b/packages/ozone/src/daemon/index.ts index 8f1d67273f2..501b8caad5c 100644 --- a/packages/ozone/src/daemon/index.ts +++ b/packages/ozone/src/daemon/index.ts @@ -20,18 +20,15 @@ export class OzoneDaemon { async start() { this.ctx.eventPusher.start() this.ctx.eventReverser.start() - this.ctx.blobDiverter.start() } async processAll() { await this.ctx.eventPusher.processAll() - await this.ctx.blobDiverter.processAll() } async destroy() { await this.ctx.eventReverser.destroy() await this.ctx.eventPusher.destroy() - await this.ctx.blobDiverter.destroy() await this.ctx.db.close() } } diff --git a/packages/ozone/src/db/migrations/20240228T153256972Z-blob-divert-event.ts b/packages/ozone/src/db/migrations/20240228T153256972Z-blob-divert-event.ts deleted file mode 100644 index 933cd3df4b5..00000000000 --- a/packages/ozone/src/db/migrations/20240228T153256972Z-blob-divert-event.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { Kysely } from 'kysely' - -export async function up(db: Kysely): Promise { - await db.schema - .createTable('blob_divert_event') - .addColumn('id', 'serial', (col) => col.primaryKey()) - .addColumn('subjectDid', 'varchar', (col) => col.notNull()) - .addColumn('subjectBlobCid', 'varchar', (col) => col.notNull()) - .addColumn('subjectUri', 'varchar') - .addColumn('divertedAt', 'timestamptz') - .addColumn('lastAttempted', 'timestamptz') - .addColumn('attempts', 'integer', (col) => col.notNull().defaultTo(0)) - .addUniqueConstraint('blob_divert_event_unique_evt', [ - 'subjectDid', - 'subjectBlobCid', - ]) - .execute() - await db.schema - .createIndex('blob_divert_unique_idx') - .on('blob_divert_event') - .columns(['divertedAt', 'attempts']) - .execute() -} - -export async function down(db: Kysely): Promise { - await db.schema.dropTable('blob_divert_event').execute() -} diff --git a/packages/ozone/src/db/migrations/index.ts b/packages/ozone/src/db/migrations/index.ts index 4ccaa5791f5..1a823f860c5 100644 --- a/packages/ozone/src/db/migrations/index.ts +++ b/packages/ozone/src/db/migrations/index.ts @@ -6,4 +6,3 @@ export * as _20231219T205730722Z from './20231219T205730722Z-init' export * as _20240116T085607200Z from './20240116T085607200Z-communication-template' export * as _20240201T051104136Z from './20240201T051104136Z-mod-event-blobs' export * as _20240208T213404429Z from './20240208T213404429Z-add-tags-column-to-moderation-subject' -export * as _20240228T153256972Z from './20240228T153256972Z-blob-divert-event' diff --git a/packages/ozone/src/db/schema/blob_push_event.ts b/packages/ozone/src/db/schema/blob_push_event.ts index f38649e675c..d4cd8e8b482 100644 --- a/packages/ozone/src/db/schema/blob_push_event.ts +++ b/packages/ozone/src/db/schema/blob_push_event.ts @@ -2,7 +2,10 @@ import { Generated } from 'kysely' export const eventTableName = 'blob_push_event' -export type BlobPushEventType = 'pds_takedown' | 'appview_takedown' +export type BlobPushEventType = + | 'pds_takedown' + | 'appview_takedown' + | 'blob_divert' export interface BlobPushEvent { id: Generated diff --git a/packages/ozone/src/mod-service/index.ts b/packages/ozone/src/mod-service/index.ts index 988703f70e3..e31f29e10a8 100644 --- a/packages/ozone/src/mod-service/index.ts +++ b/packages/ozone/src/mod-service/index.ts @@ -15,7 +15,6 @@ import { isModEventTag, RepoRef, RepoBlobRef, - isModEventDivert, } from '../lexicon/types/com/atproto/admin/defs' import { adjustModerationSubjectStatus, @@ -44,7 +43,6 @@ import { BlobPushEvent } from '../db/schema/blob_push_event' import { BackgroundQueue } from '../background' import { EventPusher } from '../daemon' import { jsonb } from '../db/types' -import { BlobDiverter } from '../daemon/blob-diverter' import { LabelChannel } from '../db/schema/label' export type ModerationServiceCreator = (db: Database) => ModerationService @@ -54,7 +52,6 @@ export class ModerationService { public db: Database, public backgroundQueue: BackgroundQueue, public eventPusher: EventPusher, - public blobDiverter: BlobDiverter, public appviewAgent: AtpAgent, private appviewAuth: AppviewAuth, public serverDid: string, @@ -63,7 +60,6 @@ export class ModerationService { static creator( backgroundQueue: BackgroundQueue, eventPusher: EventPusher, - blobDiverter: BlobDiverter, appviewAgent: AtpAgent, appviewAuth: AppviewAuth, serverDid: string, @@ -73,7 +69,6 @@ export class ModerationService { db, backgroundQueue, eventPusher, - blobDiverter, appviewAgent, appviewAuth, serverDid, @@ -327,33 +322,6 @@ export class ModerationService { subject.blobCids, ) - if ( - isModEventDivert(event) && - subjectInfo.subjectUri && - subjectInfo.subjectBlobCids?.length - ) { - const blobDiverts = await Promise.all( - subjectInfo.subjectBlobCids?.map((subjectBlobCid) => - this.blobDiverter.logDivertEvent({ - subjectDid: subjectInfo.subjectDid, - subjectBlobCid: subjectBlobCid, - // TODO: Check is done above already - // @ts-ignore - subjectUri: subjectInfo.subjectUri, - }), - ), - ) - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await Promise.all( - blobDiverts.map((divert) => - this.blobDiverter.attemptBlobDivert(divert[0].id), - ), - ) - }) - }) - } - return { event: modEvent, subjectStatus } } @@ -569,27 +537,17 @@ export class ModerationService { for (const cid of blobCids) { blobValues.push({ eventType, + takedownRef, subjectDid: subject.did, + subjectUri: subject.uri || null, subjectBlobCid: cid.toString(), - takedownRef, }) } } - const blobEvts = await this.db.db - .insertInto('blob_push_event') - .values(blobValues) - .onConflict((oc) => - oc - .columns(['subjectDid', 'subjectBlobCid', 'eventType']) - .doUpdateSet({ - takedownRef, - confirmedAt: null, - attempts: 0, - lastAttempted: null, - }), - ) - .returning('id') - .execute() + const blobEvts = await this.eventPusher.logBlobPushEvent( + blobValues, + takedownRef, + ) this.db.onCommit(() => { this.backgroundQueue.add(async () => { @@ -601,6 +559,29 @@ export class ModerationService { } } + async divertBlobs(subject: RecordSubject) { + const subjectInfo = subject.info() + + const blobDiverts = await this.eventPusher.logBlobPushEvent( + subjectInfo.subjectBlobCids.map((subjectBlobCid) => ({ + subjectDid: subjectInfo.subjectDid, + subjectBlobCid: subjectBlobCid, + subjectUri: subjectInfo.subjectUri, + eventType: 'blob_divert', + })), + ) + + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.all( + blobDiverts.map((divert) => + this.eventPusher.attemptBlobEvent(divert[0].id), + ), + ) + }) + }) + } + async reverseTakedownRecord(subject: RecordSubject) { this.db.assertTransaction() const labels: string[] = [UNSPECCED_TAKEDOWN_LABEL] diff --git a/packages/ozone/tests/blob-divert.test.ts b/packages/ozone/tests/blob-divert.test.ts index 193e9518b21..6d57e6d0473 100644 --- a/packages/ozone/tests/blob-divert.test.ts +++ b/packages/ozone/tests/blob-divert.test.ts @@ -1,10 +1,6 @@ import { SeedClient, TestNetwork, basicSeed } from '@atproto/dev-env' import AtpAgent from '@atproto/api' -import nock from 'nock' - -const BLOB_DIVERT_SERVICE_HOST = 'http://blob-report.com' -const BLOB_DIVERT_SERVICE_PATH = '/xrpc/com.atproto.unspecced.reportBlob' -const BLOB_DIVERT_SERVICE_AUTH_TOKEN = 'Basic test' +import { BlobDiverter } from '../src/daemon' describe('blob divert', () => { let network: TestNetwork @@ -15,8 +11,8 @@ describe('blob divert', () => { network = await TestNetwork.create({ dbPostgresSchema: 'ozone_blob_divert_test', ozone: { - blobReportServiceUrl: `${BLOB_DIVERT_SERVICE_HOST}${BLOB_DIVERT_SERVICE_PATH}`, - blobReportServiceAuthToken: BLOB_DIVERT_SERVICE_AUTH_TOKEN, + blobReportServiceUrl: `https://blob-report.com`, + blobReportServiceAuthToken: 'test-auth-token', }, }) agent = network.pds.getClient() @@ -29,25 +25,22 @@ describe('blob divert', () => { await network.close() }) - afterEach(() => { - nock.cleanAll() - }) - - const mockReportServiceResponse = ( - status: number, - data: Record, - ) => - nock(BLOB_DIVERT_SERVICE_HOST) - .persist() - .post(BLOB_DIVERT_SERVICE_PATH, () => true) - .query(true) - .reply(status, data) + const mockReportServiceResponse = (result: boolean) => { + return jest + .spyOn(BlobDiverter.prototype, 'sendImage') + .mockImplementation(async () => { + return result + }) + // nock(BLOB_DIVERT_SERVICE_HOST) + // .persist() + // .post(BLOB_DIVERT_SERVICE_PATH, () => true) + // .query(true) + // .reply(status, data) + } it('fails and keeps attempt count when report service fails to accept upload.', async () => { - // Simulate failure to accept upload - const reportServiceRequest = mockReportServiceResponse(401, { - success: false, - }) + // Simulate failure to fail upload + const reportServiceRequest = mockReportServiceResponse(false) await agent.api.com.atproto.admin.emitModerationEvent( { @@ -71,31 +64,28 @@ describe('blob divert', () => { await network.ozone.processAll() const divertEvents = await network.ozone.ctx.db.db - .selectFrom('blob_divert_event') + .selectFrom('blob_push_event') .selectAll() .execute() expect(divertEvents[0].attempts).toBeGreaterThan(0) expect(divertEvents[1].attempts).toBeGreaterThan(0) - reportServiceRequest.done() + expect(reportServiceRequest).toHaveBeenCalled() }) it('sends blobs to configured divert service and marks divert date', async () => { // Simulate failure to accept upload - const reportServiceRequest = mockReportServiceResponse(200, { - success: true, - }) + const reportServiceRequest = mockReportServiceResponse(true) await network.ozone.processAll() const divertEvents = await network.ozone.ctx.db.db - .selectFrom('blob_divert_event') + .selectFrom('blob_push_event') .selectAll() .execute() - expect(divertEvents[0].divertedAt).toBeTruthy() - expect(divertEvents[1].divertedAt).toBeTruthy() - reportServiceRequest.done() - reportServiceRequest.persist(false) + expect(divertEvents[0].confirmedAt).toBeTruthy() + expect(divertEvents[1].confirmedAt).toBeTruthy() + expect(reportServiceRequest).toHaveBeenCalled() }) })