diff --git a/.github/workflows/build-and-push-ozone-aws.yaml b/.github/workflows/build-and-push-ozone-aws.yaml index 53f95c5b731..ff8162bb941 100644 --- a/.github/workflows/build-and-push-ozone-aws.yaml +++ b/.github/workflows/build-and-push-ozone-aws.yaml @@ -3,6 +3,7 @@ on: push: branches: - main + - divert-blobs env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/lexicons/tools/ozone/moderation/defs.json b/lexicons/tools/ozone/moderation/defs.json index e8420d2bd32..e88ac98303e 100644 --- a/lexicons/tools/ozone/moderation/defs.json +++ b/lexicons/tools/ozone/moderation/defs.json @@ -26,7 +26,8 @@ "#modEventEscalate", "#modEventMute", "#modEventEmail", - "#modEventResolveAppeal" + "#modEventResolveAppeal", + "#modEventDivert" ] }, "subject": { @@ -67,7 +68,8 @@ "#modEventEscalate", "#modEventMute", "#modEventEmail", - "#modEventResolveAppeal" + "#modEventResolveAppeal", + "#modEventDivert" ] }, "subject": { @@ -317,6 +319,13 @@ } } }, + "modEventDivert": { + "type": "object", + "description": "Divert a record's blobs to a 3rd party service for further scanning/tagging", + "properties": { + "comment": { "type": "string" } + } + }, "modEventTag": { "type": "object", "description": "Add/Remove a tag on a subject", diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 3f46bf6b17e..e9a40ffd7dc 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -7976,6 +7976,7 @@ export const schemaDict = { 'lex:tools.ozone.moderation.defs#modEventMute', 'lex:tools.ozone.moderation.defs#modEventEmail', 'lex:tools.ozone.moderation.defs#modEventResolveAppeal', + 'lex:tools.ozone.moderation.defs#modEventDivert', ], }, subject: { @@ -8034,6 +8035,7 @@ export const schemaDict = { 'lex:tools.ozone.moderation.defs#modEventMute', 'lex:tools.ozone.moderation.defs#modEventEmail', 'lex:tools.ozone.moderation.defs#modEventResolveAppeal', + 'lex:tools.ozone.moderation.defs#modEventDivert', ], }, subject: { @@ -8320,6 +8322,16 @@ export const schemaDict = { }, }, }, + modEventDivert: { + type: 'object', + description: + "Divert a record's blobs to a 3rd party service for further scanning/tagging", + properties: { + comment: { + type: 'string', + }, + }, + }, modEventTag: { type: 'object', description: 'Add/Remove a tag on a subject', diff --git a/packages/api/src/client/types/tools/ozone/moderation/defs.ts b/packages/api/src/client/types/tools/ozone/moderation/defs.ts index 9134639b3d3..f6f546b6bef 100644 --- a/packages/api/src/client/types/tools/ozone/moderation/defs.ts +++ b/packages/api/src/client/types/tools/ozone/moderation/defs.ts @@ -24,6 +24,7 @@ export interface ModEventView { | ModEventMute | ModEventEmail | ModEventResolveAppeal + | ModEventDivert | { $type: string; [k: string]: unknown } subject: | ComAtprotoAdminDefs.RepoRef @@ -62,6 +63,7 @@ export interface ModEventViewDetail { | ModEventMute | ModEventEmail | ModEventResolveAppeal + | ModEventDivert | { $type: string; [k: string]: unknown } subject: | RepoView @@ -367,6 +369,24 @@ export function validateModEventEmail(v: unknown): ValidationResult { return lexicons.validate('tools.ozone.moderation.defs#modEventEmail', v) } +/** Divert a record's blobs to a 3rd party service for further scanning/tagging */ +export interface ModEventDivert { + comment?: string + [k: string]: unknown +} + +export function isModEventDivert(v: unknown): v is ModEventDivert { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'tools.ozone.moderation.defs#modEventDivert' + ) +} + +export function validateModEventDivert(v: unknown): ValidationResult { + return lexicons.validate('tools.ozone.moderation.defs#modEventDivert', v) +} + /** Add/Remove a tag on a subject */ export interface ModEventTag { /** Tags to be added to the subject. If already exists, won't be duplicated. */ diff --git a/packages/ozone/src/api/moderation/emitEvent.ts b/packages/ozone/src/api/moderation/emitEvent.ts index 71b42e3e433..1d971811673 100644 --- a/packages/ozone/src/api/moderation/emitEvent.ts +++ b/packages/ozone/src/api/moderation/emitEvent.ts @@ -2,160 +2,204 @@ import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../lexicon' import AppContext from '../../context' import { + isModEventDivert, isModEventEmail, isModEventLabel, isModEventReverseTakedown, isModEventTakedown, } from '../../lexicon/types/tools/ozone/moderation/defs' +import { HandlerInput } from '../../lexicon/types/tools/ozone/moderation/emitEvent' import { subjectFromInput } from '../../mod-service/subject' import { ModerationLangService } from '../../mod-service/lang' import { retryHttp } from '../../util' +import { ModeratorOutput, AdminTokenOutput } from '../../auth-verifier' + +const handleModerationEvent = async ({ + ctx, + input, + auth, +}: { + ctx: AppContext + input: HandlerInput + auth: ModeratorOutput | AdminTokenOutput +}) => { + const access = auth.credentials + const createdBy = + auth.credentials.type === 'moderator' + ? auth.credentials.iss + : input.body.createdBy + const db = ctx.db + const moderationService = ctx.modService(db) + const { event } = input.body + const isTakedownEvent = isModEventTakedown(event) + const isReverseTakedownEvent = isModEventReverseTakedown(event) + const isLabelEvent = isModEventLabel(event) + const subject = subjectFromInput( + input.body.subject, + input.body.subjectBlobCids, + ) + + // apply access rules + + // if less than moderator access then can only take ack and escalation actions + if (isTakedownEvent || isReverseTakedownEvent) { + if (!access.isModerator) { + throw new AuthRequiredError( + 'Must be a full moderator to take this type of action', + ) + } -export default function (server: Server, ctx: AppContext) { - server.tools.ozone.moderation.emitEvent({ - auth: ctx.authVerifier.modOrAdminToken, - handler: async ({ input, auth }) => { - const access = auth.credentials - const createdBy = - auth.credentials.type === 'moderator' - ? auth.credentials.iss - : input.body.createdBy - const db = ctx.db - const moderationService = ctx.modService(db) - const { event } = input.body - const isTakedownEvent = isModEventTakedown(event) - const isReverseTakedownEvent = isModEventReverseTakedown(event) - const isLabelEvent = isModEventLabel(event) - const subject = subjectFromInput( - input.body.subject, - input.body.subjectBlobCids, + // Non admins should not be able to take down feed generators + if ( + !access.isAdmin && + subject.recordPath?.includes('app.bsky.feed.generator/') + ) { + throw new AuthRequiredError( + 'Must be a full admin to take this type of action on feed generators', ) + } + } + // if less than moderator access then can not apply labels + if (!access.isModerator && isLabelEvent) { + throw new AuthRequiredError('Must be a full moderator to label content') + } - // apply access rules - - // if less than moderator access then can only take ack and escalation actions - if (isTakedownEvent || isReverseTakedownEvent) { - if (!access.isModerator) { - throw new AuthRequiredError( - 'Must be a full moderator to take this type of action', - ) - } - - // Non admins should not be able to take down feed generators - if ( - !access.isAdmin && - subject.recordPath?.includes('app.bsky.feed.generator/') - ) { - throw new AuthRequiredError( - 'Must be a full admin to take this type of action on feed generators', - ) - } - } - // if less than moderator access then can not apply labels - if (!access.isModerator && isLabelEvent) { - throw new AuthRequiredError('Must be a full moderator to label content') - } + if (isLabelEvent) { + validateLabels([ + ...(event.createLabelVals ?? []), + ...(event.negateLabelVals ?? []), + ]) + } - if (isLabelEvent) { - validateLabels([ - ...(event.createLabelVals ?? []), - ...(event.negateLabelVals ?? []), - ]) - } + if (isTakedownEvent || isReverseTakedownEvent) { + const status = await moderationService.getStatus(subject) - if (isTakedownEvent || isReverseTakedownEvent) { - const status = await moderationService.getStatus(subject) + if (status?.takendown && isTakedownEvent) { + throw new InvalidRequestError(`Subject is already taken down`) + } + + if (!status?.takendown && isReverseTakedownEvent) { + throw new InvalidRequestError(`Subject is not taken down`) + } - if (status?.takendown && isTakedownEvent) { - throw new InvalidRequestError(`Subject is already taken down`) - } + if (status?.takendown && isReverseTakedownEvent && subject.isRecord()) { + // due to the way blob status is modeled, we should reverse takedown on all + // blobs for the record being restored, which aren't taken down on another record. + subject.blobCids = status.blobCids ?? [] + } + } - if (!status?.takendown && isReverseTakedownEvent) { - throw new InvalidRequestError(`Subject is not taken down`) - } + if (isModEventEmail(event) && event.content) { + // sending email prior to logging the event to avoid a long transaction below + if (!subject.isRepo()) { + throw new InvalidRequestError('Email can only be sent to a repo subject') + } + const { content, subjectLine } = event + await retryHttp(() => + ctx.modService(db).sendEmail({ + subject: subjectLine, + content, + recipientDid: subject.did, + }), + ) + } - if (status?.takendown && isReverseTakedownEvent && subject.isRecord()) { - // due to the way blob status is modeled, we should reverse takedown on all - // blobs for the record being restored, which aren't taken down on another record. - subject.blobCids = status.blobCids ?? [] - } - } + if (isModEventDivert(event) && subject.isRecord()) { + if (!ctx.blobDiverter) { + throw new InvalidRequestError( + 'BlobDiverter not configured for this service', + ) + } + await ctx.blobDiverter.uploadBlobOnService(subject.info()) + } - if (isModEventEmail(event) && event.content) { - // sending email prior to logging the event to avoid a long transaction below - if (!subject.isRepo()) { - throw new InvalidRequestError( - 'Email can only be sent to a repo subject', - ) - } - const { content, subjectLine } = event - await retryHttp(() => - ctx.modService(db).sendEmail({ - subject: subjectLine, - content, - recipientDid: subject.did, - }), - ) + const moderationEvent = await db.transaction(async (dbTxn) => { + const moderationTxn = ctx.modService(dbTxn) + + const result = await moderationTxn.logEvent({ + event, + subject, + createdBy, + }) + + const moderationLangService = new ModerationLangService(moderationTxn) + await moderationLangService.tagSubjectWithLang({ + subject, + createdBy: ctx.cfg.service.did, + subjectStatus: result.subjectStatus, + }) + + if (subject.isRepo()) { + if (isTakedownEvent) { + const isSuspend = !!result.event.durationInHours + await moderationTxn.takedownRepo(subject, result.event.id, isSuspend) + } else if (isReverseTakedownEvent) { + await moderationTxn.reverseTakedownRepo(subject) } + } - const moderationEvent = await db.transaction(async (dbTxn) => { - const moderationTxn = ctx.modService(dbTxn) + if (subject.isRecord()) { + if (isTakedownEvent) { + await moderationTxn.takedownRecord(subject, result.event.id) + } else if (isReverseTakedownEvent) { + await moderationTxn.reverseTakedownRecord(subject) + } + } - const result = await moderationTxn.logEvent({ - event, - subject, - createdBy, - }) + if (isLabelEvent) { + await moderationTxn.formatAndCreateLabels( + result.event.subjectUri ?? result.event.subjectDid, + result.event.subjectCid, + { + create: result.event.createLabelVals?.length + ? result.event.createLabelVals.split(' ') + : undefined, + negate: result.event.negateLabelVals?.length + ? result.event.negateLabelVals.split(' ') + : undefined, + }, + ) + } - const moderationLangService = new ModerationLangService(moderationTxn) - await moderationLangService.tagSubjectWithLang({ - subject, - createdBy: ctx.cfg.service.did, - subjectStatus: result.subjectStatus, - }) + return result.event + }) - if (subject.isRepo()) { - if (isTakedownEvent) { - const isSuspend = !!result.event.durationInHours - await moderationTxn.takedownRepo( - subject, - result.event.id, - isSuspend, - ) - } else if (isReverseTakedownEvent) { - await moderationTxn.reverseTakedownRepo(subject) - } - } - - if (subject.isRecord()) { - if (isTakedownEvent) { - await moderationTxn.takedownRecord(subject, result.event.id) - } else if (isReverseTakedownEvent) { - await moderationTxn.reverseTakedownRecord(subject) - } - } - - if (isLabelEvent) { - await moderationTxn.formatAndCreateLabels( - result.event.subjectUri ?? result.event.subjectDid, - result.event.subjectCid, - { - create: result.event.createLabelVals?.length - ? result.event.createLabelVals.split(' ') - : undefined, - negate: result.event.negateLabelVals?.length - ? result.event.negateLabelVals.split(' ') - : undefined, - }, - ) - } + return moderationService.views.formatEvent(moderationEvent) +} - return result.event +export default function (server: Server, ctx: AppContext) { + server.tools.ozone.moderation.emitEvent({ + auth: ctx.authVerifier.modOrAdminToken, + handler: async ({ input, auth }) => { + const moderationEvent = await handleModerationEvent({ + input, + auth, + ctx, }) + // On divert events, we need to automatically take down the blobs + if (isModEventDivert(input.body.event)) { + await handleModerationEvent({ + auth, + ctx, + input: { + ...input, + body: { + ...input.body, + event: { + ...input.body.event, + $type: 'tools.ozone.moderation.defs#modEventTakedown', + comment: + '[DIVERT_SIDE_EFFECT]: Automatically taking down after divert event', + }, + }, + }, + }) + } + return { encoding: 'application/json', - body: moderationService.views.formatEvent(moderationEvent), + body: moderationEvent, } }, }) diff --git a/packages/ozone/src/auth-verifier.ts b/packages/ozone/src/auth-verifier.ts index 2eec84cb683..b3ab579d754 100644 --- a/packages/ozone/src/auth-verifier.ts +++ b/packages/ozone/src/auth-verifier.ts @@ -7,7 +7,7 @@ type ReqCtx = { req: express.Request } -type AdminTokenOutput = { +export type AdminTokenOutput = { credentials: { type: 'admin_token' isAdmin: true @@ -16,7 +16,7 @@ type AdminTokenOutput = { } } -type ModeratorOutput = { +export type ModeratorOutput = { credentials: { type: 'moderator' aud: string diff --git a/packages/ozone/src/config/config.ts b/packages/ozone/src/config/config.ts index 8621dfd2957..b3fb1c8dbe9 100644 --- a/packages/ozone/src/config/config.ts +++ b/packages/ozone/src/config/config.ts @@ -50,6 +50,13 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => { plcUrl: env.didPlcUrl, } + const blobDivertServiceCfg = + env.blobDivertUrl && env.blobDivertAdminPassword + ? { + url: env.blobDivertUrl, + adminPassword: env.blobDivertAdminPassword, + } + : null const accessCfg: OzoneConfig['access'] = { admins: env.adminDids, moderators: env.moderatorDids, @@ -63,6 +70,7 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => { pds: pdsCfg, cdn: cdnCfg, identity: identityCfg, + blobDivert: blobDivertServiceCfg, access: accessCfg, } } @@ -74,6 +82,7 @@ export type OzoneConfig = { pds: PdsConfig | null cdn: CdnConfig identity: IdentityConfig + blobDivert: BlobDivertConfig | null access: AccessConfig } @@ -85,6 +94,11 @@ export type ServiceConfig = { devMode?: boolean } +export type BlobDivertConfig = { + url: string + adminPassword: string +} + export type DatabaseConfig = { postgresUrl: string postgresSchema?: string diff --git a/packages/ozone/src/config/env.ts b/packages/ozone/src/config/env.ts index 06850df0c5e..a879339aacd 100644 --- a/packages/ozone/src/config/env.ts +++ b/packages/ozone/src/config/env.ts @@ -25,6 +25,8 @@ export const readEnv = (): OzoneEnvironment => { triageDids: envList('OZONE_TRIAGE_DIDS'), adminPassword: envStr('OZONE_ADMIN_PASSWORD'), signingKeyHex: envStr('OZONE_SIGNING_KEY_HEX'), + blobDivertUrl: envStr('OZONE_BLOB_DIVERT_URL'), + blobDivertAdminPassword: envStr('OZONE_BLOB_DIVERT_ADMIN_PASSWORD'), } } @@ -52,4 +54,6 @@ export type OzoneEnvironment = { triageDids: string[] adminPassword?: string signingKeyHex?: string + blobDivertUrl?: string + blobDivertAdminPassword?: string } diff --git a/packages/ozone/src/context.ts b/packages/ozone/src/context.ts index b7d9a3e9e89..d0cbd9ae347 100644 --- a/packages/ozone/src/context.ts +++ b/packages/ozone/src/context.ts @@ -14,6 +14,7 @@ import { CommunicationTemplateService, CommunicationTemplateServiceCreator, } from './communication-service/template' +import { BlobDiverter } from './daemon/blob-diverter' import { AuthVerifier } from './auth-verifier' import { ImageInvalidator } from './image-invalidator' import { getSigningKeyId } from './util' @@ -25,6 +26,7 @@ export type AppContextOptions = { communicationTemplateService: CommunicationTemplateServiceCreator appviewAgent: AtpAgent pdsAgent: AtpAgent | undefined + blobDiverter?: BlobDiverter signingKey: Keypair signingKeyId: number idResolver: IdResolver @@ -56,6 +58,10 @@ export class AppContext { ? new AtpAgent({ service: cfg.pds.url }) : undefined + const idResolver = new IdResolver({ + plcUrl: cfg.identity.plcUrl, + }) + const createAuthHeaders = (aud: string) => createServiceAuthHeaders({ iss: `${cfg.service.did}#atproto_labeler`, @@ -64,15 +70,16 @@ export class AppContext { }) const backgroundQueue = new BackgroundQueue(db) + const blobDiverter = cfg.blobDivert + ? new BlobDiverter(db, { + idResolver, + serviceConfig: cfg.blobDivert, + }) + : undefined const eventPusher = new EventPusher(db, createAuthHeaders, { appview: cfg.appview.pushEvents ? cfg.appview : undefined, pds: cfg.pds ?? undefined, }) - - const idResolver = new IdResolver({ - plcUrl: cfg.identity.plcUrl, - }) - const modService = ModerationService.creator( signingKey, signingKeyId, @@ -111,6 +118,7 @@ export class AppContext { backgroundQueue, sequencer, authVerifier, + blobDiverter, ...(overrides ?? {}), }, secrets, @@ -137,6 +145,10 @@ export class AppContext { return this.opts.modService } + get blobDiverter(): BlobDiverter | undefined { + return this.opts.blobDiverter + } + get communicationTemplateService(): CommunicationTemplateServiceCreator { return this.opts.communicationTemplateService } diff --git a/packages/ozone/src/daemon/blob-diverter.ts b/packages/ozone/src/daemon/blob-diverter.ts new file mode 100644 index 00000000000..386c3a090cb --- /dev/null +++ b/packages/ozone/src/daemon/blob-diverter.ts @@ -0,0 +1,150 @@ +import { + VerifyCidTransform, + forwardStreamErrors, + getPdsEndpoint, +} from '@atproto/common' +import { IdResolver } from '@atproto/identity' +import axios from 'axios' +import { Readable } from 'stream' +import { CID } from 'multiformats/cid' + +import Database from '../db' +import { retryHttp } from '../util' +import { BlobDivertConfig } from '../config' + +export class BlobDiverter { + serviceConfig: BlobDivertConfig + idResolver: IdResolver + + constructor( + public db: Database, + services: { + idResolver: IdResolver + serviceConfig: BlobDivertConfig + }, + ) { + this.serviceConfig = services.serviceConfig + this.idResolver = services.idResolver + } + + private async getBlob({ + pds, + did, + cid, + }: { + pds: string + did: string + cid: string + }) { + const blobResponse = await axios.get( + `${pds}/xrpc/com.atproto.sync.getBlob`, + { + params: { did, cid }, + decompress: true, + responseType: 'stream', + timeout: 5000, // 5sec of inactivity on the connection + }, + ) + const imageStream: Readable = blobResponse.data + const verifyCid = new VerifyCidTransform(CID.parse(cid)) + forwardStreamErrors(imageStream, verifyCid) + + return { + contentType: + blobResponse.headers['content-type'] || 'application/octet-stream', + imageStream: imageStream.pipe(verifyCid), + } + } + + async sendImage({ + url, + imageStream, + contentType, + }: { + url: string + imageStream: Readable + contentType: string + }) { + const result = await axios(url, { + method: 'POST', + data: imageStream, + headers: { + Authorization: basicAuth('admin', this.serviceConfig.adminPassword), + 'Content-Type': contentType, + }, + }) + + return result.status === 200 + } + + private async uploadBlob( + { + imageStream, + contentType, + }: { imageStream: Readable; contentType: string }, + { + subjectDid, + subjectUri, + }: { subjectDid: string; subjectUri: string | null }, + ) { + const url = new URL(this.serviceConfig.url) + url.searchParams.set('did', subjectDid) + if (subjectUri) url.searchParams.set('uri', subjectUri) + const result = await this.sendImage({ + url: url.toString(), + imageStream, + contentType, + }) + + return result + } + + async uploadBlobOnService({ + subjectDid, + subjectUri, + subjectBlobCids, + }: { + subjectDid: string + subjectUri: string + subjectBlobCids: string[] + }): Promise { + const didDoc = await this.idResolver.did.resolve(subjectDid) + + if (!didDoc) { + throw new Error('Error resolving DID') + } + + const pds = getPdsEndpoint(didDoc) + + if (!pds) { + throw new Error('Error resolving PDS') + } + + // attempt to download and upload within the same retry block since the imageStream is not reusable + const uploadResult = await Promise.all( + subjectBlobCids.map((cid) => + retryHttp(async () => { + const { imageStream, contentType } = await this.getBlob({ + pds, + cid, + did: subjectDid, + }) + return this.uploadBlob( + { imageStream, contentType }, + { subjectDid, subjectUri }, + ) + }), + ), + ) + + if (uploadResult.includes(false)) { + throw new Error(`Error uploading blob ${subjectUri}`) + } + + return true + } +} + +const basicAuth = (username: string, password: string) => { + return 'Basic ' + Buffer.from(`${username}:${password}`).toString('base64') +} diff --git a/packages/ozone/src/daemon/context.ts b/packages/ozone/src/daemon/context.ts index 64cf3c9423e..3ffa021d37e 100644 --- a/packages/ozone/src/daemon/context.ts +++ b/packages/ozone/src/daemon/context.ts @@ -1,5 +1,6 @@ import { Keypair, Secp256k1Keypair } from '@atproto/crypto' import { createServiceAuthHeaders } from '@atproto/xrpc-server' +import { IdResolver } from '@atproto/identity' import AtpAgent from '@atproto/api' import { OzoneConfig, OzoneSecrets } from '../config' import { Database } from '../db' @@ -7,7 +8,6 @@ import { EventPusher } from './event-pusher' import { EventReverser } from './event-reverser' import { ModerationService, ModerationServiceCreator } from '../mod-service' import { BackgroundQueue } from '../background' -import { IdResolver } from '@atproto/identity' import { getSigningKeyId } from '../util' export type DaemonContextOptions = { @@ -34,6 +34,10 @@ export class DaemonContext { const signingKey = await Secp256k1Keypair.import(secrets.signingKeyHex) const signingKeyId = await getSigningKeyId(db, signingKey.did()) + const idResolver = new IdResolver({ + plcUrl: cfg.identity.plcUrl, + }) + const appviewAgent = new AtpAgent({ service: cfg.appview.url }) const createAuthHeaders = (aud: string) => createServiceAuthHeaders({ @@ -48,9 +52,6 @@ export class DaemonContext { }) const backgroundQueue = new BackgroundQueue(db) - const idResolver = new IdResolver({ - plcUrl: cfg.identity.plcUrl, - }) const modService = ModerationService.creator( signingKey, diff --git a/packages/ozone/src/daemon/event-pusher.ts b/packages/ozone/src/daemon/event-pusher.ts index d1ff52b7d14..ea4b5ecd35d 100644 --- a/packages/ozone/src/daemon/event-pusher.ts +++ b/packages/ozone/src/daemon/event-pusher.ts @@ -6,6 +6,8 @@ import { RepoPushEventType } from '../db/schema/repo_push_event' import { retryHttp } from '../util' import { dbLogger } from '../logger' import { InputSchema } from '../lexicon/types/com/atproto/admin/updateSubjectStatus' +import { BlobPushEvent } from '../db/schema/blob_push_event' +import { Insertable, Selectable } from 'kysely' type EventSubject = InputSchema['subject'] @@ -285,20 +287,53 @@ export class EventPusher { subject, evt.takedownRef, ) - await dbTxn.db - .updateTable('blob_push_event') - .set( - succeeded - ? { confirmedAt: new Date() } - : { - lastAttempted: new Date(), - attempts: (evt.attempts ?? 0) + 1, - }, - ) - .where('subjectDid', '=', evt.subjectDid) - .where('subjectBlobCid', '=', evt.subjectBlobCid) - .where('eventType', '=', evt.eventType) - .execute() + await this.markBlobEventAttempt(dbTxn, evt, succeeded) }) } + + async markBlobEventAttempt( + dbTxn: Database, + event: Selectable, + succeeded: boolean, + ) { + await dbTxn.db + .updateTable('blob_push_event') + .set( + succeeded + ? { confirmedAt: new Date() } + : { + lastAttempted: new Date(), + attempts: (event.attempts ?? 0) + 1, + }, + ) + .where('subjectDid', '=', event.subjectDid) + .where('subjectBlobCid', '=', event.subjectBlobCid) + .where('eventType', '=', event.eventType) + .execute() + } + + async logBlobPushEvent( + blobValues: Insertable[], + takedownRef?: string | null, + ) { + return this.db.db + .insertInto('blob_push_event') + .values(blobValues) + .onConflict((oc) => + oc.columns(['subjectDid', 'subjectBlobCid', 'eventType']).doUpdateSet({ + takedownRef, + confirmedAt: null, + attempts: 0, + lastAttempted: null, + }), + ) + .returning([ + 'id', + 'subjectDid', + 'subjectUri', + 'subjectBlobCid', + 'eventType', + ]) + .execute() + } } diff --git a/packages/ozone/src/daemon/index.ts b/packages/ozone/src/daemon/index.ts index aa5d7b12734..501b8caad5c 100644 --- a/packages/ozone/src/daemon/index.ts +++ b/packages/ozone/src/daemon/index.ts @@ -3,6 +3,7 @@ import DaemonContext from './context' import { AppContextOptions } from '../context' export { EventPusher } from './event-pusher' +export { BlobDiverter } from './blob-diverter' export { EventReverser } from './event-reverser' export class OzoneDaemon { diff --git a/packages/ozone/src/lexicon/lexicons.ts b/packages/ozone/src/lexicon/lexicons.ts index 3f46bf6b17e..e9a40ffd7dc 100644 --- a/packages/ozone/src/lexicon/lexicons.ts +++ b/packages/ozone/src/lexicon/lexicons.ts @@ -7976,6 +7976,7 @@ export const schemaDict = { 'lex:tools.ozone.moderation.defs#modEventMute', 'lex:tools.ozone.moderation.defs#modEventEmail', 'lex:tools.ozone.moderation.defs#modEventResolveAppeal', + 'lex:tools.ozone.moderation.defs#modEventDivert', ], }, subject: { @@ -8034,6 +8035,7 @@ export const schemaDict = { 'lex:tools.ozone.moderation.defs#modEventMute', 'lex:tools.ozone.moderation.defs#modEventEmail', 'lex:tools.ozone.moderation.defs#modEventResolveAppeal', + 'lex:tools.ozone.moderation.defs#modEventDivert', ], }, subject: { @@ -8320,6 +8322,16 @@ export const schemaDict = { }, }, }, + modEventDivert: { + type: 'object', + description: + "Divert a record's blobs to a 3rd party service for further scanning/tagging", + properties: { + comment: { + type: 'string', + }, + }, + }, modEventTag: { type: 'object', description: 'Add/Remove a tag on a subject', diff --git a/packages/ozone/src/lexicon/types/tools/ozone/moderation/defs.ts b/packages/ozone/src/lexicon/types/tools/ozone/moderation/defs.ts index 6df5e41b35d..b150d735f6c 100644 --- a/packages/ozone/src/lexicon/types/tools/ozone/moderation/defs.ts +++ b/packages/ozone/src/lexicon/types/tools/ozone/moderation/defs.ts @@ -24,6 +24,7 @@ export interface ModEventView { | ModEventMute | ModEventEmail | ModEventResolveAppeal + | ModEventDivert | { $type: string; [k: string]: unknown } subject: | ComAtprotoAdminDefs.RepoRef @@ -62,6 +63,7 @@ export interface ModEventViewDetail { | ModEventMute | ModEventEmail | ModEventResolveAppeal + | ModEventDivert | { $type: string; [k: string]: unknown } subject: | RepoView @@ -367,6 +369,24 @@ export function validateModEventEmail(v: unknown): ValidationResult { return lexicons.validate('tools.ozone.moderation.defs#modEventEmail', v) } +/** Divert a record's blobs to a 3rd party service for further scanning/tagging */ +export interface ModEventDivert { + comment?: string + [k: string]: unknown +} + +export function isModEventDivert(v: unknown): v is ModEventDivert { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'tools.ozone.moderation.defs#modEventDivert' + ) +} + +export function validateModEventDivert(v: unknown): ValidationResult { + return lexicons.validate('tools.ozone.moderation.defs#modEventDivert', v) +} + /** Add/Remove a tag on a subject */ export interface ModEventTag { /** Tags to be added to the subject. If already exists, won't be duplicated. */ diff --git a/packages/ozone/src/mod-service/index.ts b/packages/ozone/src/mod-service/index.ts index 0faabd03439..bda6a286d8d 100644 --- a/packages/ozone/src/mod-service/index.ts +++ b/packages/ozone/src/mod-service/index.ts @@ -565,27 +565,17 @@ export class ModerationService { for (const cid of blobCids) { blobValues.push({ eventType, + takedownRef, subjectDid: subject.did, + subjectUri: subject.uri || null, subjectBlobCid: cid.toString(), - takedownRef, }) } } - const blobEvts = await this.db.db - .insertInto('blob_push_event') - .values(blobValues) - .onConflict((oc) => - oc - .columns(['subjectDid', 'subjectBlobCid', 'eventType']) - .doUpdateSet({ - takedownRef, - confirmedAt: null, - attempts: 0, - lastAttempted: null, - }), - ) - .returning(['id', 'subjectDid', 'subjectBlobCid', 'eventType']) - .execute() + const blobEvts = await this.eventPusher.logBlobPushEvent( + blobValues, + takedownRef, + ) this.db.onCommit(() => { this.backgroundQueue.add(async () => { diff --git a/packages/ozone/tests/__snapshots__/blob-divert.test.ts.snap b/packages/ozone/tests/__snapshots__/blob-divert.test.ts.snap new file mode 100644 index 00000000000..4210f0d6a0d --- /dev/null +++ b/packages/ozone/tests/__snapshots__/blob-divert.test.ts.snap @@ -0,0 +1,22 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`blob divert sends blobs to configured divert service and marks divert date 1`] = ` +Object { + "createdAt": "1970-01-01T00:00:00.000Z", + "createdBy": "user(0)", + "event": Object { + "$type": "tools.ozone.moderation.defs#modEventDivert", + "comment": "Diverting for test", + }, + "id": 1, + "subject": Object { + "$type": "com.atproto.repo.strongRef", + "cid": "cids(0)", + "uri": "record(0)", + }, + "subjectBlobCids": Array [ + "cids(1)", + "cids(2)", + ], +} +`; diff --git a/packages/ozone/tests/blob-divert.test.ts b/packages/ozone/tests/blob-divert.test.ts new file mode 100644 index 00000000000..a07cbf62b73 --- /dev/null +++ b/packages/ozone/tests/blob-divert.test.ts @@ -0,0 +1,87 @@ +import { + ModeratorClient, + SeedClient, + TestNetwork, + basicSeed, +} from '@atproto/dev-env' +import { BlobDiverter } from '../src/daemon' +import { forSnapshot } from './_util' + +describe('blob divert', () => { + let network: TestNetwork + let sc: SeedClient + let modClient: ModeratorClient + + beforeAll(async () => { + network = await TestNetwork.create({ + dbPostgresSchema: 'ozone_blob_divert_test', + ozone: { + blobDivertUrl: `https://blob-report.com`, + blobDivertAdminPassword: 'test-auth-token', + }, + }) + sc = network.getSeedClient() + modClient = network.ozone.getModClient() + await basicSeed(sc) + await network.processAll() + }) + + afterAll(async () => { + await network.close() + }) + + const mockReportServiceResponse = (result: boolean) => { + return jest + .spyOn(BlobDiverter.prototype, 'sendImage') + .mockImplementation(async () => { + return result + }) + } + + const getSubject = () => ({ + $type: 'com.atproto.repo.strongRef', + uri: sc.posts[sc.dids.carol][0].ref.uriStr, + cid: sc.posts[sc.dids.carol][0].ref.cidStr, + }) + + const emitDivertEvent = async () => + modClient.emitEvent( + { + subject: getSubject(), + event: { + $type: 'tools.ozone.moderation.defs#modEventDivert', + comment: 'Diverting for test', + }, + createdBy: sc.dids.alice, + subjectBlobCids: sc.posts[sc.dids.carol][0].images.map((img) => + img.image.ref.toString(), + ), + }, + 'moderator', + ) + + it('fails and keeps attempt count when report service fails to accept upload.', async () => { + // Simulate failure to fail upload + const reportServiceRequest = mockReportServiceResponse(false) + + await expect(emitDivertEvent()).rejects.toThrow() + + expect(reportServiceRequest).toHaveBeenCalled() + }) + + it('sends blobs to configured divert service and marks divert date', async () => { + // Simulate failure to accept upload + const reportServiceRequest = mockReportServiceResponse(true) + + const divertEvent = await emitDivertEvent() + + expect(reportServiceRequest).toHaveBeenCalled() + expect(forSnapshot(divertEvent)).toMatchSnapshot() + + const { subjectStatuses } = await modClient.queryStatuses({ + subject: getSubject().uri, + }) + + expect(subjectStatuses[0].takendown).toBe(true) + }) +}) diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 3f46bf6b17e..e9a40ffd7dc 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -7976,6 +7976,7 @@ export const schemaDict = { 'lex:tools.ozone.moderation.defs#modEventMute', 'lex:tools.ozone.moderation.defs#modEventEmail', 'lex:tools.ozone.moderation.defs#modEventResolveAppeal', + 'lex:tools.ozone.moderation.defs#modEventDivert', ], }, subject: { @@ -8034,6 +8035,7 @@ export const schemaDict = { 'lex:tools.ozone.moderation.defs#modEventMute', 'lex:tools.ozone.moderation.defs#modEventEmail', 'lex:tools.ozone.moderation.defs#modEventResolveAppeal', + 'lex:tools.ozone.moderation.defs#modEventDivert', ], }, subject: { @@ -8320,6 +8322,16 @@ export const schemaDict = { }, }, }, + modEventDivert: { + type: 'object', + description: + "Divert a record's blobs to a 3rd party service for further scanning/tagging", + properties: { + comment: { + type: 'string', + }, + }, + }, modEventTag: { type: 'object', description: 'Add/Remove a tag on a subject', diff --git a/packages/pds/src/lexicon/types/tools/ozone/moderation/defs.ts b/packages/pds/src/lexicon/types/tools/ozone/moderation/defs.ts index 6df5e41b35d..b150d735f6c 100644 --- a/packages/pds/src/lexicon/types/tools/ozone/moderation/defs.ts +++ b/packages/pds/src/lexicon/types/tools/ozone/moderation/defs.ts @@ -24,6 +24,7 @@ export interface ModEventView { | ModEventMute | ModEventEmail | ModEventResolveAppeal + | ModEventDivert | { $type: string; [k: string]: unknown } subject: | ComAtprotoAdminDefs.RepoRef @@ -62,6 +63,7 @@ export interface ModEventViewDetail { | ModEventMute | ModEventEmail | ModEventResolveAppeal + | ModEventDivert | { $type: string; [k: string]: unknown } subject: | RepoView @@ -367,6 +369,24 @@ export function validateModEventEmail(v: unknown): ValidationResult { return lexicons.validate('tools.ozone.moderation.defs#modEventEmail', v) } +/** Divert a record's blobs to a 3rd party service for further scanning/tagging */ +export interface ModEventDivert { + comment?: string + [k: string]: unknown +} + +export function isModEventDivert(v: unknown): v is ModEventDivert { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'tools.ozone.moderation.defs#modEventDivert' + ) +} + +export function validateModEventDivert(v: unknown): ValidationResult { + return lexicons.validate('tools.ozone.moderation.defs#modEventDivert', v) +} + /** Add/Remove a tag on a subject */ export interface ModEventTag { /** Tags to be added to the subject. If already exists, won't be duplicated. */ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 47fabe9593b..1f673eeaa28 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1,9 +1,5 @@ lockfileVersion: '6.0' -settings: - autoInstallPeers: true - excludeLinksFromLockfile: false - importers: .: @@ -12055,3 +12051,7 @@ packages: /zod@3.21.4: resolution: {integrity: sha512-m46AKbrzKVzOzs/DZgVnG5H55N1sv1M8qZU3A8RIKbs3mrACDNeIOeilDymVb2HdmP8uwshOCF4uJ8uM9rCqJw==} + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false