diff --git a/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts b/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts index 9a311540746..1868533295c 100644 --- a/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts +++ b/packages/bsky/src/api/com/atproto/admin/queryModerationEvents.ts @@ -29,8 +29,8 @@ export default function (server: Server, ctx: AppContext) { return { encoding: 'application/json', body: { - cursor: results.at(-1)?.id.toString() ?? undefined, - events: await moderationService.views.event(results), + cursor: results.cursor, + events: await moderationService.views.event(results.events), }, } }, diff --git a/packages/bsky/src/db/migrations/20231003T202833377Z-create-moderation-subject-status.ts b/packages/bsky/src/db/migrations/20231003T202833377Z-create-moderation-subject-status.ts index 4deef5d71cb..5419233804e 100644 --- a/packages/bsky/src/db/migrations/20231003T202833377Z-create-moderation-subject-status.ts +++ b/packages/bsky/src/db/migrations/20231003T202833377Z-create-moderation-subject-status.ts @@ -53,6 +53,13 @@ export async function up(db: Kysely): Promise { .addUniqueConstraint('moderation_status_unique_idx', ['did', 'recordPath']) .execute() + await db.schema + .createIndex('moderation_subject_status_blob_cids_idx') + .on('moderation_subject_status') + .using('gin') + .column('blobCids') + .execute() + // Move foreign keys from moderation_action to moderation_event await db.schema .alterTable('record') diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 65f0ab233c5..a5d901e3000 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -39,6 +39,7 @@ export { AppContext } from './context' export { makeAlgos } from './feed-gen' export * from './indexer' export * from './ingester' +export { MigrateModerationData } from './migrate-moderation-data' export class BskyAppView { public ctx: AppContext diff --git a/packages/bsky/src/migrate-moderation-data.ts b/packages/bsky/src/migrate-moderation-data.ts index ccf6328dfd2..5d6d35fc089 100644 --- a/packages/bsky/src/migrate-moderation-data.ts +++ b/packages/bsky/src/migrate-moderation-data.ts @@ -11,7 +11,7 @@ const getEnv = () => ({ DB_URL: process.env.MODERATION_MIGRATION_DB_URL || 'postgresql://pg:password@127.0.0.1:5433/postgres', - DB_POOL_SIZE: Number(process.env.MODERATION_MIGRATION_DB_URL) || 10, + DB_POOL_SIZE: Number(process.env.MODERATION_MIGRATION_DB_POOL_SIZE) || 10, DB_SCHEMA: process.env.MODERATION_MIGRATION_DB_SCHEMA || 'bsky', }) @@ -53,7 +53,7 @@ const countStatuses = async (db: PrimaryDatabase) => { } const createEvents = async (db: PrimaryDatabase) => { - const commonColumns = [ + const commonColumnsToSelect = [ 'subjectDid', 'subjectUri', 'subjectType', @@ -61,24 +61,26 @@ const createEvents = async (db: PrimaryDatabase) => { sql`reason`.as('comment'), 'createdAt', ] + const commonColumnsToInsert = [ + 'subjectDid', + 'subjectUri', + 'subjectType', + 'subjectCid', + 'comment', + 'createdAt', + 'action', + 'createdBy', + ] as const - const insertQuery = db.db + await db.db .insertInto('moderation_event') .columns([ - 'subjectDid', - 'subjectUri', - 'subjectType', - 'subjectCid', - 'comment', - 'createdAt', - 'action', + 'id', + ...commonColumnsToInsert, 'createLabelVals', 'negateLabelVals', - 'createdBy', 'durationInHours', 'expiresAt', - 'meta', - 'legacyRefId', ]) .expression((eb) => eb @@ -86,41 +88,49 @@ const createEvents = async (db: PrimaryDatabase) => { .selectFrom('moderation_action') // @ts-ignore .select([ - ...commonColumns, + 'id', + ...commonColumnsToSelect, sql`CONCAT('com.atproto.admin.defs#modEvent', UPPER(SUBSTRING(SPLIT_PART(action, '#', 2) FROM 1 FOR 1)), SUBSTRING(SPLIT_PART(action, '#', 2) FROM 2))`.as( 'action', ), + 'createdBy', 'createLabelVals', 'negateLabelVals', - 'createdBy', 'durationInHours', 'expiresAt', - sql`NULL`.as('meta'), - sql`id`.as('legacyRefId'), ]) - .unionAll( - eb - // @ts-ignore - .selectFrom('moderation_report') - // @ts-ignore - .select([ - ...commonColumns, - sql`'com.atproto.admin.defs#modEventReport'`.as('action'), - sql`NULL`.as('createLabelVals'), - sql`NULL`.as('negateLabelVals'), - sql`"reportedByDid"`.as('createdBy'), - sql`NULL`.as('durationInHours'), - sql`NULL`.as('expiresAt'), - sql`json_build_object('reportType', "reasonType")`.as('meta'), - sql`id`.as('legacyRefId'), - ]), - ) - .orderBy('createdAt', 'asc'), + .orderBy('id', 'asc'), ) + .execute() + + const totalActions = await countEvents(db) + console.log(`Created ${totalActions} events from actions`) + + await sql`SELECT setval(pg_get_serial_sequence('moderation_event', 'id'), (select max(id) from moderation_event))`.execute( + db.db, + ) + console.log('Reset the id sequence for moderation_event') + + await db.db + .insertInto('moderation_event') + .columns([...commonColumnsToInsert, 'meta', 'legacyRefId']) + .expression((eb) => + eb + // @ts-ignore + .selectFrom('moderation_report') + // @ts-ignore + .select([ + ...commonColumnsToSelect, + sql`'com.atproto.admin.defs#modEventReport'`.as('action'), + sql`"reportedByDid"`.as('createdBy'), + sql`json_build_object('reportType', "reasonType")`.as('meta'), + sql`id`.as('legacyRefId'), + ]), + ) + .execute() - await insertQuery.execute() const totalEvents = await countEvents(db) - console.log(`Created ${totalEvents} events`) + console.log(`Created ${totalEvents - totalActions} events from reports`) return } @@ -179,8 +189,9 @@ const createStatusFromActions = async (db: PrimaryDatabase) => { console.log(`Processing ${allEvents.count} actions in ${totalChunks} chunks`) await db.transaction(async (tx) => { + // This is not used for pagination but only for logging purposes let currentChunk = 1 - let lastProcessedId = 0 + let lastProcessedId: undefined | number = 0 do { const eventsQuery = tx.db // @ts-ignore @@ -198,7 +209,7 @@ const createStatusFromActions = async (db: PrimaryDatabase) => { const actionParts = event.action.split('#') await adjustModerationSubjectStatus(tx, { ...event, - action: `${actionParts[0]}#modEvent${actionParts[1] + action: `com.atproto.admin.defs#modEvent${actionParts[1] .charAt(0) .toUpperCase()}${actionParts[1].slice( 1, @@ -209,9 +220,9 @@ const createStatusFromActions = async (db: PrimaryDatabase) => { } console.log(`Processed events chunk ${currentChunk} of ${totalChunks}`) - lastProcessedId = events.at(-1)?.id ?? 0 + lastProcessedId = events.at(-1)?.id currentChunk++ - } while (currentChunk < totalChunks) + } while (lastProcessedId !== undefined) }) console.log(`Events migration complete!`) @@ -248,7 +259,7 @@ const syncBlobCids = async (db: PrimaryDatabase) => { console.log(`Updated blob cids on ${results.numUpdatedOrDeletedRows} rows`) } -async function main() { +export async function MigrateModerationData() { const env = getEnv() const db = new DatabaseCoordinator({ schema: env.DB_SCHEMA, @@ -276,5 +287,3 @@ async function main() { console.log(`Time spent: ${(Date.now() - startedAt) / 1000 / 60} minutes`) console.log('Migration complete!') } - -main() diff --git a/packages/bsky/src/services/moderation/index.ts b/packages/bsky/src/services/moderation/index.ts index 7787981faa8..114698480a0 100644 --- a/packages/bsky/src/services/moderation/index.ts +++ b/packages/bsky/src/services/moderation/index.ts @@ -30,8 +30,8 @@ import { SubjectInfo, } from './types' import { ModerationEvent } from '../../db/tables/moderation' -import { Cursor, GenericKeyset, paginate } from '../../db/pagination' -import { DynamicModule, sql } from 'kysely' +import { paginate } from '../../db/pagination' +import { StatusKeyset, TimeIdKeyset } from './pagination' export class ModerationService { constructor( @@ -72,7 +72,7 @@ export class ModerationService { includeAllUserRecords: boolean types: ModerationEvent['action'][] sortDirection?: 'asc' | 'desc' - }): Promise { + }): Promise<{ cursor?: string; events: ModerationEventRowWithHandle[] }> { const { subject, createdBy, @@ -126,26 +126,29 @@ export class ModerationService { if (createdBy) { builder = builder.where('createdBy', '=', createdBy) } - if (cursor) { - const cursorNumeric = parseInt(cursor, 10) - if (isNaN(cursorNumeric)) { - throw new InvalidRequestError('Malformed cursor') - } - builder = builder.where( - 'id', - sortDirection === 'asc' ? '>' : '<', - cursorNumeric, - ) - } - return await builder + + const { ref } = this.db.db.dynamic + const keyset = new TimeIdKeyset( + ref(`moderation_event.createdAt`), + ref('moderation_event.id'), + ) + const paginatedBuilder = paginate(builder, { + limit, + cursor, + keyset, + direction: sortDirection, + tryIndex: true, + }) + + const result = await paginatedBuilder .selectAll(['moderation_event']) .select([ 'subjectActor.handle as subjectHandle', 'creatorActor.handle as creatorHandle', ]) - .orderBy('id', sortDirection) - .limit(limit) .execute() + + return { cursor: keyset.packFromResult(result), events: result } } async getReport(id: number): Promise { @@ -609,7 +612,7 @@ export class ModerationService { } const { ref } = this.db.db.dynamic - const keyset = new ListKeyset( + const keyset = new StatusKeyset( ref(`moderation_subject_status.${sortField}`), ref('moderation_subject_status.id'), ) @@ -651,66 +654,3 @@ export type TakedownSubjects = { did: string subjects: (RepoRef | RepoBlobRef | StrongRef)[] } - -type KeysetParam = { - lastReviewedAt: string | null - lastReportedAt: string | null - id: number -} - -export class ListKeyset extends GenericKeyset { - labelResult(result: KeysetParam): Cursor - labelResult(result: KeysetParam) { - const primaryField = ( - this.primary as ReturnType - ).dynamicReference.includes('lastReviewedAt') - ? 'lastReviewedAt' - : 'lastReportedAt' - - return { - primary: result[primaryField] - ? new Date(`${result[primaryField]}`).getTime().toString() - : '', - secondary: result.id.toString(), - } - } - labeledResultToCursor(labeled: Cursor) { - return { - primary: labeled.primary, - secondary: labeled.secondary, - } - } - cursorToLabeledResult(cursor: Cursor) { - return { - primary: cursor.primary - ? new Date(parseInt(cursor.primary, 10)).toISOString() - : '', - secondary: cursor.secondary, - } - } - unpackCursor(cursorStr?: string): Cursor | undefined { - if (!cursorStr) return - const result = cursorStr.split('::') - const [primary, secondary, ...others] = result - if (!secondary || others.length > 0) { - throw new InvalidRequestError('Malformed cursor') - } - return { - primary, - secondary, - } - } - // This is specifically built to handle nullable columns as primary sorting column - getSql(labeled?: Cursor, direction?: 'asc' | 'desc') { - if (labeled === undefined) return - if (direction === 'asc') { - return !labeled.primary - ? sql`(${this.primary} IS NULL AND ${this.secondary} > ${labeled.secondary})` - : sql`((${this.primary}, ${this.secondary}) > (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))` - } else { - return !labeled.primary - ? sql`(${this.primary} IS NULL AND ${this.secondary} < ${labeled.secondary})` - : sql`((${this.primary}, ${this.secondary}) < (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))` - } - } -} diff --git a/packages/bsky/src/services/moderation/pagination.ts b/packages/bsky/src/services/moderation/pagination.ts new file mode 100644 index 00000000000..c68de0822d4 --- /dev/null +++ b/packages/bsky/src/services/moderation/pagination.ts @@ -0,0 +1,96 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { DynamicModule, sql } from 'kysely' + +import { Cursor, GenericKeyset } from '../../db/pagination' + +type StatusKeysetParam = { + lastReviewedAt: string | null + lastReportedAt: string | null + id: number +} + +export class StatusKeyset extends GenericKeyset { + labelResult(result: StatusKeysetParam): Cursor + labelResult(result: StatusKeysetParam) { + const primaryField = ( + this.primary as ReturnType + ).dynamicReference.includes('lastReviewedAt') + ? 'lastReviewedAt' + : 'lastReportedAt' + + return { + primary: result[primaryField] + ? new Date(`${result[primaryField]}`).getTime().toString() + : '', + secondary: result.id.toString(), + } + } + labeledResultToCursor(labeled: Cursor) { + return { + primary: labeled.primary, + secondary: labeled.secondary, + } + } + cursorToLabeledResult(cursor: Cursor) { + return { + primary: cursor.primary + ? new Date(parseInt(cursor.primary, 10)).toISOString() + : '', + secondary: cursor.secondary, + } + } + unpackCursor(cursorStr?: string): Cursor | undefined { + if (!cursorStr) return + const result = cursorStr.split('::') + const [primary, secondary, ...others] = result + if (!secondary || others.length > 0) { + throw new InvalidRequestError('Malformed cursor') + } + return { + primary, + secondary, + } + } + // This is specifically built to handle nullable columns as primary sorting column + getSql(labeled?: Cursor, direction?: 'asc' | 'desc') { + if (labeled === undefined) return + if (direction === 'asc') { + return !labeled.primary + ? sql`(${this.primary} IS NULL AND ${this.secondary} > ${labeled.secondary})` + : sql`((${this.primary}, ${this.secondary}) > (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))` + } else { + return !labeled.primary + ? sql`(${this.primary} IS NULL AND ${this.secondary} < ${labeled.secondary})` + : sql`((${this.primary}, ${this.secondary}) < (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))` + } + } +} + +type TimeIdKeysetParam = { + id: number + createdAt: string +} +type TimeIdResult = TimeIdKeysetParam + +export class TimeIdKeyset extends GenericKeyset { + labelResult(result: TimeIdResult): Cursor + labelResult(result: TimeIdResult) { + return { primary: result.createdAt, secondary: result.id.toString() } + } + labeledResultToCursor(labeled: Cursor) { + return { + primary: new Date(labeled.primary).getTime().toString(), + secondary: labeled.secondary, + } + } + cursorToLabeledResult(cursor: Cursor) { + const primaryDate = new Date(parseInt(cursor.primary, 10)) + if (isNaN(primaryDate.getTime())) { + throw new InvalidRequestError('Malformed cursor') + } + return { + primary: primaryDate.toISOString(), + secondary: cursor.secondary, + } + } +} diff --git a/packages/bsky/tests/admin/moderation-events.test.ts b/packages/bsky/tests/admin/moderation-events.test.ts index b5669d7aa0a..174167034db 100644 --- a/packages/bsky/tests/admin/moderation-events.test.ts +++ b/packages/bsky/tests/admin/moderation-events.test.ts @@ -1,5 +1,5 @@ import { TestNetwork, SeedClient } from '@atproto/dev-env' -import AtpAgent from '@atproto/api' +import AtpAgent, { ComAtprotoAdminDefs } from '@atproto/api' import { forSnapshot } from '../_util' import basicSeed from '../seeds/basic' import { @@ -163,6 +163,45 @@ describe('moderation-events', () => { forAccount.data.events.map(({ id }) => id).sort(), ) }) + + it('returns paginated list of events with cursor', async () => { + const allEvents = await queryModerationEvents({ + subject: sc.dids.bob, + includeAllUserRecords: true, + }) + + const getPaginatedEvents = async ( + sortDirection: 'asc' | 'desc' = 'desc', + ) => { + let defaultCursor: undefined | string = undefined + const events: ComAtprotoAdminDefs.ModEventView[] = [] + let count = 0 + do { + // get 1 event at a time and check we get all events + const { data } = await queryModerationEvents({ + limit: 1, + subject: sc.dids.bob, + includeAllUserRecords: true, + cursor: defaultCursor, + sortDirection, + }) + events.push(...data.events) + defaultCursor = data.cursor + count++ + // The count is a circuit breaker to prevent infinite loop in case of failing test + } while (defaultCursor && count < 10) + + return events + } + + const defaultEvents = await getPaginatedEvents() + const reversedEvents = await getPaginatedEvents('asc') + + expect(allEvents.data.events.length).toEqual(4) + expect(defaultEvents.length).toEqual(allEvents.data.events.length) + expect(reversedEvents.length).toEqual(allEvents.data.events.length) + expect(reversedEvents[0].id).toEqual(defaultEvents[3].id) + }) }) describe('get event', () => { diff --git a/packages/pds/tests/proxied/__snapshots__/admin.test.ts.snap b/packages/pds/tests/proxied/__snapshots__/admin.test.ts.snap index d548f573004..15c63498ac1 100644 --- a/packages/pds/tests/proxied/__snapshots__/admin.test.ts.snap +++ b/packages/pds/tests/proxied/__snapshots__/admin.test.ts.snap @@ -27,58 +27,55 @@ Array [ `; exports[`proxies admin requests fetches a list of events. 1`] = ` -Object { - "cursor": "2", - "events": Array [ - Object { - "createdAt": "1970-01-01T00:00:00.000Z", - "createdBy": "did:example:admin", - "event": Object { - "$type": "com.atproto.admin.defs#modEventAcknowledge", - }, - "id": 5, - "subject": Object { - "$type": "com.atproto.admin.defs#repoRef", - "did": "user(0)", - }, - "subjectBlobCids": Array [], - "subjectHandle": "bob.test", +Array [ + Object { + "createdAt": "1970-01-01T00:00:00.000Z", + "createdBy": "did:example:admin", + "event": Object { + "$type": "com.atproto.admin.defs#modEventAcknowledge", }, - Object { - "createdAt": "1970-01-01T00:00:00.000Z", - "createdBy": "user(1)", - "creatorHandle": "carol.test", - "event": Object { - "$type": "com.atproto.admin.defs#modEventReport", - "comment": "impersonation", - "reportType": "com.atproto.moderation.defs#reasonOther", - }, - "id": 3, - "subject": Object { - "$type": "com.atproto.admin.defs#repoRef", - "did": "user(0)", - }, - "subjectBlobCids": Array [], - "subjectHandle": "bob.test", + "id": 5, + "subject": Object { + "$type": "com.atproto.admin.defs#repoRef", + "did": "user(0)", }, - Object { - "createdAt": "1970-01-01T00:00:00.000Z", - "createdBy": "user(2)", - "creatorHandle": "alice.test", - "event": Object { - "$type": "com.atproto.admin.defs#modEventReport", - "reportType": "com.atproto.moderation.defs#reasonSpam", - }, - "id": 2, - "subject": Object { - "$type": "com.atproto.admin.defs#repoRef", - "did": "user(0)", - }, - "subjectBlobCids": Array [], - "subjectHandle": "bob.test", + "subjectBlobCids": Array [], + "subjectHandle": "bob.test", + }, + Object { + "createdAt": "1970-01-01T00:00:00.000Z", + "createdBy": "user(1)", + "creatorHandle": "carol.test", + "event": Object { + "$type": "com.atproto.admin.defs#modEventReport", + "comment": "impersonation", + "reportType": "com.atproto.moderation.defs#reasonOther", }, - ], -} + "id": 3, + "subject": Object { + "$type": "com.atproto.admin.defs#repoRef", + "did": "user(0)", + }, + "subjectBlobCids": Array [], + "subjectHandle": "bob.test", + }, + Object { + "createdAt": "1970-01-01T00:00:00.000Z", + "createdBy": "user(2)", + "creatorHandle": "alice.test", + "event": Object { + "$type": "com.atproto.admin.defs#modEventReport", + "reportType": "com.atproto.moderation.defs#reasonSpam", + }, + "id": 2, + "subject": Object { + "$type": "com.atproto.admin.defs#repoRef", + "did": "user(0)", + }, + "subjectBlobCids": Array [], + "subjectHandle": "bob.test", + }, +] `; exports[`proxies admin requests fetches event details. 1`] = ` @@ -135,26 +132,23 @@ Object { `; exports[`proxies admin requests fetches moderation events. 1`] = ` -Object { - "cursor": "4", - "events": Array [ - Object { - "createdAt": "1970-01-01T00:00:00.000Z", - "createdBy": "did:example:admin", - "event": Object { - "$type": "com.atproto.admin.defs#modEventAcknowledge", - }, - "id": 4, - "subject": Object { - "$type": "com.atproto.repo.strongRef", - "cid": "cids(0)", - "uri": "record(0)", - }, - "subjectBlobCids": Array [], - "subjectHandle": "bob.test", +Array [ + Object { + "createdAt": "1970-01-01T00:00:00.000Z", + "createdBy": "did:example:admin", + "event": Object { + "$type": "com.atproto.admin.defs#modEventAcknowledge", }, - ], -} + "id": 4, + "subject": Object { + "$type": "com.atproto.repo.strongRef", + "cid": "cids(0)", + "uri": "record(0)", + }, + "subjectBlobCids": Array [], + "subjectHandle": "bob.test", + }, +] `; exports[`proxies admin requests fetches record details. 1`] = ` diff --git a/packages/pds/tests/proxied/admin.test.ts b/packages/pds/tests/proxied/admin.test.ts index fae7e740459..a51ec048c2d 100644 --- a/packages/pds/tests/proxied/admin.test.ts +++ b/packages/pds/tests/proxied/admin.test.ts @@ -145,7 +145,7 @@ describe('proxies admin requests', () => { }, { headers: network.pds.adminAuthHeaders() }, ) - expect(forSnapshot(result)).toMatchSnapshot() + expect(forSnapshot(result.events)).toMatchSnapshot() }) it('fetches repo details.', async () => { const { data: result } = await agent.api.com.atproto.admin.getRepo( @@ -179,7 +179,7 @@ describe('proxies admin requests', () => { { subject: sc.dids.bob }, { headers: network.pds.adminAuthHeaders() }, ) - expect(forSnapshot(result)).toMatchSnapshot() + expect(forSnapshot(result.events)).toMatchSnapshot() }) it('searches repos.', async () => {