diff --git a/.github/workflows/build-and-push-pds-ghcr.yaml b/.github/workflows/build-and-push-pds-ghcr.yaml
index b11230ab531..7c3a7affd2f 100644
--- a/.github/workflows/build-and-push-pds-ghcr.yaml
+++ b/.github/workflows/build-and-push-pds-ghcr.yaml
@@ -3,6 +3,7 @@ on:
   push:
     branches:
       - main
+      - pds-v2-migrate-script
 env:
   REGISTRY: ghcr.io
   USERNAME: ${{ github.actor }}
diff --git a/packages/pds/package.json b/packages/pds/package.json
index 57c90869c1e..a859805eb58 100644
--- a/packages/pds/package.json
+++ b/packages/pds/package.json
@@ -48,6 +48,7 @@
     "compression": "^1.7.4",
     "cors": "^2.8.5",
     "disposable-email": "^0.2.3",
+    "dotenv": "^16.0.3",
     "express": "^4.17.2",
     "express-async-errors": "^3.1.1",
     "file-type": "^16.5.4",
diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts
index 42544eba492..2747f21628d 100644
--- a/packages/pds/src/index.ts
+++ b/packages/pds/src/index.ts
@@ -31,6 +31,7 @@ export { Database } from './db'
 export { DiskBlobStore, MemoryBlobStore } from './storage'
 export { AppContext } from './context'
 export { httpLogger } from './logger'
+export { runScript } from './migrate-script'
 
 export class PDS {
   public ctx: AppContext
diff --git a/packages/pds/src/migrate-script/check-failures.ts b/packages/pds/src/migrate-script/check-failures.ts
new file mode 100644
index 00000000000..266410a1886
--- /dev/null
+++ b/packages/pds/src/migrate-script/check-failures.ts
@@ -0,0 +1,63 @@
+import { countAll } from '../db/util'
+import { MigrateDb, getDb } from './db'
+
+const run = async () => {
+  const db = getDb()
+  const results = await Promise.all([
+    totalCount(db),
+    failureCount(db),
+    failedBlobs(db),
+    failedPrefs(db),
+    failedTakedowns(db),
+  ])
+  console.log(`
+Total migrations: ${results[0]}
+Failed migrations: ${results[1]}
+Failed blobs: ${results[2]}
+Failed prefs: ${results[3]}
+Failed takedowns: ${results[4]}
+`)
+}
+
+const totalCount = async (db: MigrateDb) => {
+  const res = await db
+    .selectFrom('status')
+    .select(countAll.as('count'))
+    .executeTakeFirst()
+  return res?.count
+}
+
+const failureCount = async (db: MigrateDb) => {
+  const res = await db
+    .selectFrom('status')
+    .select(countAll.as('count'))
+    .where('failed', '=', 1)
+    .executeTakeFirst()
+  return res?.count
+}
+
+const failedBlobs = async (db: MigrateDb) => {
+  const res = await db
+    .selectFrom('failed_blob')
+    .select(countAll.as('count'))
+    .executeTakeFirst()
+  return res?.count
+}
+
+const failedPrefs = async (db: MigrateDb) => {
+  const res = await db
+    .selectFrom('failed_pref')
+    .select(countAll.as('count'))
+    .executeTakeFirst()
+  return res?.count
+}
+
+const failedTakedowns = async (db: MigrateDb) => {
+  const res = await db
+    .selectFrom('failed_takedown')
+    .select(countAll.as('count'))
+    .executeTakeFirst()
+  return res?.count
+}
+
+run()
diff --git a/packages/pds/src/migrate-script/db.ts b/packages/pds/src/migrate-script/db.ts
new file mode 100644
index 00000000000..205bc508028
--- /dev/null
+++ b/packages/pds/src/migrate-script/db.ts
@@ -0,0 +1,139 @@
+import { Kysely, MigrationProvider, SqliteDialect } from 'kysely'
+import SqliteDB from 'better-sqlite3'
+
+const LOCATION = 'migrate.db'
+
+export const getDb = (): MigrateDb => {
+  const sqliteDb = new SqliteDB(LOCATION)
+  sqliteDb.pragma('journal_mode = WAL')
+  sqliteDb.pragma('busy_timeout = 5000')
+  return new Kysely<Schema>({
+    dialect: new SqliteDialect({
+      database: sqliteDb,
+    }),
+  })
+}
+
+export const dbMigrationProvider: MigrationProvider = {
+  async getMigrations() {
+    return {
+      '1': {
+        async up(db: Kysely<unknown>) {
+          await db.schema
+            .createTable('status')
+            .addColumn('did', 'varchar', (col) => col.primaryKey())
+            .addColumn('pdsId', 'integer')
+            .addColumn('signingKey', 'varchar')
+            .addColumn('phase', 'integer', (col) => col.notNull().defaultTo(0))
+            .addColumn('importedRev', 'varchar')
+            .addColumn('failed', 'integer', (col) => col.notNull().defaultTo(0))
+            .execute()
+          await db.schema
+            .createTable('failed_pref')
+            .addColumn('did', 'varchar', (col) => col.primaryKey())
+            .execute()
+          await db.schema
+            .createTable('failed_blob')
+            .addColumn('did', 'varchar', (col) => col.notNull())
+            .addColumn('cid', 'varchar', (col) => col.notNull())
+            .addPrimaryKeyConstraint('failed_blob_pkey', ['did', 'cid'])
+            .execute()
+          await db.schema
+            .createTable('failed_takedown')
+            .addColumn('did', 'varchar', (col) => col.notNull())
+            .addColumn('recordUri', 'varchar')
+            .addColumn('recordCid', 'varchar')
+            .addColumn('blobCid', 'varchar')
+            .execute()
+        },
+        async down() {},
+      },
+      '2': {
+        async up(db: Kysely<unknown>) {
+          await db.schema
+            .alterTable('status')
+            .addColumn('err', 'varchar')
+            .execute()
+          await db.schema
+            .alterTable('failed_pref')
+            .addColumn('err', 'varchar')
+            .execute()
+          await db.schema
+            .alterTable('failed_blob')
+            .addColumn('err', 'varchar')
+            .execute()
+          await db.schema
+            .alterTable('failed_takedown')
+            .addColumn('err', 'varchar')
+            .execute()
+        },
+        async down() {},
+      },
+      '3': {
+        async up(db: Kysely<unknown>) {
+          await db.schema
+            .createTable('failed_import')
+            .addColumn('did', 'varchar', (col) => col.primaryKey())
+            .addColumn('err', 'varchar')
+            .execute()
+        },
+        async down() {},
+      },
+    }
+  },
+}
+
+export type MigrateDb = Kysely<Schema>
+
+type Schema = {
+  status: Status
+  failed_pref: FailedPreference
+  failed_blob: FailedBlob
+  failed_takedown: FailedTakedown
+  failed_import: FailedImport
+}
+
+export enum TransferPhase {
+  notStarted = 0,
+  reservedKey = 1,
+  initImport = 2,
+  transferredPds = 3,
+  transferredEntryway = 4,
+  preferences = 5,
+  takedowns = 6,
+  completed = 7,
+}
+
+export type Status = {
+  did: string
+  pdsId: number | null
+  signingKey: string | null
+  phase: TransferPhase
+  importedRev: string | null
+  failed: 0 | 1
+  err: string | null
+}
+
+export type FailedPreference = {
+  did: string
+  err: string | null
+}
+
+export type FailedBlob = {
+  did: string
+  cid: string
+  err: string | null
+}
+
+export type FailedTakedown = {
+  did: string
+  blobCid?: string
+  recordUri?: string
+  recordCid?: string
+  err: string | null
+}
+
+export type FailedImport = {
+  did: string
+  err: string | null
+}
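Note on reading the tracking schema above: each `status` row advances monotonically through `TransferPhase`, and `failed`/`err` flag rows for retry without losing their place in the pipeline. A minimal sketch of a query against this schema (the helper is hypothetical, not part of the diff):

```ts
import { MigrateDb, TransferPhase } from './db'

// Accounts that errored out before their repo reached the new PDS; a later
// run of migrate-all picks these up again since the phase is preserved.
const stuckBeforeTransfer = (db: MigrateDb) =>
  db
    .selectFrom('status')
    .selectAll()
    .where('failed', '=', 1)
    .where('phase', '<', TransferPhase.transferredPds)
    .execute()
```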
diff --git a/packages/pds/src/migrate-script/load-dids.ts b/packages/pds/src/migrate-script/load-dids.ts
new file mode 100644
index 00000000000..e72bff96655
--- /dev/null
+++ b/packages/pds/src/migrate-script/load-dids.ts
@@ -0,0 +1,35 @@
+import { chunkArray } from '@atproto/common'
+import { setupEnv } from './util'
+
+const run = async () => {
+  const amount = parseInt(process.argv[2])
+  const pdsId = parseInt(process.argv[3])
+  console.log(`loading next ${amount} dids`)
+  const { db, ctx } = await setupEnv()
+
+  const didsRes = await ctx.db.db
+    .selectFrom('user_account')
+    .select('did')
+    .where('pdsId', 'is', null)
+    .orderBy('did', 'asc')
+    .limit(amount)
+    .execute()
+  const dids = didsRes.map((row) => ({
+    did: row.did,
+    phase: 0,
+    pdsId,
+    failed: 0 as const,
+  }))
+
+  await Promise.all(
+    chunkArray(dids, 50).map((chunk) =>
+      db
+        .insertInto('status')
+        .values(chunk)
+        .onConflict((oc) => oc.doNothing())
+        .execute(),
+    ),
+  )
+}
+
+run()
diff --git a/packages/pds/src/migrate-script/load-failed-blobs.ts b/packages/pds/src/migrate-script/load-failed-blobs.ts
new file mode 100644
index 00000000000..654248a88f2
--- /dev/null
+++ b/packages/pds/src/migrate-script/load-failed-blobs.ts
@@ -0,0 +1,31 @@
+import fs from 'fs/promises'
+import { chunkArray } from '@atproto/common'
+import { getDb } from './db'
+
+const run = async () => {
+  const file = await fs.readFile('missing-blobs.txt')
+  const rows = file
+    .toString()
+    .split('\n')
+    .filter((row) => row.length > 5)
+    .map((row) => {
+      const [did, cid] = row.split(' ')
+      return {
+        did: did.trim(),
+        cid: cid.trim(),
+      }
+    })
+  const db = getDb()
+
+  await Promise.all(
+    chunkArray(rows, 500).map((chunk) =>
+      db
+        .insertInto('failed_blob')
+        .values(chunk)
+        .onConflict((oc) => oc.doNothing())
+        .execute(),
+    ),
+  )
+}
+
+run()
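Both loaders are safe to rerun: inserts go out in chunks and `onConflict(doNothing)` skips rows already tracked, so a crashed load can simply be started over. They read their inputs positionally; a hypothetical invocation (the compiled script path is an assumption, not part of the diff) looks like:

```ts
// node dist/migrate-script/load-dids.js 1000 4
const amount = parseInt(process.argv[2], 10) // batch size, e.g. 1000
const pdsId = parseInt(process.argv[3], 10) // target pds id, e.g. 4
```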
diff --git a/packages/pds/src/migrate-script/migrate-all.ts b/packages/pds/src/migrate-script/migrate-all.ts
new file mode 100644
index 00000000000..35fb1b8758d
--- /dev/null
+++ b/packages/pds/src/migrate-script/migrate-all.ts
@@ -0,0 +1,298 @@
+import assert from 'node:assert'
+import * as plcLib from '@did-plc/lib'
+import SqlRepoStorage from '../sql-repo-storage'
+import { createDeferrable } from '@atproto/common'
+import AppContext from '../context'
+import { MigrateDb, Status, TransferPhase } from './db'
+import PQueue from 'p-queue'
+import {
+  AdminHeaders,
+  PdsInfo,
+  checkBorked,
+  doImport,
+  getPds,
+  getUserAccount,
+  httpClient,
+  repairBlob,
+  repairFailedPrefs,
+  setupEnv,
+  transferPreferences,
+  transferTakedowns,
+} from './util'
+
+export const runScript = async () => {
+  console.log('starting')
+  const { db, ctx, adminHeaders, pdsInfos } = await setupEnv()
+
+  const pdsIds = process.argv[2].split(',').map((id) => parseInt(id))
+
+  const todo = await db
+    .selectFrom('status')
+    .where('status.phase', '<', 7)
+    .where('pdsId', 'in', pdsIds)
+    .orderBy('phase', 'desc')
+    .orderBy('did')
+    .selectAll()
+    .execute()
+
+  let pdsCounter = 0
+  let completed = 0
+  let failed = 0
+
+  console.log('migrating: ', todo.length)
+
+  const migrateQueue = new PQueue({ concurrency: 80 })
+  process.on('SIGINT', async () => {
+    migrateQueue.clear()
+    console.log(`waiting on ${migrateQueue.pending} to finish`)
+    await migrateQueue.onIdle()
+    process.exit(0)
+  })
+
+  for (const status of todo) {
+    if (!status.pdsId) {
+      status.pdsId = pdsInfos[pdsCounter % pdsInfos.length].id
+      pdsCounter++
+    }
+    const pdsInfo = getPds(pdsInfos, status.pdsId)
+    migrateQueue.add(async () => {
+      try {
+        await migrateRepo(ctx, db, pdsInfo, status, adminHeaders)
+        await db
+          .updateTable('status')
+          .set({ failed: 0, err: null })
+          .where('did', '=', status.did)
+          .execute()
+        completed++
+        await repairFailedPrefs(ctx, db, pdsInfo, status.did).catch(() =>
+          console.log('failed to repair prefs: ', status.did),
+        )
+        repairFailedBlobs(ctx, db, pdsInfo, status.did, adminHeaders).catch(
+          () => console.log('failed to repair blobs: ', status.did),
+        )
+      } catch (err) {
+        // @ts-ignore
+        const errmsg: string = err?.message ?? null
+        console.log(err)
+        await db
+          .updateTable('status')
+          .set({ failed: 1, err: errmsg })
+          .where('did', '=', status.did)
+          .execute()
+        failed++
+
+        // check if the did is caught in a bad state where migration failed but plc got updated
+        await checkBorked(ctx, status.did)
+      }
+      console.log(`completed: ${completed}, failed: ${failed}`)
+    })
+  }
+  await migrateQueue.onIdle()
+  console.log('DONE WITH ALL')
+}
+
+const migrateRepo = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  status: Status,
+  adminHeaders: AdminHeaders,
+) => {
+  if (status.phase < TransferPhase.reservedKey) {
+    const signingKey = await reserveSigningKey(pds, status.did)
+    status.signingKey = signingKey
+    status.phase = TransferPhase.reservedKey
+    await updateStatus(db, status)
+  }
+
+  if (status.phase < TransferPhase.initImport) {
+    const importedRev = await doImport(ctx, db, pds, status.did, adminHeaders)
+    if (importedRev) {
+      status.importedRev = importedRev
+    }
+    status.phase = TransferPhase.initImport
+    await updateStatus(db, status)
+  }
+
+  if (status.phase < TransferPhase.transferredPds) {
+    const importedRev = await lockAndTransfer(
+      ctx,
+      db,
+      pds,
+      status,
+      adminHeaders,
+    )
+    status.importedRev = importedRev
+    status.phase = TransferPhase.transferredPds
+    await updateStatus(db, status)
+  }
+
+  if (status.phase < TransferPhase.transferredEntryway) {
+    await updatePdsOnEntryway(ctx, pds, status.did)
+    status.phase = TransferPhase.transferredEntryway
+    await updateStatus(db, status)
+  }
+
+  if (status.phase < TransferPhase.preferences) {
+    try {
+      await transferPreferences(ctx, pds, status.did)
+    } catch (err) {
+      await db
+        .insertInto('failed_pref')
+        .values({ did: status.did })
+        .onConflict((oc) => oc.doNothing())
+        .execute()
+    } finally {
+      status.phase = TransferPhase.preferences
+      await updateStatus(db, status)
+    }
+  }
+
+  if (status.phase < TransferPhase.takedowns) {
+    await transferTakedowns(ctx, db, pds, status.did, adminHeaders)
+    status.phase = TransferPhase.completed
+    await updateStatus(db, status)
+  }
+}
+
+const updateStatus = async (db: MigrateDb, status: Status) => {
+  return db
+    .updateTable('status')
+    .set({ ...status })
+    .where('did', '=', status.did)
+    .execute()
+}
+
+const reserveSigningKey = async (
+  pds: PdsInfo,
+  did: string,
+): Promise<string> => {
+  const signingKeyRes =
+    await pds.agent.api.com.atproto.server.reserveSigningKey({ did })
+  return signingKeyRes.data.signingKey
+}
+
+const lockAndTransfer = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  status: Status,
+  adminHeaders: AdminHeaders,
+) => {
+  const repoLockedDefer = createDeferrable()
+  const transferDefer = createDeferrable()
+  let txFinished = false
+  ctx.db
+    .transaction(async (dbTxn) => {
+      const storage = new SqlRepoStorage(dbTxn, status.did)
+      await storage.lockRepo()
+      repoLockedDefer.resolve()
+      await transferDefer.complete
+    })
+    .catch((err) => {
+      console.error(`error in repo lock tx for did: ${status.did}`, err)
+      txFinished = true
+    })
+
+  await repoLockedDefer.complete
+
+  let importedRev
+  try {
+    importedRev = await doImport(
+      ctx,
+      db,
+      pds,
+      status.did,
+      adminHeaders,
+      status.importedRev ?? undefined,
+    )
+
+    let plcOp
+    if (status.did.startsWith('did:web')) {
+      plcOp = {}
+    } else {
+      const lastOp = await ctx.plcClient.getLastOp(status.did)
+      if (!lastOp || lastOp.type === 'plc_tombstone') {
+        throw new Error('could not find last plc op')
+      }
+      plcOp = await plcLib.createUpdateOp(
+        lastOp,
+        ctx.plcRotationKey,
+        (normalized) => {
+          if (!status.signingKey) {
+            throw new Error('no reserved signing key')
+          }
+          return {
+            ...normalized,
+            verificationMethods: {
+              atproto: status.signingKey,
+            },
+            services: {
+              atproto_pds: {
+                type: 'AtprotoPersonalDataServer',
+                endpoint: pds.url,
+              },
+            },
+          }
+        },
+      )
+    }
+    assert(!txFinished)
+    const accountRes = await getUserAccount(ctx, status.did)
+    await httpClient.post(
+      `${pds.url}/xrpc/com.atproto.temp.transferAccount`,
+      {
+        did: status.did,
+        handle: accountRes.handle,
+        plcOp,
+      },
+      { headers: adminHeaders },
+    )
+
+    return importedRev
+  } finally {
+    transferDefer.resolve()
+  }
+}
+
+const updatePdsOnEntryway = async (
+  ctx: AppContext,
+  pds: PdsInfo,
+  did: string,
+) => {
+  await ctx.db.transaction(async (dbTxn) => {
+    await dbTxn.db
+      .updateTable('user_account')
+      .where('did', '=', did)
+      .set({ pdsId: pds.id })
+      .execute()
+    await dbTxn.db
+      .updateTable('repo_root')
+      .where('did', '=', did)
+      .set({ did: `migrated-${did}` })
+      .execute()
+  })
+}
+
+const repairFailedBlobs = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  did: string,
+  adminHeaders: AdminHeaders,
+) => {
+  const failedBlobs = await db
+    .selectFrom('failed_blob')
+    .where('did', '=', did)
+    .selectAll()
+    .execute()
+  for (const blob of failedBlobs) {
+    try {
+      await repairBlob(ctx, db, pds, did, blob.cid, adminHeaders)
+    } catch {
+      console.log(`failed blob: ${did} ${blob.cid}`)
+    }
+  }
+}
+
+runScript()
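The subtle piece above is `lockAndTransfer`: a transaction on the entryway holds the repo lock for the whole transfer, coordinated by two deferrables, one signaling that the lock is held, the other releasing it once `transferAccount` has finished. Distilled into a sketch (with a generic `runInTx` standing in for `ctx.db.transaction`; not part of the diff):

```ts
import { createDeferrable } from '@atproto/common'

const holdLockWhile = async (
  runInTx: (fn: () => Promise<void>) => Promise<void>,
  transfer: () => Promise<void>,
) => {
  const locked = createDeferrable()
  const release = createDeferrable()
  // The tx body signals once the row lock is held, then parks on
  // `release.complete` so the lock survives for the whole transfer.
  const tx = runInTx(async () => {
    locked.resolve()
    await release.complete
  })
  await locked.complete
  try {
    await transfer()
  } finally {
    release.resolve()
    await tx.catch(() => undefined) // tx errors are reported by the caller
  }
}
```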
diff --git a/packages/pds/src/migrate-script/push-repo.ts b/packages/pds/src/migrate-script/push-repo.ts
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/packages/pds/src/migrate-script/repair-blobs.ts b/packages/pds/src/migrate-script/repair-blobs.ts
new file mode 100644
index 00000000000..e8043f0263e
--- /dev/null
+++ b/packages/pds/src/migrate-script/repair-blobs.ts
@@ -0,0 +1,43 @@
+import PQueue from 'p-queue'
+import { getPds, repairBlob, setupEnv } from './util'
+
+type FailedBlob = {
+  did: string
+  cid: string
+  pdsId: number
+}
+
+export const runScript = async () => {
+  const { db, ctx, adminHeaders, pdsInfos } = await setupEnv()
+  const failed = await db
+    .selectFrom('failed_blob')
+    .innerJoin('status', 'status.did', 'failed_blob.did')
+    .selectAll()
+    .execute()
+  let count = 0
+  const failedByDid = failed.reduce((acc, cur) => {
+    acc[cur.did] ??= []
+    acc[cur.did].push({ did: cur.did, cid: cur.cid, pdsId: cur.pdsId ?? -1 })
+    return acc
+  }, {} as Record<string, FailedBlob[]>)
+  const blobQueue = new PQueue({ concurrency: 40 })
+  for (const did of Object.keys(failedByDid)) {
+    const failedBlobs = failedByDid[did] ?? []
+    blobQueue.add(async () => {
+      for (const blob of failedBlobs) {
+        const pdsInfo = getPds(pdsInfos, blob.pdsId ?? -1)
+        try {
+          await repairBlob(ctx, db, pdsInfo, blob.did, blob.cid, adminHeaders)
+        } catch (err) {
+          console.log(err)
+        }
+        count++
+        console.log(`${count}/${failed.length}`)
+      }
+    })
+  }
+  await blobQueue.onIdle()
+  console.log('DONE WITH ALL')
+}
+
+runScript()
diff --git a/packages/pds/src/migrate-script/repair-imports.ts b/packages/pds/src/migrate-script/repair-imports.ts
new file mode 100644
index 00000000000..99a49e24117
--- /dev/null
+++ b/packages/pds/src/migrate-script/repair-imports.ts
@@ -0,0 +1,33 @@
+import PQueue from 'p-queue'
+import { doImport, getPds, setupEnv } from './util'
+
+export const runScript = async () => {
+  const { db, ctx, adminHeaders, pdsInfos } = await setupEnv()
+  const failed = await db
+    .selectFrom('failed_import')
+    .innerJoin('status', 'status.did', 'failed_import.did')
+    .select(['status.did', 'status.pdsId'])
+    .execute()
+  let count = 0
+  const importQueue = new PQueue({ concurrency: 1 })
+  for (const account of failed) {
+    importQueue.add(async () => {
+      const pdsInfo = getPds(pdsInfos, account.pdsId ?? -1)
+      try {
+        await doImport(ctx, db, pdsInfo, account.did, adminHeaders)
+        await db
+          .deleteFrom('failed_import')
+          .where('did', '=', account.did)
+          .execute()
+      } catch (err) {
+        console.log(err)
+      }
+      count++
+      console.log(`${count}/${failed.length}`)
+    })
+  }
+  await importQueue.onIdle()
+  console.log('DONE WITH ALL')
+}
+
+runScript()
diff --git a/packages/pds/src/migrate-script/repair-prefs.ts b/packages/pds/src/migrate-script/repair-prefs.ts
new file mode 100644
index 00000000000..78c86b81ab9
--- /dev/null
+++ b/packages/pds/src/migrate-script/repair-prefs.ts
@@ -0,0 +1,24 @@
+import { getPds, repairPrefs, setupEnv } from './util'
+
+export const runScript = async () => {
+  const { db, ctx, pdsInfos } = await setupEnv()
+  const failed = await db
+    .selectFrom('failed_pref')
+    .innerJoin('status', 'status.did', 'failed_pref.did')
+    .selectAll()
+    .execute()
+  let count = 0
+  for (const pref of failed) {
+    const pdsInfo = getPds(pdsInfos, pref.pdsId ?? -1)
+    try {
+      await repairPrefs(ctx, db, pdsInfo, pref.did)
+    } catch (err) {
+      console.log(err)
+    }
+    count++
+    console.log(`${count}/${failed.length}`)
+  }
+  console.log('DONE WITH ALL')
+}
+
+runScript()
diff --git a/packages/pds/src/migrate-script/repair-takedowns.ts b/packages/pds/src/migrate-script/repair-takedowns.ts
new file mode 100644
index 00000000000..2322a4953ed
--- /dev/null
+++ b/packages/pds/src/migrate-script/repair-takedowns.ts
@@ -0,0 +1,29 @@
+import { getPds, setupEnv, transferTakedowns } from './util'
+
+export const runScript = async () => {
+  const { db, ctx, pdsInfos, adminHeaders } = await setupEnv()
+  const failed = await db
+    .selectFrom('failed_takedown')
+    .innerJoin('status', 'status.did', 'failed_takedown.did')
+    .groupBy('failed_takedown.did')
+    .select(['failed_takedown.did', 'status.pdsId'])
+    .execute()
+  let count = 0
+  for (const takedown of failed) {
+    const pdsInfo = getPds(pdsInfos, takedown.pdsId ?? -1)
+    try {
+      await transferTakedowns(ctx, db, pdsInfo, takedown.did, adminHeaders)
+      await db
+        .deleteFrom('failed_takedown')
+        .where('did', '=', takedown.did)
+        .execute()
+    } catch (err) {
+      console.log(err)
+    }
+    count++
+    console.log(`${count}/${failed.length}`)
+  }
+  console.log('DONE WITH ALL')
+}
+
+runScript()
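All three repair scripts above share one shape: join the failure table to `status` to recover the pds assignment, retry via the matching helper in `./util` (the success path also clears the failure row), and log-but-continue on error so anything still broken stays queued for the next pass. Roughly (names here are generic, not from the diff):

```ts
const retryFailures = async <T extends { did: string }>(
  rows: T[],
  repair: (row: T) => Promise<void>,
) => {
  let count = 0
  for (const row of rows) {
    try {
      await repair(row) // on success the failure row gets deleted
    } catch (err) {
      console.log(err) // leave the row in place for a later pass
    }
    count++
    console.log(`${count}/${rows.length}`)
  }
}
```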
diff --git a/packages/pds/src/migrate-script/setup-db.ts b/packages/pds/src/migrate-script/setup-db.ts
new file mode 100644
index 00000000000..798874738fe
--- /dev/null
+++ b/packages/pds/src/migrate-script/setup-db.ts
@@ -0,0 +1,13 @@
+import { Migrator } from 'kysely'
+import { dbMigrationProvider, getDb } from './db'
+
+const run = async () => {
+  const db = getDb()
+
+  const migrator = new Migrator({ db, provider: dbMigrationProvider })
+  const { error } = await migrator.migrateToLatest()
+  if (error) throw error
+  return db
+}
+
+run()
diff --git a/packages/pds/src/migrate-script/util.ts b/packages/pds/src/migrate-script/util.ts
new file mode 100644
index 00000000000..a9b020ca0fe
--- /dev/null
+++ b/packages/pds/src/migrate-script/util.ts
@@ -0,0 +1,370 @@
+import fs from 'fs/promises'
+import https from 'node:https'
+import dotenv from 'dotenv'
+import axios from 'axios'
+import * as ui8 from 'uint8arrays'
+import AtpAgent from '@atproto/api'
+import AppContext from '../context'
+import { FailedTakedown, MigrateDb, getDb } from './db'
+import { CID } from 'multiformats/cid'
+import { ServerSecrets, envToCfg, envToSecrets, readEnv } from '../config'
+import SqlRepoStorage from '../sql-repo-storage'
+
+export const httpClient = axios.create({
+  timeout: 0, //optional
+  httpsAgent: new https.Agent({ keepAlive: true }),
+})
+
+export type PdsInfo = {
+  id: number
+  did: string
+  url: string
+  agent: AtpAgent
+}
+
+export type AdminHeaders = {
+  authorization: string
+}
+
+export const setupEnv = async () => {
+  dotenv.config()
+  const db = getDb()
+  const env = readEnv()
+  const cfg = envToCfg(env)
+  const secrets = envToSecrets(env)
+  const ctx = await AppContext.fromConfig(cfg, secrets)
+  const adminHeaders = makeAdminHeaders(secrets)
+  const pdsRes = await ctx.db.db.selectFrom('pds').selectAll().execute()
+  const pdsInfos = pdsRes.map((row) => ({
+    id: row.id,
+    did: row.did,
+    url: `https://${row.host}`,
+    agent: new AtpAgent({ service: `https://${row.host}` }),
+  }))
+  return { db, ctx, adminHeaders, pdsInfos }
+}
+
+export const getPds = (infos: PdsInfo[], id: number | null): PdsInfo => {
+  const pdsInfo = infos.find((info) => info.id === id)
+  if (!pdsInfo) {
+    throw new Error(`could not find pds with id: ${id}`)
+  }
+  return pdsInfo
+}
+
+export const makeAdminHeaders = (secrets: ServerSecrets): AdminHeaders => {
+  const adminToken = ui8.toString(
+    ui8.fromString(`admin:${secrets.adminPassword}`, 'utf8'),
+    'base64pad',
+  )
+  return {
+    authorization: `Basic ${adminToken}`,
+  }
+}
+
+export const doImport = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  did: string,
+  adminHeaders: AdminHeaders,
+  since?: string,
+) => {
+  const revRes = await ctx.db.db
+    .selectFrom('ipld_block')
+    .select('repoRev')
+    .where('creator', '=', did)
+    .orderBy('repoRev', 'desc')
+    .limit(1)
+    .executeTakeFirst()
+  const repoRev = revRes?.repoRev
+  if (since && repoRev === since) {
+    return
+  }
+  const storage = new SqlRepoStorage(ctx.db, did)
+  const carStream = await storage.getCarStream(since)
+
+  const importRes = await httpClient.post(
+    `${pds.url}/xrpc/com.atproto.temp.importRepo`,
+    carStream,
+    {
+      params: { did },
+      headers: { 'content-type': 'application/vnd.ipld.car', ...adminHeaders },
+      decompress: true,
+      responseType: 'stream',
+      maxBodyLength: Infinity,
+      maxContentLength: Infinity,
+    },
+  )
+
+  let logOutput = ''
+  for await (const log of importRes.data) {
+    logOutput += log.toString()
+    console.log(`${did}: ${log.toString()}`)
+  }
+  const lines = logOutput.split('\n')
+  for (const line of lines) {
+    if (line.includes('failed to import blob')) {
+      const cid = line.split(':')[1].trim()
+      await logFailedBlob(db, did, cid)
+    }
+  }
+  return repoRev
+}
+
+const logFailedBlob = async (db: MigrateDb, did: string, cid: string) => {
+  await db
+    .insertInto('failed_blob')
+    .values({ did, cid })
+    .onConflict((oc) => oc.doNothing())
+    .execute()
+}
+
+export const repairFailedPrefs = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  did: string,
+) => {
+  const hasFailure = await db
+    .selectFrom('failed_pref')
+    .selectAll()
+    .where('did', '=', did)
+    .executeTakeFirst()
+  if (hasFailure) {
+    await repairPrefs(ctx, db, pds, did)
+  }
+}
+
+export const repairPrefs = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  did: string,
+) => {
+  const hasFailure = await db
+    .selectFrom('failed_pref')
+    .selectAll()
+    .where('did', '=', did)
+    .executeTakeFirst()
+  if (!hasFailure) {
+    return
+  }
+  await transferPreferences(ctx, pds, did)
+  await db.deleteFrom('failed_pref').where('did', '=', did).execute()
+}
+
+export const transferPreferences = async (
+  ctx: AppContext,
+  pds: PdsInfo,
+  did: string,
+) => {
+  const accessToken = await ctx.services
+    .auth(ctx.db)
+    .createAccessToken({ did: did, pdsDid: pds.did })
+
+  const prefs = await ctx.services.account(ctx.db).getPreferences(did)
+  await pds.agent.api.app.bsky.actor.putPreferences(
+    { preferences: prefs },
+    {
+      headers: { authorization: `Bearer ${accessToken}` },
+      encoding: 'application/json',
+    },
+  )
+}
+
+export const repairBlob = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  did: string,
+  cid: string,
+  adminHeaders: AdminHeaders,
+) => {
+  await repairBlobInternal(ctx, pds, did, cid, adminHeaders)
+  await db
+    .deleteFrom('failed_blob')
+    .where('did', '=', did)
+    .where('cid', '=', cid)
+    .execute()
+}
+
+export const repairBlobInternal = async (
+  ctx: AppContext,
+  pds: PdsInfo,
+  did: string,
+  cid: string,
+  adminHeaders: AdminHeaders,
+) => {
+  const blob = await ctx.db.db
+    .selectFrom('blob')
+    .where('cid', '=', cid)
+    .where('creator', '=', did)
+    .selectAll()
+    .executeTakeFirst()
+  if (!blob) return
+  let blobStream
+  try {
+    blobStream = await ctx.blobstore.getStream(CID.parse(blob.cid))
+  } catch (err) {
+    if (err?.['Code'] === 'NoSuchKey') {
+      return
+    }
+    throw err
+  }
+  await axios.post(`${pds.url}/xrpc/com.atproto.temp.pushBlob`, blobStream, {
+    params: { did },
+    headers: {
+      'content-type': blob.mimeType,
+      ...adminHeaders,
+    },
+    decompress: true,
+    responseType: 'stream',
+  })
+}
+
+export const getUserAccount = async (ctx: AppContext, did: string) => {
+  const accountRes = await ctx.db.db
+    .selectFrom('did_handle')
+    .innerJoin('user_account', 'user_account.did', 'did_handle.did')
+    .selectAll()
+    .where('did_handle.did', '=', did)
+    .executeTakeFirst()
+  if (!accountRes) {
+    throw new Error(`could not find account: ${did}`)
+  }
+  return accountRes
+}
+
+export const transferTakedowns = async (
+  ctx: AppContext,
+  db: MigrateDb,
+  pds: PdsInfo,
+  did: string,
+  adminHeaders: AdminHeaders,
+) => {
+  const [accountRes, takendownRecords, takendownBlobs] = await Promise.all([
+    getUserAccount(ctx, did),
+    ctx.db.db
+      .selectFrom('record')
+      .selectAll()
+      .where('did', '=', did)
+      .where('takedownRef', 'is not', null)
+      .execute(),
+    ctx.db.db
+      .selectFrom('repo_blob')
+      .selectAll()
+      .where('did', '=', did)
+      .where('takedownRef', 'is not', null)
+      .execute(),
+  ])
+  const promises: Promise<unknown>[] = []
+  if (accountRes.takedownRef) {
+    const promise = pds.agent.com.atproto.admin
+      .updateSubjectStatus(
+        {
+          subject: {
+            $type: 'com.atproto.admin.defs#repoRef',
+            did,
+          },
+          takedown: {
+            applied: true,
+            ref: accountRes.takedownRef,
+          },
+        },
+        {
+          headers: adminHeaders,
+          encoding: 'application/json',
+        },
+      )
+      .catch(async (err) => {
+        await logFailedTakedown(db, { did, err: err?.message })
+      })
+    promises.push(promise)
+  }
+
+  for (const takendownRecord of takendownRecords) {
+    if (!takendownRecord.takedownRef) continue
+    const promise = pds.agent.com.atproto.admin
+      .updateSubjectStatus(
+        {
+          subject: {
+            $type: 'com.atproto.repo.strongRef',
+            uri: takendownRecord.uri,
+            cid: takendownRecord.cid,
+          },
+          takedown: {
+            applied: true,
+            ref: takendownRecord.takedownRef,
+          },
+        },
+        {
+          headers: adminHeaders,
+          encoding: 'application/json',
+        },
+      )
+      .catch(async (err) => {
+        await logFailedTakedown(db, {
+          did,
+          recordUri: takendownRecord.uri,
+          recordCid: takendownRecord.cid,
+          err: err?.message,
+        })
+      })
+    promises.push(promise)
+  }
+
+  for (const takendownBlob of takendownBlobs) {
+    if (!takendownBlob.takedownRef) continue
+    const promise = pds.agent.com.atproto.admin
+      .updateSubjectStatus(
+        {
+          subject: {
+            $type: 'com.atproto.admin.defs#repoBlobRef',
+            did,
+            cid: takendownBlob.cid,
+            recordUri: takendownBlob.recordUri,
+          },
+          takedown: {
+            applied: true,
+            ref: takendownBlob.takedownRef,
+          },
+        },
+        {
+          headers: adminHeaders,
+          encoding: 'application/json',
+        },
+      )
+      .catch(async (err) => {
+        await logFailedTakedown(db, {
+          did,
+          blobCid: takendownBlob.cid,
+          err: err?.message,
+        })
+      })
+
+    promises.push(promise)
+  }
+
+  await Promise.all(promises)
+}
+
+const logFailedTakedown = async (db: MigrateDb, takedown: FailedTakedown) => {
+  await db
+    .insertInto('failed_takedown')
+    .values(takedown)
+    .onConflict((oc) => oc.doNothing())
+    .execute()
+}
+
+export const checkBorked = async (ctx: AppContext, did: string) => {
+  try {
+    const data = await ctx.plcClient.getDocumentData(did)
+    const endpoint = data.services['atproto_pds'].endpoint
+    if (endpoint !== 'https://bsky.social') {
+      await fs.appendFile('borked_dids.tx', `${did}\n`)
+    }
+  } catch {
+    // noop
+  }
+}
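`makeAdminHeaders` above is ordinary HTTP basic auth with the fixed username `admin`; `base64pad` from uint8arrays matches Node's default base64 encoding, so an equivalent using only Node builtins (for illustration, not part of the diff) would be:

```ts
const makeAdminHeadersNode = (adminPassword: string) => ({
  authorization: `Basic ${Buffer.from(`admin:${adminPassword}`).toString(
    'base64',
  )}`,
})
```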
diff --git a/services/pds/index.js b/services/pds/index.js
index 112d63edf90..e3f3749e4a6 100644
--- a/services/pds/index.js
+++ b/services/pds/index.js
@@ -1,73 +1,9 @@
 'use strict'
 /* eslint-disable */
-require('dd-trace') // Only works with commonjs
-  .init({ logInjection: true })
-  .tracer.use('express', {
-    hooks: {
-      request: (span, req) => {
-        maintainXrpcResource(span, req)
-      },
-    },
-  })
-
-// Tracer code above must come before anything else
-const path = require('path')
-const {
-  PDS,
-  envToCfg,
-  envToSecrets,
-  readEnv,
-  httpLogger,
-  PeriodicModerationActionReversal,
-} = require('@atproto/pds')
-const pkg = require('@atproto/pds/package.json')
+const { runScript } = require('@atproto/pds')
 
 const main = async () => {
-  const env = readEnv()
-  env.version ??= pkg.version
-  const cfg = envToCfg(env)
-  const secrets = envToSecrets(env)
-  const pds = await PDS.create(cfg, secrets)
-
-  // If the PDS is configured to proxy moderation, this will be running on appview instead of pds.
-  // Also don't run this on the sequencer leader, which may not be configured regarding moderation proxying at all.
-  const periodicModerationActionReversal =
-    pds.ctx.cfg.bskyAppView.proxyModeration ||
-    pds.ctx.cfg.sequencerLeaderEnabled
-      ? null
-      : new PeriodicModerationActionReversal(pds.ctx)
-  const periodicModerationActionReversalRunning =
-    periodicModerationActionReversal?.run()
-
-  await pds.start()
-
-  httpLogger.info('pds is running')
-  // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/)
-  process.on('SIGTERM', async () => {
-    httpLogger.info('pds is stopping')
-
-    periodicModerationActionReversal?.destroy()
-    await periodicModerationActionReversalRunning
-
-    await pds.destroy()
-
-    httpLogger.info('pds is stopped')
-  })
-}
-
-const maintainXrpcResource = (span, req) => {
-  // Show actual xrpc method as resource rather than the route pattern
-  if (span && req.originalUrl?.startsWith('/xrpc/')) {
-    span.setTag(
-      'resource.name',
-      [
-        req.method,
-        path.posix.join(req.baseUrl || '', req.path || '', '/').slice(0, -1), // Ensures no trailing slash
-      ]
-        .filter(Boolean)
-        .join(' '),
-    )
-  }
+  await runScript()
 }
 
 main()
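With the entrypoint rewired this way, the service container no longer starts a PDS at all: `main` just invokes the migration driver, which loads its config from the environment (plus `.env` via the new dotenv dependency) and reads the comma-separated pds ids from its first argument. A hypothetical invocation, for orientation:

```ts
// node services/pds/index.js 3,4
//   -> runScript() migrates every tracked account assigned to pds ids 3 and 4
// Note: main() has no .catch, so a failed run surfaces as an unhandled rejection.
```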