Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/moderation-api-v2' into multi-pd…
Browse files Browse the repository at this point in the history
…s-auth-moderation-api-v2
  • Loading branch information
devinivy committed Nov 22, 2023
2 parents 53949e8 + 006372f commit cae7f8f
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export default function (server: Server, ctx: AppContext) {
return {
encoding: 'application/json',
body: {
cursor: results.at(-1)?.id.toString() ?? undefined,
events: await moderationService.views.event(results),
cursor: results.cursor,
events: await moderationService.views.event(results.events),
},
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ export async function up(db: Kysely<unknown>): Promise<void> {
.addUniqueConstraint('moderation_status_unique_idx', ['did', 'recordPath'])
.execute()

await db.schema
.createIndex('moderation_subject_status_blob_cids_idx')
.on('moderation_subject_status')
.using('gin')
.column('blobCids')
.execute()

// Move foreign keys from moderation_action to moderation_event
await db.schema
.alterTable('record')
Expand Down
1 change: 1 addition & 0 deletions packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export { AppContext } from './context'
export { makeAlgos } from './feed-gen'
export * from './indexer'
export * from './ingester'
export { MigrateModerationData } from './migrate-moderation-data'

export class BskyAppView {
public ctx: AppContext
Expand Down
97 changes: 53 additions & 44 deletions packages/bsky/src/migrate-moderation-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const getEnv = () => ({
DB_URL:
process.env.MODERATION_MIGRATION_DB_URL ||
'postgresql://pg:[email protected]:5433/postgres',
DB_POOL_SIZE: Number(process.env.MODERATION_MIGRATION_DB_URL) || 10,
DB_POOL_SIZE: Number(process.env.MODERATION_MIGRATION_DB_POOL_SIZE) || 10,
DB_SCHEMA: process.env.MODERATION_MIGRATION_DB_SCHEMA || 'bsky',
})

Expand Down Expand Up @@ -53,74 +53,84 @@ const countStatuses = async (db: PrimaryDatabase) => {
}

const createEvents = async (db: PrimaryDatabase) => {
const commonColumns = [
const commonColumnsToSelect = [
'subjectDid',
'subjectUri',
'subjectType',
'subjectCid',
sql`reason`.as('comment'),
'createdAt',
]
const commonColumnsToInsert = [
'subjectDid',
'subjectUri',
'subjectType',
'subjectCid',
'comment',
'createdAt',
'action',
'createdBy',
] as const

const insertQuery = db.db
await db.db
.insertInto('moderation_event')
.columns([
'subjectDid',
'subjectUri',
'subjectType',
'subjectCid',
'comment',
'createdAt',
'action',
'id',
...commonColumnsToInsert,
'createLabelVals',
'negateLabelVals',
'createdBy',
'durationInHours',
'expiresAt',
'meta',
'legacyRefId',
])
.expression((eb) =>
eb
// @ts-ignore
.selectFrom('moderation_action')
// @ts-ignore
.select([
...commonColumns,
'id',
...commonColumnsToSelect,
sql`CONCAT('com.atproto.admin.defs#modEvent', UPPER(SUBSTRING(SPLIT_PART(action, '#', 2) FROM 1 FOR 1)), SUBSTRING(SPLIT_PART(action, '#', 2) FROM 2))`.as(
'action',
),
'createdBy',
'createLabelVals',
'negateLabelVals',
'createdBy',
'durationInHours',
'expiresAt',
sql`NULL`.as('meta'),
sql`id`.as('legacyRefId'),
])
.unionAll(
eb
// @ts-ignore
.selectFrom('moderation_report')
// @ts-ignore
.select([
...commonColumns,
sql`'com.atproto.admin.defs#modEventReport'`.as('action'),
sql`NULL`.as('createLabelVals'),
sql`NULL`.as('negateLabelVals'),
sql`"reportedByDid"`.as('createdBy'),
sql`NULL`.as('durationInHours'),
sql`NULL`.as('expiresAt'),
sql`json_build_object('reportType', "reasonType")`.as('meta'),
sql`id`.as('legacyRefId'),
]),
)
.orderBy('createdAt', 'asc'),
.orderBy('id', 'asc'),
)
.execute()

const totalActions = await countEvents(db)
console.log(`Created ${totalActions} events from actions`)

await sql`SELECT setval(pg_get_serial_sequence('moderation_event', 'id'), (select max(id) from moderation_event))`.execute(
db.db,
)
console.log('Reset the id sequence for moderation_event')

await db.db
.insertInto('moderation_event')
.columns([...commonColumnsToInsert, 'meta', 'legacyRefId'])
.expression((eb) =>
eb
// @ts-ignore
.selectFrom('moderation_report')
// @ts-ignore
.select([
...commonColumnsToSelect,
sql`'com.atproto.admin.defs#modEventReport'`.as('action'),
sql`"reportedByDid"`.as('createdBy'),
sql`json_build_object('reportType', "reasonType")`.as('meta'),
sql`id`.as('legacyRefId'),
]),
)
.execute()

await insertQuery.execute()
const totalEvents = await countEvents(db)
console.log(`Created ${totalEvents} events`)
console.log(`Created ${totalEvents - totalActions} events from reports`)

return
}
Expand Down Expand Up @@ -179,8 +189,9 @@ const createStatusFromActions = async (db: PrimaryDatabase) => {
console.log(`Processing ${allEvents.count} actions in ${totalChunks} chunks`)

await db.transaction(async (tx) => {
// This is not used for pagination but only for logging purposes
let currentChunk = 1
let lastProcessedId = 0
let lastProcessedId: undefined | number = 0
do {
const eventsQuery = tx.db
// @ts-ignore
Expand All @@ -198,7 +209,7 @@ const createStatusFromActions = async (db: PrimaryDatabase) => {
const actionParts = event.action.split('#')
await adjustModerationSubjectStatus(tx, {
...event,
action: `${actionParts[0]}#modEvent${actionParts[1]
action: `com.atproto.admin.defs#modEvent${actionParts[1]
.charAt(0)
.toUpperCase()}${actionParts[1].slice(
1,
Expand All @@ -209,9 +220,9 @@ const createStatusFromActions = async (db: PrimaryDatabase) => {
}

console.log(`Processed events chunk ${currentChunk} of ${totalChunks}`)
lastProcessedId = events.at(-1)?.id ?? 0
lastProcessedId = events.at(-1)?.id
currentChunk++
} while (currentChunk < totalChunks)
} while (lastProcessedId !== undefined)
})

console.log(`Events migration complete!`)
Expand Down Expand Up @@ -248,7 +259,7 @@ const syncBlobCids = async (db: PrimaryDatabase) => {
console.log(`Updated blob cids on ${results.numUpdatedOrDeletedRows} rows`)
}

async function main() {
export async function MigrateModerationData() {
const env = getEnv()
const db = new DatabaseCoordinator({
schema: env.DB_SCHEMA,
Expand Down Expand Up @@ -276,5 +287,3 @@ async function main() {
console.log(`Time spent: ${(Date.now() - startedAt) / 1000 / 60} minutes`)
console.log('Migration complete!')
}

main()
102 changes: 21 additions & 81 deletions packages/bsky/src/services/moderation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import {
SubjectInfo,
} from './types'
import { ModerationEvent } from '../../db/tables/moderation'
import { Cursor, GenericKeyset, paginate } from '../../db/pagination'
import { DynamicModule, sql } from 'kysely'
import { paginate } from '../../db/pagination'
import { StatusKeyset, TimeIdKeyset } from './pagination'

export class ModerationService {
constructor(
Expand Down Expand Up @@ -72,7 +72,7 @@ export class ModerationService {
includeAllUserRecords: boolean
types: ModerationEvent['action'][]
sortDirection?: 'asc' | 'desc'
}): Promise<ModerationEventRowWithHandle[]> {
}): Promise<{ cursor?: string; events: ModerationEventRowWithHandle[] }> {
const {
subject,
createdBy,
Expand Down Expand Up @@ -126,26 +126,29 @@ export class ModerationService {
if (createdBy) {
builder = builder.where('createdBy', '=', createdBy)
}
if (cursor) {
const cursorNumeric = parseInt(cursor, 10)
if (isNaN(cursorNumeric)) {
throw new InvalidRequestError('Malformed cursor')
}
builder = builder.where(
'id',
sortDirection === 'asc' ? '>' : '<',
cursorNumeric,
)
}
return await builder

const { ref } = this.db.db.dynamic
const keyset = new TimeIdKeyset(
ref(`moderation_event.createdAt`),
ref('moderation_event.id'),
)
const paginatedBuilder = paginate(builder, {
limit,
cursor,
keyset,
direction: sortDirection,
tryIndex: true,
})

const result = await paginatedBuilder
.selectAll(['moderation_event'])
.select([
'subjectActor.handle as subjectHandle',
'creatorActor.handle as creatorHandle',
])
.orderBy('id', sortDirection)
.limit(limit)
.execute()

return { cursor: keyset.packFromResult(result), events: result }
}

async getReport(id: number): Promise<ModerationEventRow | undefined> {
Expand Down Expand Up @@ -609,7 +612,7 @@ export class ModerationService {
}

const { ref } = this.db.db.dynamic
const keyset = new ListKeyset(
const keyset = new StatusKeyset(
ref(`moderation_subject_status.${sortField}`),
ref('moderation_subject_status.id'),
)
Expand Down Expand Up @@ -651,66 +654,3 @@ export type TakedownSubjects = {
did: string
subjects: (RepoRef | RepoBlobRef | StrongRef)[]
}

// Shape of a result row that ListKeyset turns into a pagination cursor.
type KeysetParam = {
// Nullable timestamps; ListKeyset.labelResult encodes one of these two as the
// cursor's primary value (an empty string when the value is null).
lastReviewedAt: string | null
lastReportedAt: string | null
// Row id, encoded as the cursor's secondary value.
id: number
}

/**
 * Keyset used to paginate rows ordered by a nullable timestamp column
 * (`lastReviewedAt` or `lastReportedAt`) with the row `id` as the secondary
 * key.
 *
 * Cursor strings have the form `<primary>::<secondary>`, where `<primary>` is
 * a millisecond timestamp, or empty when the row's timestamp was null.
 */
export class ListKeyset extends GenericKeyset<KeysetParam, Cursor> {
  /**
   * Builds a cursor from a result row. The primary value is the epoch-millis
   * form of whichever timestamp column this keyset sorts on, or `''` when
   * that column is null/empty on the row.
   */
  labelResult(result: KeysetParam): Cursor
  labelResult(result: KeysetParam) {
    // The sort column is inferred from the name of the primary reference.
    const primaryField = (
      this.primary as ReturnType<DynamicModule['ref']>
    ).dynamicReference.includes('lastReviewedAt')
      ? 'lastReviewedAt'
      : 'lastReportedAt'

    return {
      primary: result[primaryField]
        ? new Date(`${result[primaryField]}`).getTime().toString()
        : '',
      secondary: result.id.toString(),
    }
  }

  /** Cursors are already in their labeled form; returns a plain copy. */
  labeledResultToCursor(labeled: Cursor) {
    return {
      primary: labeled.primary,
      secondary: labeled.secondary,
    }
  }

  /**
   * Converts a cursor's epoch-millis primary back into an ISO timestamp for
   * comparison in SQL. An empty primary is preserved as `''`.
   *
   * @throws InvalidRequestError when the primary part is not a valid
   * timestamp. The cursor is client-supplied, and without this check
   * `toISOString()` would raise a bare `RangeError` on an invalid date.
   */
  cursorToLabeledResult(cursor: Cursor) {
    if (!cursor.primary) {
      return { primary: '', secondary: cursor.secondary }
    }
    const primaryDate = new Date(parseInt(cursor.primary, 10))
    if (Number.isNaN(primaryDate.getTime())) {
      throw new InvalidRequestError('Malformed cursor')
    }
    return {
      primary: primaryDate.toISOString(),
      secondary: cursor.secondary,
    }
  }

  /**
   * Splits a `<primary>::<secondary>` cursor string.
   *
   * @returns `undefined` when no cursor was supplied.
   * @throws InvalidRequestError when the secondary part is missing or there
   * are extra `::` separated parts. An empty primary is allowed.
   */
  unpackCursor(cursorStr?: string): Cursor | undefined {
    if (!cursorStr) return
    const result = cursorStr.split('::')
    const [primary, secondary, ...others] = result
    if (!secondary || others.length > 0) {
      throw new InvalidRequestError('Malformed cursor')
    }
    return {
      primary,
      secondary,
    }
  }

  // This is specifically built to handle nullable columns as primary sorting column
  getSql(labeled?: Cursor, direction?: 'asc' | 'desc') {
    if (labeled === undefined) return
    if (direction === 'asc') {
      // Empty primary: the cursor row had a null primary, so only continue
      // through other null-primary rows by id. Otherwise take rows past the
      // cursor tuple plus every null-primary row.
      // NOTE(review): this presumes the query orders nulls last — confirm.
      return !labeled.primary
        ? sql`(${this.primary} IS NULL AND ${this.secondary} > ${labeled.secondary})`
        : sql`((${this.primary}, ${this.secondary}) > (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))`
    } else {
      // Same null handling as above, with the comparisons reversed.
      return !labeled.primary
        ? sql`(${this.primary} IS NULL AND ${this.secondary} < ${labeled.secondary})`
        : sql`((${this.primary}, ${this.secondary}) < (${labeled.primary}, ${labeled.secondary}) OR (${this.primary} is null))`
    }
  }
}
Loading

0 comments on commit cae7f8f

Please sign in to comment.