diff --git a/.github/workflows/build-and-push-bsky-aws.yaml b/.github/workflows/build-and-push-bsky-aws.yaml index 63842580237..4c0fd6e1cfe 100644 --- a/.github/workflows/build-and-push-bsky-aws.yaml +++ b/.github/workflows/build-and-push-bsky-aws.yaml @@ -3,7 +3,7 @@ on: push: branches: - main - - ozone-service + - ozone-service-partial-bav env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/.github/workflows/build-and-push-ozone-aws.yaml b/.github/workflows/build-and-push-ozone-aws.yaml index 46534d509cc..53f95c5b731 100644 --- a/.github/workflows/build-and-push-ozone-aws.yaml +++ b/.github/workflows/build-and-push-ozone-aws.yaml @@ -3,7 +3,6 @@ on: push: branches: - main - - ozone-service env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/.github/workflows/build-and-push-pds-aws.yaml b/.github/workflows/build-and-push-pds-aws.yaml index b3db5d46831..097f782d88e 100644 --- a/.github/workflows/build-and-push-pds-aws.yaml +++ b/.github/workflows/build-and-push-pds-aws.yaml @@ -3,7 +3,6 @@ on: push: branches: - main - - ozone-service env: REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }} USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }} diff --git a/.github/workflows/build-and-push-pds-ghcr.yaml b/.github/workflows/build-and-push-pds-ghcr.yaml index aa531b8735b..b11230ab531 100644 --- a/.github/workflows/build-and-push-pds-ghcr.yaml +++ b/.github/workflows/build-and-push-pds-ghcr.yaml @@ -3,7 +3,6 @@ on: push: branches: - main - - ozone-service env: REGISTRY: ghcr.io USERNAME: ${{ github.actor }} diff --git a/packages/bsky/src/api/com/atproto/admin/emitModerationEvent.ts b/packages/bsky/src/api/com/atproto/admin/emitModerationEvent.ts new file mode 100644 index 00000000000..d806be2ca09 --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/emitModerationEvent.ts @@ -0,0 +1,220 @@ +import { CID } from 'multiformats/cid' +import { AtUri } from '@atproto/syntax' +import { + AuthRequiredError, + InvalidRequestError, + UpstreamFailureError, +} from '@atproto/xrpc-server' +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' +import { getSubject } from '../moderation/util' +import { + isModEventLabel, + isModEventReverseTakedown, + isModEventTakedown, +} from '../../../../lexicon/types/com/atproto/admin/defs' +import { TakedownSubjects } from '../../../../services/moderation' +import { retryHttp } from '../../../../util/retry' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.emitModerationEvent({ + auth: ctx.authVerifier.role, + handler: async ({ input, auth }) => { + const access = auth.credentials + const db = ctx.db.getPrimary() + const moderationService = ctx.services.moderation(db) + const { subject, createdBy, subjectBlobCids, event } = input.body + const isTakedownEvent = isModEventTakedown(event) + const isReverseTakedownEvent = isModEventReverseTakedown(event) + const isLabelEvent = isModEventLabel(event) + + // apply access rules + + // if less than moderator access then can not takedown an account + if (!access.moderator && isTakedownEvent && 'did' in subject) { + throw new AuthRequiredError( + 'Must be a full moderator to perform an account takedown', + ) + } + // if less than moderator access then can only take ack and escalation actions + if (!access.moderator && (isTakedownEvent || isReverseTakedownEvent)) { + throw new AuthRequiredError( + 'Must be a full moderator to take this type of action', + ) + } + // if less than moderator access then can not apply labels + if (!access.moderator && isLabelEvent) { + throw new AuthRequiredError('Must be a full moderator to label content') + } + + if (isLabelEvent) { + validateLabels([ + ...(event.createLabelVals ?? []), + ...(event.negateLabelVals ?? []), + ]) + } + + const subjectInfo = getSubject(subject) + + if (isTakedownEvent || isReverseTakedownEvent) { + const isSubjectTakendown = await moderationService.isSubjectTakendown( + subjectInfo, + ) + + if (isSubjectTakendown && isTakedownEvent) { + throw new InvalidRequestError(`Subject is already taken down`) + } + + if (!isSubjectTakendown && isReverseTakedownEvent) { + throw new InvalidRequestError(`Subject is not taken down`) + } + } + + const { result: moderationEvent, takenDown } = await db.transaction( + async (dbTxn) => { + const moderationTxn = ctx.services.moderation(dbTxn) + const labelTxn = ctx.services.label(dbTxn) + + const result = await moderationTxn.logEvent({ + event, + subject: subjectInfo, + subjectBlobCids: + subjectBlobCids?.map((cid) => CID.parse(cid)) ?? [], + createdBy, + }) + + let takenDown: TakedownSubjects | undefined + + if ( + result.subjectType === 'com.atproto.admin.defs#repoRef' && + result.subjectDid + ) { + // No credentials to revoke on appview + if (isTakedownEvent) { + takenDown = await moderationTxn.takedownRepoOld({ + takedownId: result.id, + did: result.subjectDid, + }) + } + + if (isReverseTakedownEvent) { + await moderationTxn.reverseTakedownRepo({ + did: result.subjectDid, + }) + takenDown = { + subjects: [ + { + $type: 'com.atproto.admin.defs#repoRef', + did: result.subjectDid, + }, + ], + did: result.subjectDid, + } + } + } + + if ( + result.subjectType === 'com.atproto.repo.strongRef' && + result.subjectUri + ) { + const blobCids = subjectBlobCids?.map((cid) => CID.parse(cid)) ?? [] + if (isTakedownEvent) { + takenDown = await moderationTxn.takedownRecordOld({ + takedownId: result.id, + uri: new AtUri(result.subjectUri), + // TODO: I think this will always be available for strongRefs? + cid: CID.parse(result.subjectCid as string), + blobCids, + }) + } + + if (isReverseTakedownEvent) { + await moderationTxn.reverseTakedownRecord({ + uri: new AtUri(result.subjectUri), + }) + takenDown = { + did: result.subjectDid, + subjects: [ + { + $type: 'com.atproto.repo.strongRef', + uri: result.subjectUri, + cid: result.subjectCid ?? '', + }, + ...blobCids.map((cid) => ({ + $type: 'com.atproto.admin.defs#repoBlobRef', + did: result.subjectDid, + cid: cid.toString(), + recordUri: result.subjectUri, + })), + ], + } + } + } + + if (isLabelEvent) { + await labelTxn.formatAndCreate( + ctx.cfg.labelerDid, + result.subjectUri ?? result.subjectDid, + result.subjectCid, + { + create: result.createLabelVals?.length + ? result.createLabelVals.split(' ') + : undefined, + negate: result.negateLabelVals?.length + ? result.negateLabelVals.split(' ') + : undefined, + }, + ) + } + + return { result, takenDown } + }, + ) + + if (takenDown && ctx.moderationPushAgent) { + const { did, subjects } = takenDown + if (did && subjects.length > 0) { + const agent = ctx.moderationPushAgent + const results = await Promise.allSettled( + subjects.map((subject) => + retryHttp(() => + agent.api.com.atproto.admin.updateSubjectStatus({ + subject, + takedown: isTakedownEvent + ? { + applied: true, + ref: moderationEvent.id.toString(), + } + : { + applied: false, + }, + }), + ), + ), + ) + const hadFailure = results.some((r) => r.status === 'rejected') + if (hadFailure) { + throw new UpstreamFailureError('failed to apply action on PDS') + } + } + } + + return { + encoding: 'application/json', + body: await moderationService.views.event(moderationEvent), + } + }, + }) +} + +const validateLabels = (labels: string[]) => { + for (const label of labels) { + for (const char of badChars) { + if (label.includes(char)) { + throw new InvalidRequestError(`Invalid label: ${label}`) + } + } + } +} + +const badChars = [' ', ',', ';', `'`, `"`] diff --git a/packages/bsky/src/api/com/atproto/admin/getModerationEvent.ts b/packages/bsky/src/api/com/atproto/admin/getModerationEvent.ts new file mode 100644 index 00000000000..e15d7f6e33a --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/getModerationEvent.ts @@ -0,0 +1,19 @@ +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.getModerationEvent({ + auth: ctx.authVerifier.role, + handler: async ({ params }) => { + const { id } = params + const db = ctx.db.getPrimary() + const moderationService = ctx.services.moderation(db) + const event = await moderationService.getEventOrThrow(id) + const eventDetail = await moderationService.views.eventDetail(event) + return { + encoding: 'application/json', + body: eventDetail, + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/admin/getRecord.ts b/packages/bsky/src/api/com/atproto/admin/getRecord.ts new file mode 100644 index 00000000000..4f67915139f --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/getRecord.ts @@ -0,0 +1,39 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' +import { addAccountInfoToRepoView, getPdsAccountInfo } from './util' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.getRecord({ + auth: ctx.authVerifier.role, + handler: async ({ params, auth }) => { + const { uri, cid } = params + const db = ctx.db.getPrimary() + const result = await db.db + .selectFrom('record') + .selectAll() + .where('uri', '=', uri) + .if(!!cid, (qb) => qb.where('cid', '=', cid ?? '')) + .executeTakeFirst() + if (!result) { + throw new InvalidRequestError('Record not found', 'RecordNotFound') + } + + const [record, accountInfo] = await Promise.all([ + ctx.services.moderation(db).views.recordDetail(result), + getPdsAccountInfo(ctx, result.did), + ]) + + record.repo = addAccountInfoToRepoView( + record.repo, + accountInfo, + auth.credentials.moderator, + ) + + return { + encoding: 'application/json', + body: record, + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/admin/getRepo.ts b/packages/bsky/src/api/com/atproto/admin/getRepo.ts new file mode 100644 index 00000000000..c074660f1ad --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/getRepo.ts @@ -0,0 +1,32 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' +import { addAccountInfoToRepoViewDetail, getPdsAccountInfo } from './util' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.getRepo({ + auth: ctx.authVerifier.role, + handler: async ({ params, auth }) => { + const { did } = params + const db = ctx.db.getPrimary() + const result = await ctx.services.actor(db).getActor(did, true) + if (!result) { + throw new InvalidRequestError('Repo not found', 'RepoNotFound') + } + const [partialRepo, accountInfo] = await Promise.all([ + ctx.services.moderation(db).views.repoDetail(result), + getPdsAccountInfo(ctx, result.did), + ]) + + const repo = addAccountInfoToRepoViewDetail( + partialRepo, + accountInfo, + auth.credentials.moderator, + ) + return { + encoding: 'application/json', + body: repo, + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts b/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts new file mode 100644 index 00000000000..2c41b90285a --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts @@ -0,0 +1,38 @@ +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' +import { getEventType } from '../moderation/util' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.queryModerationEvents({ + auth: ctx.authVerifier.role, + handler: async ({ params }) => { + const { + subject, + limit = 50, + cursor, + sortDirection = 'desc', + types, + includeAllUserRecords = false, + createdBy, + } = params + const db = ctx.db.getPrimary() + const moderationService = ctx.services.moderation(db) + const results = await moderationService.getEvents({ + types: types?.length ? types.map(getEventType) : [], + subject, + createdBy, + limit, + cursor, + sortDirection, + includeAllUserRecords, + }) + return { + encoding: 'application/json', + body: { + cursor: results.cursor, + events: await moderationService.views.event(results.events), + }, + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/admin/queryModerationStatuses.ts b/packages/bsky/src/api/com/atproto/admin/queryModerationStatuses.ts new file mode 100644 index 00000000000..843d54e498a --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/queryModerationStatuses.ts @@ -0,0 +1,57 @@ +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' +import { getReviewState } from '../moderation/util' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.queryModerationStatuses({ + auth: ctx.authVerifier.role, + handler: async ({ params }) => { + const { + subject, + takendown, + appealed, + reviewState, + reviewedAfter, + reviewedBefore, + reportedAfter, + reportedBefore, + ignoreSubjects, + lastReviewedBy, + sortDirection = 'desc', + sortField = 'lastReportedAt', + includeMuted = false, + limit = 50, + cursor, + } = params + const db = ctx.db.getPrimary() + const moderationService = ctx.services.moderation(db) + const results = await moderationService.getSubjectStatuses({ + reviewState: getReviewState(reviewState), + subject, + takendown, + appealed, + reviewedAfter, + reviewedBefore, + reportedAfter, + reportedBefore, + includeMuted, + ignoreSubjects, + sortDirection, + lastReviewedBy, + sortField, + limit, + cursor, + }) + const subjectStatuses = moderationService.views.subjectStatus( + results.statuses, + ) + return { + encoding: 'application/json', + body: { + cursor: results.cursor, + subjectStatuses, + }, + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/admin/searchRepos.ts b/packages/bsky/src/api/com/atproto/admin/searchRepos.ts new file mode 100644 index 00000000000..7edd753d531 --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/searchRepos.ts @@ -0,0 +1,27 @@ +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.admin.searchRepos({ + auth: ctx.authVerifier.role, + handler: async ({ params }) => { + const db = ctx.db.getPrimary() + const moderationService = ctx.services.moderation(db) + const { limit, cursor } = params + // prefer new 'q' query param over deprecated 'term' + const query = params.q ?? params.term + + const { results, cursor: resCursor } = await ctx.services + .actor(db) + .getSearchResults({ query, limit, cursor, includeSoftDeleted: true }) + + return { + encoding: 'application/json', + body: { + cursor: resCursor, + repos: await moderationService.views.repo(results), + }, + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/admin/util.ts b/packages/bsky/src/api/com/atproto/admin/util.ts new file mode 100644 index 00000000000..7dfd10cce5c --- /dev/null +++ b/packages/bsky/src/api/com/atproto/admin/util.ts @@ -0,0 +1,52 @@ +import AppContext from '../../../../context' +import { + RepoView, + RepoViewDetail, + AccountView, +} from '../../../../lexicon/types/com/atproto/admin/defs' + +export const getPdsAccountInfo = async ( + ctx: AppContext, + did: string, +): Promise => { + const agent = ctx.moderationPushAgent + if (!agent) return null + try { + const res = await agent.api.com.atproto.admin.getAccountInfo({ did }) + return res.data + } catch (err) { + return null + } +} + +export const addAccountInfoToRepoViewDetail = ( + repoView: RepoViewDetail, + accountInfo: AccountView | null, + includeEmail = false, +): RepoViewDetail => { + if (!accountInfo) return repoView + return { + ...repoView, + email: includeEmail ? accountInfo.email : undefined, + invitedBy: accountInfo.invitedBy, + invitesDisabled: accountInfo.invitesDisabled, + inviteNote: accountInfo.inviteNote, + invites: accountInfo.invites, + emailConfirmedAt: accountInfo.emailConfirmedAt, + } +} + +export const addAccountInfoToRepoView = ( + repoView: RepoView, + accountInfo: AccountView | null, + includeEmail = false, +): RepoView => { + if (!accountInfo) return repoView + return { + ...repoView, + email: includeEmail ? accountInfo.email : undefined, + invitedBy: accountInfo.invitedBy, + invitesDisabled: accountInfo.invitesDisabled, + inviteNote: accountInfo.inviteNote, + } +} diff --git a/packages/bsky/src/api/com/atproto/moderation/createReport.ts b/packages/bsky/src/api/com/atproto/moderation/createReport.ts new file mode 100644 index 00000000000..c3e8fd50d74 --- /dev/null +++ b/packages/bsky/src/api/com/atproto/moderation/createReport.ts @@ -0,0 +1,53 @@ +import { AuthRequiredError, ForbiddenError } from '@atproto/xrpc-server' +import { Server } from '../../../../lexicon' +import AppContext from '../../../../context' +import { getReasonType, getSubject } from './util' +import { softDeleted } from '../../../../db/util' +import { REASONAPPEAL } from '../../../../lexicon/types/com/atproto/moderation/defs' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.moderation.createReport({ + // @TODO anonymous reports w/ optional auth are a temporary measure + auth: ctx.authVerifier.standardOptional, + handler: async ({ input, auth }) => { + const { reasonType, reason, subject } = input.body + const requester = auth.credentials.iss + + const db = ctx.db.getPrimary() + + if (requester) { + // Don't accept reports from users that are fully taken-down + const actor = await ctx.services.actor(db).getActor(requester, true) + if (actor && softDeleted(actor)) { + throw new AuthRequiredError() + } + } + + const reportReasonType = getReasonType(reasonType) + const reportSubject = getSubject(subject) + const subjectDid = + 'did' in reportSubject ? reportSubject.did : reportSubject.uri.host + + // If the report is an appeal, the requester must be the author of the subject + if (reasonType === REASONAPPEAL && requester !== subjectDid) { + throw new ForbiddenError('You cannot appeal this report') + } + + const report = await db.transaction(async (dbTxn) => { + const moderationTxn = ctx.services.moderation(dbTxn) + return moderationTxn.report({ + reasonType: reportReasonType, + reason, + subject: reportSubject, + reportedBy: requester || ctx.cfg.serverDid, + }) + }) + + const moderationService = ctx.services.moderation(db) + return { + encoding: 'application/json', + body: moderationService.views.reportPublic(report), + } + }, + }) +} diff --git a/packages/bsky/src/api/com/atproto/moderation/util.ts b/packages/bsky/src/api/com/atproto/moderation/util.ts new file mode 100644 index 00000000000..fbb144b1c0a --- /dev/null +++ b/packages/bsky/src/api/com/atproto/moderation/util.ts @@ -0,0 +1,91 @@ +import { CID } from 'multiformats/cid' +import { InvalidRequestError } from '@atproto/xrpc-server' +import { AtUri } from '@atproto/syntax' +import { InputSchema as ReportInput } from '../../../../lexicon/types/com/atproto/moderation/createReport' +import { InputSchema as ActionInput } from '../../../../lexicon/types/com/atproto/admin/emitModerationEvent' +import { + REASONOTHER, + REASONSPAM, + REASONMISLEADING, + REASONRUDE, + REASONSEXUAL, + REASONVIOLATION, + REASONAPPEAL, +} from '../../../../lexicon/types/com/atproto/moderation/defs' +import { + REVIEWCLOSED, + REVIEWESCALATED, + REVIEWOPEN, +} from '../../../../lexicon/types/com/atproto/admin/defs' +import { ModerationEvent } from '../../../../db/tables/moderation' +import { ModerationSubjectStatusRow } from '../../../../services/moderation/types' + +type SubjectInput = ReportInput['subject'] | ActionInput['subject'] + +export const getSubject = (subject: SubjectInput) => { + if ( + subject.$type === 'com.atproto.admin.defs#repoRef' && + typeof subject.did === 'string' + ) { + return { did: subject.did } + } + if ( + subject.$type === 'com.atproto.repo.strongRef' && + typeof subject.uri === 'string' && + typeof subject.cid === 'string' + ) { + const uri = new AtUri(subject.uri) + return { + uri, + cid: CID.parse(subject.cid), + } + } + throw new InvalidRequestError('Invalid subject') +} + +export const getReasonType = (reasonType: ReportInput['reasonType']) => { + if (reasonTypes.has(reasonType)) { + return reasonType as NonNullable['reportType'] + } + throw new InvalidRequestError('Invalid reason type') +} + +export const getEventType = (type: string) => { + if (eventTypes.has(type)) { + return type as ModerationEvent['action'] + } + throw new InvalidRequestError('Invalid event type') +} + +export const getReviewState = (reviewState?: string) => { + if (!reviewState) return undefined + if (reviewStates.has(reviewState)) { + return reviewState as ModerationSubjectStatusRow['reviewState'] + } + throw new InvalidRequestError('Invalid review state') +} + +const reviewStates = new Set([REVIEWCLOSED, REVIEWESCALATED, REVIEWOPEN]) + +const reasonTypes = new Set([ + REASONOTHER, + REASONSPAM, + REASONMISLEADING, + REASONRUDE, + REASONSEXUAL, + REASONVIOLATION, + REASONAPPEAL, +]) + +const eventTypes = new Set([ + 'com.atproto.admin.defs#modEventTakedown', + 'com.atproto.admin.defs#modEventAcknowledge', + 'com.atproto.admin.defs#modEventEscalate', + 'com.atproto.admin.defs#modEventComment', + 'com.atproto.admin.defs#modEventLabel', + 'com.atproto.admin.defs#modEventReport', + 'com.atproto.admin.defs#modEventMute', + 'com.atproto.admin.defs#modEventUnmute', + 'com.atproto.admin.defs#modEventReverseTakedown', + 'com.atproto.admin.defs#modEventEmail', +]) diff --git a/packages/bsky/src/api/index.ts b/packages/bsky/src/api/index.ts index cd99f0ad4dd..96f5da7308a 100644 --- a/packages/bsky/src/api/index.ts +++ b/packages/bsky/src/api/index.ts @@ -47,6 +47,15 @@ import resolveHandle from './com/atproto/identity/resolveHandle' import getRecord from './com/atproto/repo/getRecord' import fetchLabels from './com/atproto/temp/fetchLabels' +import emitModerationEvent from './com/atproto/admin/emitModerationEvent' +import getModerationEvent from './com/atproto/admin/getModerationEvent' +import getRepo from './com/atproto/admin/getRepo' +import getAdminRecord from './com/atproto/admin/getRecord' +import queryModerationEvents from './com/atproto/admin/queryModerationEvents' +import queryModerationStatuses from './com/atproto/admin/queryModerationStatuses' +import searchRepos from './com/atproto/admin/searchRepos' +import createReport from './com/atproto/moderation/createReport' + export * as health from './health' export * as wellKnown from './well-known' @@ -102,5 +111,15 @@ export default function (server: Server, ctx: AppContext) { resolveHandle(server, ctx) getRecord(server, ctx) fetchLabels(server, ctx) + + emitModerationEvent(server, ctx) + getModerationEvent(server, ctx) + getRepo(server, ctx) + getAdminRecord(server, ctx) + queryModerationEvents(server, ctx) + queryModerationStatuses(server, ctx) + searchRepos(server, ctx) + createReport(server, ctx) + return server } diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index faa1ac7953d..4fe26bb9b1f 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -34,6 +34,8 @@ export interface ServerConfigValues { adminPassword: string moderatorPassword: string triagePassword: string + labelerDid: string + modServiceUrl: string modServiceDid: string rateLimitsEnabled: boolean rateLimitBypassKey?: string @@ -115,6 +117,15 @@ export class ServerConfig { assert(moderatorPassword) const triagePassword = process.env.TRIAGE_PASSWORD || undefined assert(triagePassword) + const labelerDid = + overrides?.labelerDid || process.env.LABELER_DID || undefined + assert(labelerDid) + + const modServiceUrl = + overrides?.modServiceUrl || + process.env.MODERATION_SERVICE_URL || + undefined + assert(modServiceUrl) const modServiceDid = overrides?.modServiceDid || process.env.MODERATION_SERVICE_DID || @@ -155,6 +166,8 @@ export class ServerConfig { adminPassword, moderatorPassword, triagePassword, + labelerDid, + modServiceUrl, modServiceDid, rateLimitsEnabled, rateLimitBypassKey, @@ -280,6 +293,14 @@ export class ServerConfig { return this.cfg.triagePassword } + get labelerDid() { + return this.cfg.labelerDid + } + + get modServiceUrl() { + return this.cfg.modServiceUrl + } + get modServiceDid() { return this.cfg.modServiceDid } diff --git a/packages/bsky/src/context.ts b/packages/bsky/src/context.ts index 9a3eb222cdf..822f24111ae 100644 --- a/packages/bsky/src/context.ts +++ b/packages/bsky/src/context.ts @@ -12,9 +12,11 @@ import { BackgroundQueue } from './background' import { MountedAlgos } from './feed-gen/types' import { NotificationServer } from './notifications' import { Redis } from './redis' -import { AuthVerifier } from './auth-verifier' +import { AuthVerifier, buildBasicAuth } from './auth-verifier' export class AppContext { + public moderationPushAgent: AtpAgent | undefined + constructor( private opts: { db: DatabaseCoordinator @@ -31,7 +33,13 @@ export class AppContext { notifServer: NotificationServer authVerifier: AuthVerifier }, - ) {} + ) { + this.moderationPushAgent = new AtpAgent({ service: this.cfg.modServiceUrl }) + this.moderationPushAgent.api.setHeader( + 'authorization', + buildBasicAuth('admin', this.cfg.adminPassword), + ) + } get db(): DatabaseCoordinator { return this.opts.db diff --git a/packages/bsky/src/db/tables/moderation.ts b/packages/bsky/src/db/tables/moderation.ts index f1ac3572785..99f5e73310d 100644 --- a/packages/bsky/src/db/tables/moderation.ts +++ b/packages/bsky/src/db/tables/moderation.ts @@ -20,6 +20,7 @@ export interface ModerationEvent { | 'com.atproto.admin.defs#modEventMute' | 'com.atproto.admin.defs#modEventReverseTakedown' | 'com.atproto.admin.defs#modEventEmail' + | 'com.atproto.admin.defs#modEventResolveAppeal' subjectType: 'com.atproto.admin.defs#repoRef' | 'com.atproto.repo.strongRef' subjectDid: string subjectUri: string | null @@ -47,9 +48,11 @@ export interface ModerationSubjectStatus { lastReviewedBy: string | null lastReviewedAt: string | null lastReportedAt: string | null + lastAppealedAt: string | null muteUntil: string | null suspendUntil: string | null takendown: boolean + appealed: boolean | null comment: string | null } diff --git a/packages/bsky/src/services/moderation/index.ts b/packages/bsky/src/services/moderation/index.ts index 71380e16884..441fd193402 100644 --- a/packages/bsky/src/services/moderation/index.ts +++ b/packages/bsky/src/services/moderation/index.ts @@ -1,9 +1,37 @@ import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' +import { InvalidRequestError } from '@atproto/xrpc-server' import { PrimaryDatabase } from '../../db' +import { ModerationViews } from './views' import { ImageUriBuilder } from '../../image/uri' -import { ImageInvalidator } from '../../image/invalidator' +import { Main as StrongRef } from '../../lexicon/types/com/atproto/repo/strongRef' import { StatusAttr } from '../../lexicon/types/com/atproto/admin/defs' +import { ImageInvalidator } from '../../image/invalidator' +import { + isModEventComment, + isModEventLabel, + isModEventMute, + isModEventReport, + isModEventTakedown, + isModEventEmail, + RepoRef, + RepoBlobRef, +} from '../../lexicon/types/com/atproto/admin/defs' +import { + adjustModerationSubjectStatus, + getStatusIdentifierFromSubject, +} from './status' +import { + ModEventType, + ModerationEventRow, + ModerationEventRowWithHandle, + ModerationSubjectStatusRow, + ReversibleModerationEvent, + SubjectInfo, +} from './types' +import { ModerationEvent } from '../../db/tables/moderation' +import { paginate } from '../../db/pagination' +import { StatusKeyset, TimeIdKeyset } from './pagination' export class ModerationService { constructor( @@ -20,14 +48,401 @@ export class ModerationService { new ModerationService(db, imgUriBuilder, imgInvalidator) } - async takedownRepo(info: { takedownRef: string; did: string }) { - const { takedownRef, did } = info + views = new ModerationViews(this.db) + + async getEvent(id: number): Promise { + return await this.db.db + .selectFrom('moderation_event') + .selectAll() + .where('id', '=', id) + .executeTakeFirst() + } + + async getEventOrThrow(id: number): Promise { + const event = await this.getEvent(id) + if (!event) throw new InvalidRequestError('Moderation event not found') + return event + } + + async getEvents(opts: { + subject?: string + createdBy?: string + limit: number + cursor?: string + includeAllUserRecords: boolean + types: ModerationEvent['action'][] + sortDirection?: 'asc' | 'desc' + }): Promise<{ cursor?: string; events: ModerationEventRowWithHandle[] }> { + const { + subject, + createdBy, + limit, + cursor, + includeAllUserRecords, + sortDirection = 'desc', + types, + } = opts + let builder = this.db.db + .selectFrom('moderation_event') + .leftJoin( + 'actor as creatorActor', + 'creatorActor.did', + 'moderation_event.createdBy', + ) + .leftJoin( + 'actor as subjectActor', + 'subjectActor.did', + 'moderation_event.subjectDid', + ) + if (subject) { + builder = builder.where((qb) => { + if (includeAllUserRecords) { + // If subject is an at-uri, we need to extract the DID from the at-uri + // otherwise, subject is probably a DID already + if (subject.startsWith('at://')) { + const uri = new AtUri(subject) + return qb.where('subjectDid', '=', uri.hostname) + } + return qb.where('subjectDid', '=', subject) + } + return qb + .where((subQb) => + subQb + .where('subjectDid', '=', subject) + .where('subjectUri', 'is', null), + ) + .orWhere('subjectUri', '=', subject) + }) + } + if (types.length) { + builder = builder.where((qb) => { + if (types.length === 1) { + return qb.where('action', '=', types[0]) + } + + return qb.where('action', 'in', types) + }) + } + if (createdBy) { + builder = builder.where('createdBy', '=', createdBy) + } + + const { ref } = this.db.db.dynamic + const keyset = new TimeIdKeyset( + ref(`moderation_event.createdAt`), + ref('moderation_event.id'), + ) + const paginatedBuilder = paginate(builder, { + limit, + cursor, + keyset, + direction: sortDirection, + tryIndex: true, + }) + + const result = await paginatedBuilder + .selectAll(['moderation_event']) + .select([ + 'subjectActor.handle as subjectHandle', + 'creatorActor.handle as creatorHandle', + ]) + .execute() + + return { cursor: keyset.packFromResult(result), events: result } + } + + async getReport(id: number): Promise { + return await this.db.db + .selectFrom('moderation_event') + .where('action', '=', 'com.atproto.admin.defs#modEventReport') + .selectAll() + .where('id', '=', id) + .executeTakeFirst() + } + + async getCurrentStatus( + subject: { did: string } | { uri: AtUri } | { cids: CID[] }, + ) { + let builder = this.db.db.selectFrom('moderation_subject_status').selectAll() + if ('did' in subject) { + builder = builder.where('did', '=', subject.did) + } else if ('uri' in subject) { + builder = builder.where('recordPath', '=', subject.uri.toString()) + } + // TODO: Handle the cid status + return await builder.execute() + } + + buildSubjectInfo( + subject: { did: string } | { uri: AtUri; cid: CID }, + subjectBlobCids?: CID[], + ): SubjectInfo { + if ('did' in subject) { + if (subjectBlobCids?.length) { + throw new InvalidRequestError('Blobs do not apply to repo subjects') + } + // Allowing dids that may not exist: may have been deleted but needs to remain actionable. + return { + subjectType: 'com.atproto.admin.defs#repoRef', + subjectDid: subject.did, + subjectUri: null, + subjectCid: null, + } + } + + // Allowing records/blobs that may not exist: may have been deleted but needs to remain actionable. + return { + subjectType: 'com.atproto.repo.strongRef', + subjectDid: subject.uri.host, + subjectUri: subject.uri.toString(), + subjectCid: subject.cid.toString(), + } + } + + async logEvent(info: { + event: ModEventType + subject: { did: string } | { uri: AtUri; cid: CID } + subjectBlobCids?: CID[] + createdBy: string + createdAt?: Date + }): Promise { + this.db.assertTransaction() + const { + event, + createdBy, + subject, + subjectBlobCids, + createdAt = new Date(), + } = info + + // Resolve subject info + const subjectInfo = this.buildSubjectInfo(subject, subjectBlobCids) + + const createLabelVals = + isModEventLabel(event) && event.createLabelVals.length > 0 + ? event.createLabelVals.join(' ') + : undefined + const negateLabelVals = + isModEventLabel(event) && event.negateLabelVals.length > 0 + ? event.negateLabelVals.join(' ') + : undefined + + const meta: Record = {} + + if (isModEventReport(event)) { + meta.reportType = event.reportType + } + + if (isModEventComment(event) && event.sticky) { + meta.sticky = event.sticky + } + + if (isModEventEmail(event)) { + meta.subjectLine = event.subjectLine + } + + const modEvent = await this.db.db + .insertInto('moderation_event') + .values({ + comment: event.comment ? `${event.comment}` : null, + action: event.$type as ModerationEvent['action'], + createdAt: createdAt.toISOString(), + createdBy, + createLabelVals, + negateLabelVals, + durationInHours: event.durationInHours + ? Number(event.durationInHours) + : null, + meta, + expiresAt: + (isModEventTakedown(event) || isModEventMute(event)) && + event.durationInHours + ? addHoursToDate(event.durationInHours, createdAt).toISOString() + : undefined, + ...subjectInfo, + }) + .returningAll() + .executeTakeFirstOrThrow() + + await adjustModerationSubjectStatus(this.db, modEvent, subjectBlobCids) + + return modEvent + } + + async getLastReversibleEventForSubject({ + did, + muteUntil, + recordPath, + suspendUntil, + }: ModerationSubjectStatusRow) { + const isSuspended = suspendUntil && new Date(suspendUntil) < new Date() + const isMuted = muteUntil && new Date(muteUntil) < new Date() + + // If the subject is neither suspended nor muted don't bother finding the last reversible event + // Ideally, this should never happen because the caller of this method should only call this + // after ensuring that the suspended or muted subjects are being reversed + if (!isSuspended && !isMuted) { + return null + } + + let builder = this.db.db + .selectFrom('moderation_event') + .where('subjectDid', '=', did) + + if (recordPath) { + builder = builder.where('subjectUri', 'like', `%${recordPath}%`) + } + + // Means the subject was suspended and needs to be unsuspended + if (isSuspended) { + builder = builder + .where('action', '=', 'com.atproto.admin.defs#modEventTakedown') + .where('durationInHours', 'is not', null) + } + if (isMuted) { + builder = builder + .where('action', '=', 'com.atproto.admin.defs#modEventMute') + .where('durationInHours', 'is not', null) + } + + return await builder + .orderBy('id', 'desc') + .selectAll() + .limit(1) + .executeTakeFirst() + } + + async getSubjectsDueForReversal(): Promise { + const subjectsDueForReversal = await this.db.db + .selectFrom('moderation_subject_status') + .where('suspendUntil', '<', new Date().toISOString()) + .orWhere('muteUntil', '<', new Date().toISOString()) + .selectAll() + .execute() + + return subjectsDueForReversal + } + + async isSubjectSuspended(did: string): Promise { + const res = await this.db.db + .selectFrom('moderation_subject_status') + .where('did', '=', did) + .where('recordPath', '=', '') + .where('suspendUntil', '>', new Date().toISOString()) + .select('did') + .limit(1) + .executeTakeFirst() + return !!res + } + + async revertState({ + createdBy, + createdAt, + comment, + action, + subject, + }: ReversibleModerationEvent): Promise<{ + result: ModerationEventRow + restored?: TakedownSubjects + }> { + const isRevertingTakedown = + action === 'com.atproto.admin.defs#modEventTakedown' + this.db.assertTransaction() + const result = await this.logEvent({ + event: { + $type: isRevertingTakedown + ? 'com.atproto.admin.defs#modEventReverseTakedown' + : 'com.atproto.admin.defs#modEventUnmute', + comment: comment ?? undefined, + }, + createdAt, + createdBy, + subject, + }) + + let restored: TakedownSubjects | undefined + + if (!isRevertingTakedown) { + return { result, restored } + } + + if ( + result.subjectType === 'com.atproto.admin.defs#repoRef' && + result.subjectDid + ) { + await this.reverseTakedownRepo({ + did: result.subjectDid, + }) + restored = { + did: result.subjectDid, + subjects: [ + { + $type: 'com.atproto.admin.defs#repoRef', + did: result.subjectDid, + }, + ], + } + } + + if ( + result.subjectType === 'com.atproto.repo.strongRef' && + result.subjectUri + ) { + const uri = new AtUri(result.subjectUri) + await this.reverseTakedownRecord({ + uri, + }) + const did = uri.hostname + // TODO: MOD_EVENT This bit needs testing + const subjectStatus = await this.db.db + .selectFrom('moderation_subject_status') + .where('did', '=', uri.host) + .where('recordPath', '=', `${uri.collection}/${uri.rkey}`) + .select('blobCids') + .executeTakeFirst() + const blobCids = subjectStatus?.blobCids || [] + restored = { + did, + subjects: [ + { + $type: 'com.atproto.repo.strongRef', + uri: result.subjectUri, + cid: result.subjectCid ?? '', + }, + ...blobCids.map((cid) => ({ + $type: 'com.atproto.admin.defs#repoBlobRef', + did, + cid, + recordUri: result.subjectUri, + })), + ], + } + } + + return { result, restored } + } + + async takedownRepoOld(info: { + takedownId: number + did: string + }): Promise { + const { takedownId, did } = info await this.db.db .updateTable('actor') - .set({ takedownRef }) + .set({ takedownRef: takedownId.toString() }) .where('did', '=', did) .where('takedownRef', 'is', null) .executeTakeFirst() + + return { + did, + subjects: [ + { + $type: 'com.atproto.admin.defs#repoRef', + did, + }, + ], + } } async reverseTakedownRepo(info: { did: string }) { @@ -38,14 +453,48 @@ export class ModerationService { .execute() } - async takedownRecord(info: { takedownRef: string; uri: AtUri; cid: CID }) { - const { takedownRef, uri } = info + async takedownRecordOld(info: { + takedownId: number + uri: AtUri + cid: CID + blobCids?: CID[] + }): Promise { + const { takedownId, uri, cid, blobCids } = info + const did = uri.hostname + this.db.assertTransaction() await this.db.db .updateTable('record') - .set({ takedownRef }) + .set({ takedownRef: takedownId.toString() }) .where('uri', '=', uri.toString()) .where('takedownRef', 'is', null) .executeTakeFirst() + if (blobCids) { + await Promise.all( + blobCids.map(async (cid) => { + const paths = ImageUriBuilder.presets.map((id) => { + const imgUri = this.imgUriBuilder.getPresetUri(id, uri.host, cid) + return imgUri.replace(this.imgUriBuilder.endpoint, '') + }) + await this.imgInvalidator.invalidate(cid.toString(), paths) + }), + ) + } + return { + did, + subjects: [ + { + $type: 'com.atproto.repo.strongRef', + uri: uri.toString(), + cid: cid.toString(), + }, + ...(blobCids || []).map((cid) => ({ + $type: 'com.atproto.admin.defs#repoBlobRef', + did, + cid: cid.toString(), + recordUri: uri.toString(), + })), + ], + } } async reverseTakedownRecord(info: { uri: AtUri }) { @@ -56,6 +505,190 @@ export class ModerationService { .execute() } + async report(info: { + reasonType: NonNullable['reportType'] + reason?: string + subject: { did: string } | { uri: AtUri; cid: CID } + reportedBy: string + createdAt?: Date + }): Promise { + const { + reasonType, + reason, + reportedBy, + createdAt = new Date(), + subject, + } = info + + const event = await this.logEvent({ + event: { + $type: 'com.atproto.admin.defs#modEventReport', + reportType: reasonType, + comment: reason, + }, + createdBy: reportedBy, + subject, + createdAt, + }) + + return event + } + + async getSubjectStatuses({ + cursor, + limit = 50, + takendown, + appealed, + reviewState, + reviewedAfter, + reviewedBefore, + reportedAfter, + reportedBefore, + includeMuted, + ignoreSubjects, + sortDirection, + lastReviewedBy, + sortField, + subject, + }: { + cursor?: string + limit?: number + takendown?: boolean + appealed?: boolean | null + reviewedBefore?: string + reviewState?: ModerationSubjectStatusRow['reviewState'] + reviewedAfter?: string + reportedAfter?: string + reportedBefore?: string + includeMuted?: boolean + subject?: string + ignoreSubjects?: string[] + sortDirection: 'asc' | 'desc' + lastReviewedBy?: string + sortField: 'lastReviewedAt' | 'lastReportedAt' + }) { + let builder = this.db.db + .selectFrom('moderation_subject_status') + .leftJoin('actor', 'actor.did', 'moderation_subject_status.did') + + if (subject) { + const subjectInfo = getStatusIdentifierFromSubject(subject) + builder = builder + .where('moderation_subject_status.did', '=', subjectInfo.did) + .where((qb) => + subjectInfo.recordPath + ? qb.where('recordPath', '=', subjectInfo.recordPath) + : qb.where('recordPath', '=', ''), + ) + } + + if (ignoreSubjects?.length) { + builder = builder + .where('moderation_subject_status.did', 'not in', ignoreSubjects) + .where('recordPath', 'not in', ignoreSubjects) + } + + if (reviewState) { + builder = builder.where('reviewState', '=', reviewState) + } + + if (lastReviewedBy) { + builder = builder.where('lastReviewedBy', '=', lastReviewedBy) + } + + if (reviewedAfter) { + builder = builder.where('lastReviewedAt', '>', reviewedAfter) + } + + if (reviewedBefore) { + builder = builder.where('lastReviewedAt', '<', reviewedBefore) + } + + if (reportedAfter) { + builder = builder.where('lastReviewedAt', '>', reportedAfter) + } + + if (reportedBefore) { + builder = builder.where('lastReportedAt', '<', reportedBefore) + } + + if (takendown) { + builder = builder.where('takendown', '=', true) + } + + if (appealed !== undefined) { + builder = + appealed === null + ? builder.where('appealed', 'is', null) + : builder.where('appealed', '=', appealed) + } + + if (!includeMuted) { + builder = builder.where((qb) => + qb + .where('muteUntil', '<', new Date().toISOString()) + .orWhere('muteUntil', 'is', null), + ) + } + + const { ref } = this.db.db.dynamic + const keyset = new StatusKeyset( + ref(`moderation_subject_status.${sortField}`), + ref('moderation_subject_status.id'), + ) + const paginatedBuilder = paginate(builder, { + limit, + cursor, + keyset, + direction: sortDirection, + tryIndex: true, + nullsLast: true, + }) + + const results = await paginatedBuilder + .select('actor.handle as handle') + .selectAll('moderation_subject_status') + .execute() + + return { statuses: results, cursor: keyset.packFromResult(results) } + } + + async isSubjectTakendown( + subject: { did: string } | { uri: AtUri }, + ): Promise { + const { did, recordPath } = getStatusIdentifierFromSubject( + 'did' in subject ? subject.did : subject.uri, + ) + const builder = this.db.db + .selectFrom('moderation_subject_status') + .where('did', '=', did) + .where('recordPath', '=', recordPath || '') + + const result = await builder.select('takendown').executeTakeFirst() + + return !!result?.takendown + } + + async takedownRepo(info: { takedownRef: string; did: string }) { + const { takedownRef, did } = info + await this.db.db + .updateTable('actor') + .set({ takedownRef }) + .where('did', '=', did) + .where('takedownRef', 'is', null) + .executeTakeFirst() + } + + async takedownRecord(info: { takedownRef: string; uri: AtUri; cid: CID }) { + const { takedownRef, uri } = info + await this.db.db + .updateTable('record') + .set({ takedownRef }) + .where('uri', '=', uri.toString()) + .where('takedownRef', 'is', null) + .executeTakeFirst() + } + async takedownBlob(info: { takedownRef: string; did: string; cid: string }) { const { takedownRef, did, cid } = info await this.db.db @@ -113,6 +746,18 @@ export class ModerationService { } } +export type TakedownSubjects = { + did: string + subjects: (RepoRef | RepoBlobRef | StrongRef)[] +} + const formatStatus = (ref: string | null): StatusAttr => { return ref ? { applied: true, ref } : { applied: false } } + +export function addHoursToDate(hours: number, startingDate?: Date): Date { + // When date is passe, let's clone before calling `setHours()` so that we are not mutating the original date + const currentDate = startingDate ? new Date(startingDate) : new Date() + currentDate.setHours(currentDate.getHours() + hours) + return currentDate +} diff --git a/packages/bsky/src/services/moderation/pagination.ts b/packages/bsky/src/services/moderation/pagination.ts new file mode 100644 index 00000000000..c68de0822d4 --- /dev/null +++ b/packages/bsky/src/services/moderation/pagination.ts @@ -0,0 +1,96 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { DynamicModule, sql } from 'kysely' + +import { Cursor, GenericKeyset } from '../../db/pagination' + +type StatusKeysetParam = { + lastReviewedAt: string | null + lastReportedAt: string | null + id: number +} + +export class StatusKeyset extends GenericKeyset { + labelResult(result: StatusKeysetParam): Cursor + labelResult(result: StatusKeysetParam) { + const primaryField = ( + this.primary as ReturnType + ).dynamicReference.includes('lastReviewedAt') + ? 'lastReviewedAt' + : 'lastReportedAt' + + return { + primary: result[primaryField] + ? new Date(`${result[primaryField]}`).getTime().toString() + : '', + secondary: result.id.toString(), + } + } + labeledResultToCursor(labeled: Cursor) { + return { + primary: labeled.primary, + secondary: labeled.secondary, + } + } + cursorToLabeledResult(cursor: Cursor) { + return { + primary: cursor.primary + ? new Date(parseInt(cursor.primary, 10)).toISOString() + : '', + secondary: cursor.secondary, + } + } + unpackCursor(cursorStr?: string): Cursor | undefined { + if (!cursorStr) return + const result = cursorStr.split('::') + const [primary, secondary, ...others] = result + if (!secondary || others.length > 0) { + throw new InvalidRequestError('Malformed cursor') + } + return { + primary, + secondary, + } + } + // This is specifically built to handle nullable columns as primary sorting column + getSql(labeled?: Cursor, direction?: 'asc' | 'desc') { + if (labeled === undefined) return + if (direction === 'asc') { + return !labeled.primary + ? sql`(${this.primary} IS NULL AND ${this.secondary} > ${labeled.secondary})` + : sql`((${this.primary}, ${this.secondary}) > (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))` + } else { + return !labeled.primary + ? sql`(${this.primary} IS NULL AND ${this.secondary} < ${labeled.secondary})` + : sql`((${this.primary}, ${this.secondary}) < (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))` + } + } +} + +type TimeIdKeysetParam = { + id: number + createdAt: string +} +type TimeIdResult = TimeIdKeysetParam + +export class TimeIdKeyset extends GenericKeyset { + labelResult(result: TimeIdResult): Cursor + labelResult(result: TimeIdResult) { + return { primary: result.createdAt, secondary: result.id.toString() } + } + labeledResultToCursor(labeled: Cursor) { + return { + primary: new Date(labeled.primary).getTime().toString(), + secondary: labeled.secondary, + } + } + cursorToLabeledResult(cursor: Cursor) { + const primaryDate = new Date(parseInt(cursor.primary, 10)) + if (isNaN(primaryDate.getTime())) { + throw new InvalidRequestError('Malformed cursor') + } + return { + primary: primaryDate.toISOString(), + secondary: cursor.secondary, + } + } +} diff --git a/packages/bsky/src/services/moderation/status.ts b/packages/bsky/src/services/moderation/status.ts new file mode 100644 index 00000000000..151f6137a05 --- /dev/null +++ b/packages/bsky/src/services/moderation/status.ts @@ -0,0 +1,265 @@ +// This may require better organization but for now, just dumping functions here containing DB queries for moderation status + +import { AtUri } from '@atproto/syntax' +import { PrimaryDatabase } from '../../db' +import { ModerationSubjectStatus } from '../../db/tables/moderation' +import { + REVIEWOPEN, + REVIEWCLOSED, + REVIEWESCALATED, +} from '../../lexicon/types/com/atproto/admin/defs' +import { ModerationEventRow, ModerationSubjectStatusRow } from './types' +import { HOUR } from '@atproto/common' +import { CID } from 'multiformats/cid' +import { sql } from 'kysely' +import { REASONAPPEAL } from '../../lexicon/types/com/atproto/moderation/defs' + +const getSubjectStatusForModerationEvent = ({ + action, + createdBy, + createdAt, + durationInHours, +}: { + action: string + createdBy: string + createdAt: string + durationInHours: number | null +}): Partial | null => { + switch (action) { + case 'com.atproto.admin.defs#modEventAcknowledge': + return { + lastReviewedBy: createdBy, + reviewState: REVIEWCLOSED, + lastReviewedAt: createdAt, + } + case 'com.atproto.admin.defs#modEventReport': + return { + reviewState: REVIEWOPEN, + lastReportedAt: createdAt, + } + case 'com.atproto.admin.defs#modEventEscalate': + return { + lastReviewedBy: createdBy, + reviewState: REVIEWESCALATED, + lastReviewedAt: createdAt, + } + case 'com.atproto.admin.defs#modEventReverseTakedown': + return { + lastReviewedBy: createdBy, + reviewState: REVIEWCLOSED, + takendown: false, + suspendUntil: null, + lastReviewedAt: createdAt, + } + case 'com.atproto.admin.defs#modEventUnmute': + return { + lastReviewedBy: createdBy, + muteUntil: null, + reviewState: REVIEWOPEN, + lastReviewedAt: createdAt, + } + case 'com.atproto.admin.defs#modEventTakedown': + return { + takendown: true, + lastReviewedBy: createdBy, + reviewState: REVIEWCLOSED, + lastReviewedAt: createdAt, + suspendUntil: durationInHours + ? new Date(Date.now() + durationInHours * HOUR).toISOString() + : null, + } + case 'com.atproto.admin.defs#modEventMute': + return { + lastReviewedBy: createdBy, + reviewState: REVIEWOPEN, + lastReviewedAt: createdAt, + // By default, mute for 24hrs + muteUntil: new Date( + Date.now() + (durationInHours || 24) * HOUR, + ).toISOString(), + } + case 'com.atproto.admin.defs#modEventComment': + return { + lastReviewedBy: createdBy, + lastReviewedAt: createdAt, + } + case 'com.atproto.admin.defs#modEventResolveAppeal': + return { + appealed: false, + } + default: + return null + } +} + +// Based on a given moderation action event, this function will update the moderation status of the subject +// If there's no existing status, it will create one +// If the action event does not affect the status, it will do nothing +export const adjustModerationSubjectStatus = async ( + db: PrimaryDatabase, + moderationEvent: ModerationEventRow, + blobCids?: CID[], +) => { + const { + action, + subjectDid, + subjectUri, + subjectCid, + createdBy, + meta, + comment, + createdAt, + } = moderationEvent + + const isAppealEvent = + action === 'com.atproto.admin.defs#modEventReport' && + meta?.reportType === REASONAPPEAL + + const subjectStatus = getSubjectStatusForModerationEvent({ + action, + createdBy, + createdAt, + durationInHours: moderationEvent.durationInHours, + }) + + // If there are no subjectStatus that means there are no side-effect of the incoming event + if (!subjectStatus) { + return null + } + + const now = new Date().toISOString() + // If subjectUri exists, it's not a repoRef so pass along the uri to get identifier back + const identifier = getStatusIdentifierFromSubject(subjectUri || subjectDid) + + db.assertTransaction() + + const currentStatus = await db.db + .selectFrom('moderation_subject_status') + .where('did', '=', identifier.did) + .where('recordPath', '=', identifier.recordPath) + .selectAll() + .executeTakeFirst() + + if ( + currentStatus?.reviewState === REVIEWESCALATED && + subjectStatus.reviewState === REVIEWOPEN + ) { + // If the current status is escalated and the incoming event is to open the review + // We want to keep the status as escalated + subjectStatus.reviewState = REVIEWESCALATED + } + + // Set these because we don't want to override them if they're already set + const defaultData = { + comment: null, + // Defaulting reviewState to open for any event may not be the desired behavior. + // For instance, if a subject never had any event and we just want to leave a comment to keep an eye on it + // that shouldn't mean we want to review the subject + reviewState: REVIEWOPEN, + recordCid: subjectCid || null, + } + const newStatus = { + ...defaultData, + ...subjectStatus, + } + + if ( + action === 'com.atproto.admin.defs#modEventReverseTakedown' && + !subjectStatus.takendown + ) { + newStatus.takendown = false + subjectStatus.takendown = false + } + + if (isAppealEvent) { + newStatus.appealed = true + subjectStatus.appealed = true + newStatus.lastAppealedAt = createdAt + subjectStatus.lastAppealedAt = createdAt + } + + if ( + action === 'com.atproto.admin.defs#modEventResolveAppeal' && + subjectStatus.appealed + ) { + newStatus.appealed = false + subjectStatus.appealed = false + } + + if (action === 'com.atproto.admin.defs#modEventComment' && meta?.sticky) { + newStatus.comment = comment + subjectStatus.comment = comment + } + + if (blobCids?.length) { + const newBlobCids = sql`${JSON.stringify( + blobCids.map((c) => c.toString()), + )}` as unknown as ModerationSubjectStatusRow['blobCids'] + newStatus.blobCids = newBlobCids + subjectStatus.blobCids = newBlobCids + } + + const insertQuery = db.db + .insertInto('moderation_subject_status') + .values({ + ...identifier, + ...newStatus, + createdAt: now, + updatedAt: now, + // TODO: Need to get the types right here. + } as ModerationSubjectStatusRow) + .onConflict((oc) => + oc.constraint('moderation_status_unique_idx').doUpdateSet({ + ...subjectStatus, + updatedAt: now, + }), + ) + + const status = await insertQuery.executeTakeFirst() + return status +} + +type ModerationSubjectStatusFilter = + | Pick + | Pick + | Pick +export const getModerationSubjectStatus = async ( + db: PrimaryDatabase, + filters: ModerationSubjectStatusFilter, +) => { + let builder = db.db + .selectFrom('moderation_subject_status') + // DID will always be passed at the very least + .where('did', '=', filters.did) + .where('recordPath', '=', 'recordPath' in filters ? filters.recordPath : '') + + if ('recordCid' in filters) { + builder = builder.where('recordCid', '=', filters.recordCid) + } else { + builder = builder.where('recordCid', 'is', null) + } + + return builder.executeTakeFirst() +} + +export const getStatusIdentifierFromSubject = ( + subject: string | AtUri, +): { did: string; recordPath: string } => { + const isSubjectString = typeof subject === 'string' + if (isSubjectString && subject.startsWith('did:')) { + return { + did: subject, + recordPath: '', + } + } + + if (isSubjectString && !subject.startsWith('at://')) { + throw new Error('Subject is neither a did nor an at-uri') + } + + const uri = isSubjectString ? new AtUri(subject) : subject + return { + did: uri.host, + recordPath: `${uri.collection}/${uri.rkey}`, + } +} diff --git a/packages/bsky/src/services/moderation/types.ts b/packages/bsky/src/services/moderation/types.ts new file mode 100644 index 00000000000..77a8baf71ff --- /dev/null +++ b/packages/bsky/src/services/moderation/types.ts @@ -0,0 +1,49 @@ +import { Selectable } from 'kysely' +import { + ModerationEvent, + ModerationSubjectStatus, +} from '../../db/tables/moderation' +import { AtUri } from '@atproto/syntax' +import { CID } from 'multiformats/cid' +import { ComAtprotoAdminDefs } from '@atproto/api' + +export type SubjectInfo = + | { + subjectType: 'com.atproto.admin.defs#repoRef' + subjectDid: string + subjectUri: null + subjectCid: null + } + | { + subjectType: 'com.atproto.repo.strongRef' + subjectDid: string + subjectUri: string + subjectCid: string + } + +export type ModerationEventRow = Selectable +export type ReversibleModerationEvent = Pick< + ModerationEventRow, + 'createdBy' | 'comment' | 'action' +> & { + createdAt?: Date + subject: { did: string } | { uri: AtUri; cid: CID } +} + +export type ModerationEventRowWithHandle = ModerationEventRow & { + subjectHandle?: string | null + creatorHandle?: string | null +} +export type ModerationSubjectStatusRow = Selectable +export type ModerationSubjectStatusRowWithHandle = + ModerationSubjectStatusRow & { handle: string | null } + +export type ModEventType = + | ComAtprotoAdminDefs.ModEventTakedown + | ComAtprotoAdminDefs.ModEventAcknowledge + | ComAtprotoAdminDefs.ModEventEscalate + | ComAtprotoAdminDefs.ModEventComment + | ComAtprotoAdminDefs.ModEventLabel + | ComAtprotoAdminDefs.ModEventReport + | ComAtprotoAdminDefs.ModEventMute + | ComAtprotoAdminDefs.ModEventReverseTakedown diff --git a/packages/bsky/src/services/moderation/views.ts b/packages/bsky/src/services/moderation/views.ts new file mode 100644 index 00000000000..654a6e54291 --- /dev/null +++ b/packages/bsky/src/services/moderation/views.ts @@ -0,0 +1,551 @@ +import { sql } from 'kysely' +import { ArrayEl } from '@atproto/common' +import { AtUri } from '@atproto/syntax' +import { INVALID_HANDLE } from '@atproto/syntax' +import { BlobRef, jsonStringToLex } from '@atproto/lexicon' +import { Database } from '../../db' +import { Actor } from '../../db/tables/actor' +import { Record as RecordRow } from '../../db/tables/record' +import { + ModEventView, + RepoView, + RepoViewDetail, + RecordView, + RecordViewDetail, + ReportViewDetail, + BlobView, + SubjectStatusView, + ModEventViewDetail, +} from '../../lexicon/types/com/atproto/admin/defs' +import { OutputSchema as ReportOutput } from '../../lexicon/types/com/atproto/moderation/createReport' +import { Label } from '../../lexicon/types/com/atproto/label/defs' +import { + ModerationEventRowWithHandle, + ModerationSubjectStatusRowWithHandle, +} from './types' +import { getSelfLabels } from '../label' +import { REASONOTHER } from '../../lexicon/types/com/atproto/moderation/defs' + +export class ModerationViews { + constructor(private db: Database) {} + + repo(result: RepoResult): Promise + repo(result: RepoResult[]): Promise + async repo( + result: RepoResult | RepoResult[], + ): Promise { + const results = Array.isArray(result) ? result : [result] + if (results.length === 0) return [] + + const [info, subjectStatuses] = await Promise.all([ + await this.db.db + .selectFrom('actor') + .leftJoin('profile', 'profile.creator', 'actor.did') + .leftJoin( + 'record as profile_record', + 'profile_record.uri', + 'profile.uri', + ) + .where( + 'actor.did', + 'in', + results.map((r) => r.did), + ) + .select(['actor.did as did', 'profile_record.json as profileJson']) + .execute(), + this.getSubjectStatus(results.map((r) => ({ did: r.did }))), + ]) + + const infoByDid = info.reduce( + (acc, cur) => Object.assign(acc, { [cur.did]: cur }), + {} as Record>, + ) + const subjectStatusByDid = subjectStatuses.reduce( + (acc, cur) => + Object.assign(acc, { [cur.did ?? '']: this.subjectStatus(cur) }), + {}, + ) + + const views = results.map((r) => { + const { profileJson } = infoByDid[r.did] ?? {} + const relatedRecords: object[] = [] + if (profileJson) { + relatedRecords.push( + jsonStringToLex(profileJson) as Record, + ) + } + return { + // No email or invite info on appview + did: r.did, + handle: r.handle ?? INVALID_HANDLE, + relatedRecords, + indexedAt: r.indexedAt, + moderation: { + subjectStatus: subjectStatusByDid[r.did] ?? undefined, + }, + } + }) + + return Array.isArray(result) ? views : views[0] + } + event(result: EventResult): Promise + event(result: EventResult[]): Promise + async event( + result: EventResult | EventResult[], + ): Promise { + const results = Array.isArray(result) ? result : [result] + if (results.length === 0) return [] + + const views = results.map((res) => { + const eventView: ModEventView = { + id: res.id, + event: { + $type: res.action, + comment: res.comment ?? undefined, + }, + subject: + res.subjectType === 'com.atproto.admin.defs#repoRef' + ? { + $type: 'com.atproto.admin.defs#repoRef', + did: res.subjectDid, + } + : { + $type: 'com.atproto.repo.strongRef', + uri: res.subjectUri, + cid: res.subjectCid, + }, + subjectBlobCids: [], + createdBy: res.createdBy, + createdAt: res.createdAt, + subjectHandle: res.subjectHandle ?? undefined, + creatorHandle: res.creatorHandle ?? undefined, + } + + if ( + [ + 'com.atproto.admin.defs#modEventTakedown', + 'com.atproto.admin.defs#modEventMute', + ].includes(res.action) + ) { + eventView.event = { + ...eventView.event, + durationInHours: res.durationInHours ?? undefined, + } + } + + if (res.action === 'com.atproto.admin.defs#modEventLabel') { + eventView.event = { + ...eventView.event, + createLabelVals: res.createLabelVals?.length + ? res.createLabelVals.split(' ') + : [], + negateLabelVals: res.negateLabelVals?.length + ? res.negateLabelVals.split(' ') + : [], + } + } + + // This is for legacy data only, for new events, these types of events won't have labels attached + if ( + [ + 'com.atproto.admin.defs#modEventAcknowledge', + 'com.atproto.admin.defs#modEventTakedown', + 'com.atproto.admin.defs#modEventEscalate', + ].includes(res.action) + ) { + if (res.createLabelVals?.length) { + eventView.event = { + ...eventView.event, + createLabelVals: res.createLabelVals.split(' '), + } + } + + if (res.negateLabelVals?.length) { + eventView.event = { + ...eventView.event, + negateLabelVals: res.negateLabelVals.split(' '), + } + } + } + + if (res.action === 'com.atproto.admin.defs#modEventReport') { + eventView.event = { + ...eventView.event, + reportType: res.meta?.reportType ?? undefined, + } + } + + if (res.action === 'com.atproto.admin.defs#modEventEmail') { + eventView.event = { + ...eventView.event, + subjectLine: res.meta?.subjectLine ?? '', + } + } + + if ( + res.action === 'com.atproto.admin.defs#modEventComment' && + res.meta?.sticky + ) { + eventView.event.sticky = true + } + + return eventView + }) + + return Array.isArray(result) ? views : views[0] + } + + async eventDetail(result: EventResult): Promise { + const [event, subject] = await Promise.all([ + this.event(result), + this.subject(result), + ]) + const allBlobs = findBlobRefs(subject.value) + const subjectBlobs = await this.blob( + allBlobs.filter((blob) => + event.subjectBlobCids.includes(blob.ref.toString()), + ), + ) + return { + ...event, + subject, + subjectBlobs, + } + } + + async repoDetail(result: RepoResult): Promise { + const [repo, labels] = await Promise.all([ + this.repo(result), + this.labels(result.did), + ]) + + return { + ...repo, + moderation: { + ...repo.moderation, + }, + labels, + } + } + + record(result: RecordResult): Promise + record(result: RecordResult[]): Promise + async record( + result: RecordResult | RecordResult[], + ): Promise { + const results = Array.isArray(result) ? result : [result] + if (results.length === 0) return [] + + const [repoResults, subjectStatuses] = await Promise.all([ + this.db.db + .selectFrom('actor') + .where( + 'actor.did', + 'in', + results.map((r) => didFromUri(r.uri)), + ) + .selectAll() + .execute(), + this.getSubjectStatus(results.map((r) => didAndRecordPathFromUri(r.uri))), + ]) + const repos = await this.repo(repoResults) + + const reposByDid = repos.reduce( + (acc, cur) => Object.assign(acc, { [cur.did]: cur }), + {} as Record>, + ) + const subjectStatusByUri = subjectStatuses.reduce( + (acc, cur) => + Object.assign(acc, { + [`${cur.did}/${cur.recordPath}` ?? '']: this.subjectStatus(cur), + }), + {}, + ) + + const views = results.map((res) => { + const repo = reposByDid[didFromUri(res.uri)] + const { did, recordPath } = didAndRecordPathFromUri(res.uri) + const subjectStatus = subjectStatusByUri[`${did}/${recordPath}`] + if (!repo) throw new Error(`Record repo is missing: ${res.uri}`) + const value = jsonStringToLex(res.json) as Record + return { + uri: res.uri, + cid: res.cid, + value, + blobCids: findBlobRefs(value).map((blob) => blob.ref.toString()), + indexedAt: res.indexedAt, + repo, + moderation: { + subjectStatus, + }, + } + }) + + return Array.isArray(result) ? views : views[0] + } + + async recordDetail(result: RecordResult): Promise { + const [record, subjectStatusResult] = await Promise.all([ + this.record(result), + this.getSubjectStatus(didAndRecordPathFromUri(result.uri)), + ]) + + const [blobs, labels, subjectStatus] = await Promise.all([ + this.blob(findBlobRefs(record.value)), + this.labels(record.uri), + subjectStatusResult?.length + ? this.subjectStatus(subjectStatusResult[0]) + : Promise.resolve(undefined), + ]) + const selfLabels = getSelfLabels({ + uri: result.uri, + cid: result.cid, + record: jsonStringToLex(result.json) as Record, + }) + return { + ...record, + blobs, + moderation: { + ...record.moderation, + subjectStatus, + }, + labels: [...labels, ...selfLabels], + } + } + reportPublic(report: ReportResult): ReportOutput { + return { + id: report.id, + createdAt: report.createdAt, + // Ideally, we would never have a report entry that does not have a reasonType but at the schema level + // we are not guarantying that so in whatever case, if we end up with such entries, default to 'other' + reasonType: report.meta?.reportType + ? (report.meta?.reportType as string) + : REASONOTHER, + reason: report.comment ?? undefined, + reportedBy: report.createdBy, + subject: + report.subjectType === 'com.atproto.admin.defs#repoRef' + ? { + $type: 'com.atproto.admin.defs#repoRef', + did: report.subjectDid, + } + : { + $type: 'com.atproto.repo.strongRef', + uri: report.subjectUri, + cid: report.subjectCid, + }, + } + } + // Partial view for subjects + + async subject(result: SubjectResult): Promise { + let subject: SubjectView + if (result.subjectType === 'com.atproto.admin.defs#repoRef') { + const repoResult = await this.db.db + .selectFrom('actor') + .selectAll() + .where('did', '=', result.subjectDid) + .executeTakeFirst() + if (repoResult) { + subject = await this.repo(repoResult) + subject.$type = 'com.atproto.admin.defs#repoView' + } else { + subject = { did: result.subjectDid } + subject.$type = 'com.atproto.admin.defs#repoViewNotFound' + } + } else if ( + result.subjectType === 'com.atproto.repo.strongRef' && + result.subjectUri !== null + ) { + const recordResult = await this.db.db + .selectFrom('record') + .selectAll() + .where('uri', '=', result.subjectUri) + .executeTakeFirst() + if (recordResult) { + subject = await this.record(recordResult) + subject.$type = 'com.atproto.admin.defs#recordView' + } else { + subject = { uri: result.subjectUri } + subject.$type = 'com.atproto.admin.defs#recordViewNotFound' + } + } else { + throw new Error(`Bad subject data: (${result.id}) ${result.subjectType}`) + } + return subject + } + + // Partial view for blobs + + async blob(blobs: BlobRef[]): Promise { + if (!blobs.length) return [] + const { ref } = this.db.db.dynamic + const modStatusResults = await this.db.db + .selectFrom('moderation_subject_status') + .where( + sql`${ref( + 'moderation_subject_status.blobCids', + )} @> ${JSON.stringify(blobs.map((blob) => blob.ref.toString()))}`, + ) + .selectAll() + .executeTakeFirst() + const statusByCid = (modStatusResults?.blobCids || [])?.reduce( + (acc, cur) => Object.assign(acc, { [cur]: modStatusResults }), + {}, + ) + // Intentionally missing details field, since we don't have any on appview. + // We also don't know when the blob was created, so we use a canned creation time. + const unknownTime = new Date(0).toISOString() + return blobs.map((blob) => { + const cid = blob.ref.toString() + const subjectStatus = statusByCid[cid] + ? this.subjectStatus(statusByCid[cid]) + : undefined + return { + cid, + mimeType: blob.mimeType, + size: blob.size, + createdAt: unknownTime, + moderation: { + subjectStatus, + }, + } + }) + } + + async labels(subject: string, includeNeg?: boolean): Promise { + const res = await this.db.db + .selectFrom('label') + .where('label.uri', '=', subject) + .if(!includeNeg, (qb) => qb.where('neg', '=', false)) + .selectAll() + .execute() + return res.map((l) => ({ + ...l, + cid: l.cid === '' ? undefined : l.cid, + neg: l.neg, + })) + } + + async getSubjectStatus( + subject: + | { did: string; recordPath?: string } + | { did: string; recordPath?: string }[], + ): Promise { + const subjectFilters = Array.isArray(subject) ? subject : [subject] + const filterForSubject = + ({ did, recordPath }: { did: string; recordPath?: string }) => + // TODO: Fix the typing here? + (clause: any) => { + clause = clause + .where('moderation_subject_status.did', '=', did) + .where('moderation_subject_status.recordPath', '=', recordPath || '') + return clause + } + + const builder = this.db.db + .selectFrom('moderation_subject_status') + .leftJoin('actor', 'actor.did', 'moderation_subject_status.did') + .where((clause) => { + subjectFilters.forEach(({ did, recordPath }, i) => { + const applySubjectFilter = filterForSubject({ did, recordPath }) + if (i === 0) { + clause = clause.where(applySubjectFilter) + } else { + clause = clause.orWhere(applySubjectFilter) + } + }) + + return clause + }) + .selectAll('moderation_subject_status') + .select('actor.handle as handle') + + return builder.execute() + } + + subjectStatus(result: ModerationSubjectStatusRowWithHandle): SubjectStatusView + subjectStatus( + result: ModerationSubjectStatusRowWithHandle[], + ): SubjectStatusView[] + subjectStatus( + result: + | ModerationSubjectStatusRowWithHandle + | ModerationSubjectStatusRowWithHandle[], + ): SubjectStatusView | SubjectStatusView[] { + const results = Array.isArray(result) ? result : [result] + if (results.length === 0) return [] + + const decoratedSubjectStatuses = results.map((subjectStatus) => ({ + id: subjectStatus.id, + reviewState: subjectStatus.reviewState, + createdAt: subjectStatus.createdAt, + updatedAt: subjectStatus.updatedAt, + comment: subjectStatus.comment ?? undefined, + lastReviewedBy: subjectStatus.lastReviewedBy ?? undefined, + lastReviewedAt: subjectStatus.lastReviewedAt ?? undefined, + lastReportedAt: subjectStatus.lastReportedAt ?? undefined, + lastAppealedAt: subjectStatus.lastAppealedAt ?? undefined, + muteUntil: subjectStatus.muteUntil ?? undefined, + suspendUntil: subjectStatus.suspendUntil ?? undefined, + takendown: subjectStatus.takendown ?? undefined, + appealed: subjectStatus.appealed ?? undefined, + subjectRepoHandle: subjectStatus.handle ?? undefined, + subjectBlobCids: subjectStatus.blobCids || [], + subject: !subjectStatus.recordPath + ? { + $type: 'com.atproto.admin.defs#repoRef', + did: subjectStatus.did, + } + : { + $type: 'com.atproto.repo.strongRef', + uri: AtUri.make( + subjectStatus.did, + // Not too intuitive but the recordpath is basically / + // which is what the last 2 params of .make() arguments are + ...subjectStatus.recordPath.split('/'), + ).toString(), + cid: subjectStatus.recordCid, + }, + })) + + return Array.isArray(result) + ? decoratedSubjectStatuses + : decoratedSubjectStatuses[0] + } +} + +type RepoResult = Actor + +type EventResult = ModerationEventRowWithHandle + +type ReportResult = ModerationEventRowWithHandle + +type RecordResult = RecordRow + +type SubjectResult = Pick< + EventResult & ReportResult, + 'id' | 'subjectType' | 'subjectDid' | 'subjectUri' | 'subjectCid' +> + +type SubjectView = ModEventViewDetail['subject'] & ReportViewDetail['subject'] + +function didFromUri(uri: string) { + return new AtUri(uri).host +} + +function didAndRecordPathFromUri(uri: string) { + const atUri = new AtUri(uri) + return { did: atUri.host, recordPath: `${atUri.collection}/${atUri.rkey}` } +} + +function findBlobRefs(value: unknown, refs: BlobRef[] = []) { + if (value instanceof BlobRef) { + refs.push(value) + } else if (Array.isArray(value)) { + value.forEach((val) => findBlobRefs(val, refs)) + } else if (value && typeof value === 'object') { + Object.values(value).forEach((val) => findBlobRefs(val, refs)) + } + return refs +} diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 8eb40ed0b36..fb3e2788227 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -44,6 +44,8 @@ export class TestBsky { didCacheMaxTTL: DAY, labelCacheStaleTTL: 30 * SECOND, labelCacheMaxTTL: MINUTE, + labelerDid: 'did:example:labeler', + modServiceUrl: 'https://mod.invalid', modServiceDid: cfg.modServiceDid ?? 'did:example:invalidMod', ...cfg, // Each test suite gets its own lock id for the repo subscription diff --git a/services/bsky/api.js b/services/bsky/api.js index 42737d72b56..534c102971d 100644 --- a/services/bsky/api.js +++ b/services/bsky/api.js @@ -27,7 +27,6 @@ const { ServerConfig, BskyAppView, makeAlgos, - PeriodicModerationEventReversal, } = require('@atproto/bsky') const main = async () => { @@ -133,18 +132,9 @@ const main = async () => { algos, }) - const periodicModerationEventReversal = new PeriodicModerationEventReversal( - bsky.ctx, - ) - const periodicModerationEventReversalRunning = - periodicModerationEventReversal.run() - await bsky.start() // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/) const shutdown = async () => { - // Gracefully shutdown periodic-moderation-event-reversal before destroying bsky instance - periodicModerationEventReversal.destroy() - await periodicModerationEventReversalRunning await bsky.destroy() } process.on('SIGTERM', shutdown)