Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/moderation-api-v2' into multi-pd…
Browse files Browse the repository at this point in the history
…s-auth-moderation-api-v2
  • Loading branch information
devinivy committed Nov 30, 2023
2 parents 3d17be7 + c75131f commit fd7a499
Showing 1 changed file with 49 additions and 36 deletions.
85 changes: 49 additions & 36 deletions packages/bsky/src/migrate-moderation-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,43 @@ const countStatuses = async (db: PrimaryDatabase) => {
return events.count
}

const processNewReports = async (
const processLegacyReports = async (
db: PrimaryDatabase,
latestReportLegacyRefId: number,
legacyIds: number[],
) => {
const newReports = await db.db
db.assertTransaction()
if (!legacyIds.length) {
console.log('No legacy reports to process')
return
}
const reports = await db.db
.selectFrom('moderation_event')
.where('action', '=', 'com.atproto.admin.defs#modEventReport')
.where('legacyRefId', '>', latestReportLegacyRefId)
.where('legacyRefId', 'in', legacyIds)
.orderBy('legacyRefId', 'asc')
.selectAll()
.execute()

if (!newReports.length) {
console.log('No new reports to process')
return
}
console.log(`Processing ${reports.length} reports from ${legacyIds.length}`)
await db.transaction(async (tx) => {
// This will be slow but we need to run this in sequence
for (const report of reports) {
await adjustModerationSubjectStatus(tx, report)
}
})
console.log(`Completed processing ${reports.length} reports`)
}

console.log(`Processing ${newReports.length} new reports`)
// This will be slow but we need to run this in sequence
for (const newReport of newReports) {
await adjustModerationSubjectStatus(db, newReport)
}
console.log(`Completed processing ${newReports.length} new reports`)
const getReportEventsAboveLegacyId = async (
db: PrimaryDatabase,
aboveLegacyId: number,
) => {
return await db.db
.selectFrom('moderation_event')
.where('action', '=', 'com.atproto.admin.defs#modEventReport')
.where('legacyRefId', '>', aboveLegacyId)
.select(sql<number>`"legacyRefId"`.as('legacyRefId'))
.execute()
}

const createEvents = async (
Expand Down Expand Up @@ -197,7 +211,8 @@ const setReportedAtTimestamp = async (db: PrimaryDatabase) => {
group by "subjectDid", "subjectUri"
) as reports
WHERE reports."subjectDid" = moderation_subject_status."did"
AND "recordPath"=''
AND "recordPath" = ''
AND ("lastReportedAt" is null OR "lastReportedAt" < reports."createdAt")
`.execute(db.db)

console.log(
Expand All @@ -216,6 +231,7 @@ const setReportedAtTimestamp = async (db: PrimaryDatabase) => {
WHERE reports."subjectDid" = moderation_subject_status."did"
AND "recordPath" is not null
AND POSITION(moderation_subject_status."recordPath" IN reports."subjectUri") > 0
AND ("lastReportedAt" is null OR "lastReportedAt" < reports."createdAt")
`.execute(db.db)

console.log(
Expand Down Expand Up @@ -309,9 +325,9 @@ const syncBlobCids = async (db: PrimaryDatabase) => {
console.log(`Updated blob cids on ${results.numUpdatedOrDeletedRows} rows`)
}

export async function migrateUnresolvedReports(db: PrimaryDatabase) {
async function updateStatusFromUnresolvedReports(db: PrimaryDatabase) {
const { ref } = db.db.dynamic
const reports = (await db.db
const reports = await db.db
// @ts-ignore
.selectFrom('moderation_report')
.whereNotExists((qb) =>
Expand All @@ -321,25 +337,15 @@ export async function migrateUnresolvedReports(db: PrimaryDatabase) {
// @ts-ignore
.whereRef('reportId', '=', ref('moderation_report.id')),
)
// @ts-ignore
.select((eb) => eb.fn.min<number>('id').as('firstUnresolvedReportId'))
.executeTakeFirstOrThrow()) as { firstUnresolvedReportId: number }

if (!reports.firstUnresolvedReportId) {
console.log('No unresolved reports to migrate')
return
}
.select(sql<number>`moderation_report.id`.as('legacyId'))
.execute()

console.log(
`Migrating unresolved reports from id ${reports.firstUnresolvedReportId}`,
console.log('Updating statuses based on unresolved reports')
await processLegacyReports(
db,
reports.map((report) => report.legacyId),
)

await createEvents(db, {
onlyReportsAboveId: reports.firstUnresolvedReportId,
})
await processNewReports(db, reports.firstUnresolvedReportId)
await setReportedAtTimestamp(db)
console.log(`Migrated all unresolved reports`)
console.log('Completed updating statuses based on unresolved reports')
}

export async function MigrateModerationData() {
Expand Down Expand Up @@ -373,7 +379,14 @@ export async function MigrateModerationData() {
await createEvents(primaryDb, {
onlyReportsAboveId: latestReportLegacyRefId,
})
await processNewReports(primaryDb, latestReportLegacyRefId)
const newReportEvents = await getReportEventsAboveLegacyId(
primaryDb,
latestReportLegacyRefId,
)
await processLegacyReports(
primaryDb,
newReportEvents.map((evt) => evt.legacyRefId),
)
await setReportedAtTimestamp(primaryDb)
} else {
console.log('No reports have been migrated into events yet, bailing.')
Expand All @@ -393,9 +406,9 @@ export async function MigrateModerationData() {
// Important to run this before creation statuses from actions to ensure that we are not attempting to map flag actions
await remapFlagToAcknlowedge(primaryDb)
await createStatusFromActions(primaryDb)
await updateStatusFromUnresolvedReports(primaryDb)
await setReportedAtTimestamp(primaryDb)
await syncBlobCids(primaryDb)
await migrateUnresolvedReports(primaryDb)

console.log(`Time spent: ${(Date.now() - startedAt) / 1000 / 60} minutes`)
console.log('Migration complete!')
Expand Down

0 comments on commit fd7a499

Please sign in to comment.