Skip to content

Commit

Permalink
repair takedowns script
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Nov 9, 2023
1 parent 41e7e10 commit 86ca769
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 136 deletions.
138 changes: 3 additions & 135 deletions packages/pds/src/migrate-script/migrate-all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import * as plcLib from '@did-plc/lib'
import SqlRepoStorage from '../sql-repo-storage'
import { createDeferrable } from '@atproto/common'
import AppContext from '../context'
import { FailedTakedown, MigrateDb, Status, TransferPhase } from './db'
import { MigrateDb, Status, TransferPhase } from './db'
import PQueue from 'p-queue'
import {
AdminHeaders,
PdsInfo,
getPds,
getUserAccount,
repairBlob,
repairFailedPrefs,
retryOnce,
setupEnv,
transferPreferences,
transferTakedowns,
} from './util'

export const runScript = async () => {
Expand Down Expand Up @@ -322,127 +324,6 @@ const logFailedBlob = async (db: MigrateDb, did: string, cid: string) => {
.execute()
}

const logFailedTakedown = async (db: MigrateDb, takedown: FailedTakedown) => {
await db
.insertInto('failed_takedown')
.values(takedown)
.onConflict((oc) => oc.doNothing())
.execute()
}

const transferTakedowns = async (
ctx: AppContext,
db: MigrateDb,
pds: PdsInfo,
did: string,
adminHeaders: AdminHeaders,
) => {
const [accountRes, takendownRecords, takendownBlobs] = await Promise.all([
getUserAccount(ctx, did),
ctx.db.db
.selectFrom('record')
.selectAll()
.where('did', '=', did)
.where('takedownRef', 'is not', null)
.execute(),
ctx.db.db
.selectFrom('repo_blob')
.selectAll()
.where('did', '=', did)
.where('takedownRef', 'is not', null)
.execute(),
])
const promises: Promise<unknown>[] = []
if (accountRes.takedownRef) {
const promise = pds.agent.com.atproto.admin
.updateSubjectStatus(
{
subject: {
$type: 'com.atproto.admin.defs#repoRef',
did,
},
takedown: {
applied: true,
ref: accountRes.takedownRef,
},
},
{
headers: adminHeaders,
encoding: 'application/json',
},
)
.catch(async (err) => {
await logFailedTakedown(db, { did, err: err?.message })
})
promises.push(promise)
}

for (const takendownRecord of takendownRecords) {
if (!takendownRecord.takedownRef) continue
const promise = pds.agent.com.atproto.admin
.updateSubjectStatus(
{
subject: {
$type: 'com.atproto.repo.strongRef',
uri: takendownRecord.uri,
cid: takendownRecord.cid,
},
takedown: {
applied: true,
ref: takendownRecord.takedownRef,
},
},
{
headers: adminHeaders,
encoding: 'application/json',
},
)
.catch(async (err) => {
await logFailedTakedown(db, {
did,
recordUri: takendownRecord.uri,
recordCid: takendownRecord.cid,
err: err?.message,
})
})
promises.push(promise)
}

for (const takendownBlob of takendownBlobs) {
if (!takendownBlob.takedownRef) continue
const promise = pds.agent.com.atproto.admin
.updateSubjectStatus(
{
subject: {
$type: 'com.atproto.admin.defs#repoBlobRef',
did,
cid: takendownBlob.cid,
recordUri: takendownBlob.recordUri,
},
takedown: {
applied: true,
ref: takendownBlob.takedownRef,
},
},
{
headers: adminHeaders,
encoding: 'application/json',
},
)
.catch(async (err) => {
await logFailedTakedown(db, {
did,
blobCid: takendownBlob.cid,
err: err?.message,
})
})

promises.push(promise)
}

await Promise.all(promises)
}

const repairFailedBlobs = async (
ctx: AppContext,
db: MigrateDb,
Expand All @@ -464,17 +345,4 @@ const repairFailedBlobs = async (
}
}

const getUserAccount = async (ctx: AppContext, did: string) => {
const accountRes = await ctx.db.db
.selectFrom('did_handle')
.innerJoin('user_account', 'user_account.did', 'did_handle.did')
.selectAll()
.where('did_handle.did', '=', did)
.executeTakeFirst()
if (!accountRes) {
throw new Error(`could not find account: ${did}`)
}
return accountRes
}

runScript()
29 changes: 29 additions & 0 deletions packages/pds/src/migrate-script/repair-takedowns.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { getPds, setupEnv, transferTakedowns } from './util'

export const runScript = async () => {
const { db, ctx, pdsInfos, adminHeaders } = await setupEnv()
const failed = await db
.selectFrom('failed_takedown')
.innerJoin('status', 'status.did', 'failed_takedown.did')
.groupBy('failed_takedown.did')
.select(['failed_takedown.did', 'status.pdsId'])
.execute()
let count = 0
for (const takedown of failed) {
const pdsInfo = getPds(pdsInfos, takedown.pdsId ?? -1)
try {
await transferTakedowns(ctx, db, pdsInfo, takedown.did, adminHeaders)
await db
.deleteFrom('failed_takedown')
.where('did', '=', takedown.did)
.execute()
} catch (err) {
console.log(err)
}
count++
console.log(`${count}/${failed.length}`)
}
console.log('DONE WITH ALL')
}

runScript()
136 changes: 135 additions & 1 deletion packages/pds/src/migrate-script/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import axios from 'axios'
import * as ui8 from 'uint8arrays'
import AtpAgent from '@atproto/api'
import AppContext from '../context'
import { MigrateDb, getDb } from './db'
import { FailedTakedown, MigrateDb, getDb } from './db'
import { CID } from 'multiformats/cid'
import { ServerSecrets, envToCfg, envToSecrets, readEnv } from '../config'

Expand Down Expand Up @@ -164,3 +164,137 @@ export const repairBlobInternal = async (
responseType: 'stream',
})
}

export const getUserAccount = async (ctx: AppContext, did: string) => {
const accountRes = await ctx.db.db
.selectFrom('did_handle')
.innerJoin('user_account', 'user_account.did', 'did_handle.did')
.selectAll()
.where('did_handle.did', '=', did)
.executeTakeFirst()
if (!accountRes) {
throw new Error(`could not find account: ${did}`)
}
return accountRes
}

export const transferTakedowns = async (
ctx: AppContext,
db: MigrateDb,
pds: PdsInfo,
did: string,
adminHeaders: AdminHeaders,
) => {
const [accountRes, takendownRecords, takendownBlobs] = await Promise.all([
getUserAccount(ctx, did),
ctx.db.db
.selectFrom('record')
.selectAll()
.where('did', '=', did)
.where('takedownRef', 'is not', null)
.execute(),
ctx.db.db
.selectFrom('repo_blob')
.selectAll()
.where('did', '=', did)
.where('takedownRef', 'is not', null)
.execute(),
])
const promises: Promise<unknown>[] = []
if (accountRes.takedownRef) {
const promise = pds.agent.com.atproto.admin
.updateSubjectStatus(
{
subject: {
$type: 'com.atproto.admin.defs#repoRef',
did,
},
takedown: {
applied: true,
ref: accountRes.takedownRef,
},
},
{
headers: adminHeaders,
encoding: 'application/json',
},
)
.catch(async (err) => {
await logFailedTakedown(db, { did, err: err?.message })
})
promises.push(promise)
}

for (const takendownRecord of takendownRecords) {
if (!takendownRecord.takedownRef) continue
const promise = pds.agent.com.atproto.admin
.updateSubjectStatus(
{
subject: {
$type: 'com.atproto.repo.strongRef',
uri: takendownRecord.uri,
cid: takendownRecord.cid,
},
takedown: {
applied: true,
ref: takendownRecord.takedownRef,
},
},
{
headers: adminHeaders,
encoding: 'application/json',
},
)
.catch(async (err) => {
await logFailedTakedown(db, {
did,
recordUri: takendownRecord.uri,
recordCid: takendownRecord.cid,
err: err?.message,
})
})
promises.push(promise)
}

for (const takendownBlob of takendownBlobs) {
if (!takendownBlob.takedownRef) continue
const promise = pds.agent.com.atproto.admin
.updateSubjectStatus(
{
subject: {
$type: 'com.atproto.admin.defs#repoBlobRef',
did,
cid: takendownBlob.cid,
recordUri: takendownBlob.recordUri,
},
takedown: {
applied: true,
ref: takendownBlob.takedownRef,
},
},
{
headers: adminHeaders,
encoding: 'application/json',
},
)
.catch(async (err) => {
await logFailedTakedown(db, {
did,
blobCid: takendownBlob.cid,
err: err?.message,
})
})

promises.push(promise)
}

await Promise.all(promises)
}

const logFailedTakedown = async (db: MigrateDb, takedown: FailedTakedown) => {
await db
.insertInto('failed_takedown')
.values(takedown)
.onConflict((oc) => oc.doNothing())
.execute()
}

0 comments on commit 86ca769

Please sign in to comment.