From 3446b7ea0684bd642b4cc044d6a45b0393c733ce Mon Sep 17 00:00:00 2001 From: dholms Date: Mon, 20 Nov 2023 16:23:02 -0600 Subject: [PATCH] repair imports script --- packages/pds/src/migrate-script/db.ts | 16 +++++ .../pds/src/migrate-script/migrate-all.ts | 66 +------------------ packages/pds/src/migrate-script/push-repo.ts | 0 .../pds/src/migrate-script/repair-imports.ts | 33 ++++++++++ packages/pds/src/migrate-script/util.ts | 64 ++++++++++++++++++ 5 files changed, 115 insertions(+), 64 deletions(-) create mode 100644 packages/pds/src/migrate-script/push-repo.ts create mode 100644 packages/pds/src/migrate-script/repair-imports.ts diff --git a/packages/pds/src/migrate-script/db.ts b/packages/pds/src/migrate-script/db.ts index 867b772ba0a..205bc508028 100644 --- a/packages/pds/src/migrate-script/db.ts +++ b/packages/pds/src/migrate-script/db.ts @@ -69,6 +69,16 @@ export const dbMigrationProvider: MigrationProvider = { }, async down() {}, }, + '3': { + async up(db: Kysely) { + await db.schema + .createTable('failed_import') + .addColumn('did', 'varchar', (col) => col.primaryKey()) + .addColumn('err', 'varchar') + .execute() + }, + async down() {}, + }, } }, } @@ -80,6 +90,7 @@ type Schema = { failed_pref: FailedPreference failed_blob: FailedBlob failed_takedown: FailedTakedown + failed_import: FailedImport } export enum TransferPhase { @@ -121,3 +132,8 @@ export type FailedTakedown = { recordCid?: string err: string | null } + +export type FailedImport = { + did: string + err: string | null +} diff --git a/packages/pds/src/migrate-script/migrate-all.ts b/packages/pds/src/migrate-script/migrate-all.ts index e14b173a85e..821cd8cef0f 100644 --- a/packages/pds/src/migrate-script/migrate-all.ts +++ b/packages/pds/src/migrate-script/migrate-all.ts @@ -1,6 +1,4 @@ import assert from 'node:assert' -import https from 'node:https' -import axios from 'axios' import * as plcLib from '@did-plc/lib' import SqlRepoStorage from '../sql-repo-storage' import { createDeferrable } from '@atproto/common' @@ -11,8 +9,10 @@ import { AdminHeaders, PdsInfo, checkBorked, + doImport, getPds, getUserAccount, + httpClient, repairBlob, repairFailedPrefs, retryOnce, @@ -21,11 +21,6 @@ import { transferTakedowns, } from './util' -const httpClient = axios.create({ - timeout: 0, //optional - httpsAgent: new https.Agent({ keepAlive: true }), -}) - export const runScript = async () => { console.log('starting') const { db, ctx, adminHeaders, pdsInfos } = await setupEnv() @@ -182,55 +177,6 @@ const reserveSigningKey = async ( return signingKeyRes.data.signingKey } -const doImport = async ( - ctx: AppContext, - db: MigrateDb, - pds: PdsInfo, - did: string, - adminHeaders: AdminHeaders, - since?: string, -) => { - const revRes = await ctx.db.db - .selectFrom('ipld_block') - .select('repoRev') - .where('creator', '=', did) - .orderBy('repoRev', 'desc') - .limit(1) - .executeTakeFirst() - const repoRev = revRes?.repoRev - if (since && repoRev === since) { - return - } - const storage = new SqlRepoStorage(ctx.db, did) - const carStream = await storage.getCarStream(since) - - const importRes = await httpClient.post( - `${pds.url}/xrpc/com.atproto.temp.importRepo`, - carStream, - { - params: { did }, - headers: { 'content-type': 'application/vnd.ipld.car', ...adminHeaders }, - decompress: true, - responseType: 'stream', - maxBodyLength: Infinity, - maxContentLength: Infinity, - }, - ) - - let logOutput = '' - for await (const log of importRes.data) { - logOutput += log.toString() - } - const lines = logOutput.split('\n') - for (const line of lines) { - if (line.includes('failed to import blob')) { - const cid = line.split(':')[1].trim() - await logFailedBlob(db, did, cid) - } - } - return repoRev -} - const lockAndTransfer = async ( ctx: AppContext, db: MigrateDb, @@ -333,14 +279,6 @@ const updatePdsOnEntryway = async ( }) } -const logFailedBlob = async (db: MigrateDb, did: string, cid: string) => { - await db - .insertInto('failed_blob') - .values({ did, cid }) - .onConflict((oc) => oc.doNothing()) - .execute() -} - const repairFailedBlobs = async ( ctx: AppContext, db: MigrateDb, diff --git a/packages/pds/src/migrate-script/push-repo.ts b/packages/pds/src/migrate-script/push-repo.ts new file mode 100644 index 00000000000..e69de29bb2d diff --git a/packages/pds/src/migrate-script/repair-imports.ts b/packages/pds/src/migrate-script/repair-imports.ts new file mode 100644 index 00000000000..99a49e24117 --- /dev/null +++ b/packages/pds/src/migrate-script/repair-imports.ts @@ -0,0 +1,33 @@ +import PQueue from 'p-queue' +import { doImport, getPds, setupEnv } from './util' + +export const runScript = async () => { + const { db, ctx, adminHeaders, pdsInfos } = await setupEnv() + const failed = await db + .selectFrom('failed_import') + .innerJoin('status', 'status.did', 'failed_import.did') + .select(['status.did', 'status.pdsId']) + .execute() + let count = 0 + const importQueue = new PQueue({ concurrency: 1 }) + for (const account of failed) { + importQueue.add(async () => { + const pdsInfo = getPds(pdsInfos, account.pdsId ?? -1) + try { + await doImport(ctx, db, pdsInfo, account.did, adminHeaders) + await db + .deleteFrom('failed_import') + .where('did', '=', account.did) + .execute() + } catch (err) { + console.log(err) + } + count++ + console.log(`${count}/${failed.length}`) + }) + } + await importQueue.onIdle() + console.log('DONE WITH ALL') +} + +runScript() diff --git a/packages/pds/src/migrate-script/util.ts b/packages/pds/src/migrate-script/util.ts index 9005bef9196..a7530e4510f 100644 --- a/packages/pds/src/migrate-script/util.ts +++ b/packages/pds/src/migrate-script/util.ts @@ -1,4 +1,5 @@ import fs from 'fs/promises' +import https from 'node:https' import dotenv from 'dotenv' import axios from 'axios' import * as ui8 from 'uint8arrays' @@ -7,6 +8,12 @@ import AppContext from '../context' import { FailedTakedown, MigrateDb, getDb } from './db' import { CID } from 'multiformats/cid' import { ServerSecrets, envToCfg, envToSecrets, readEnv } from '../config' +import SqlRepoStorage from '../sql-repo-storage' + +export const httpClient = axios.create({ + timeout: 0, //optional + httpsAgent: new https.Agent({ keepAlive: true }), +}) export type PdsInfo = { id: number @@ -63,6 +70,63 @@ export const retryOnce = async (fn: () => Promise) => { } } +export const doImport = async ( + ctx: AppContext, + db: MigrateDb, + pds: PdsInfo, + did: string, + adminHeaders: AdminHeaders, + since?: string, +) => { + const revRes = await ctx.db.db + .selectFrom('ipld_block') + .select('repoRev') + .where('creator', '=', did) + .orderBy('repoRev', 'desc') + .limit(1) + .executeTakeFirst() + const repoRev = revRes?.repoRev + if (since && repoRev === since) { + return + } + const storage = new SqlRepoStorage(ctx.db, did) + const carStream = await storage.getCarStream(since) + + const importRes = await httpClient.post( + `${pds.url}/xrpc/com.atproto.temp.importRepo`, + carStream, + { + params: { did }, + headers: { 'content-type': 'application/vnd.ipld.car', ...adminHeaders }, + decompress: true, + responseType: 'stream', + maxBodyLength: Infinity, + maxContentLength: Infinity, + }, + ) + + let logOutput = '' + for await (const log of importRes.data) { + logOutput += log.toString() + } + const lines = logOutput.split('\n') + for (const line of lines) { + if (line.includes('failed to import blob')) { + const cid = line.split(':')[1].trim() + await logFailedBlob(db, did, cid) + } + } + return repoRev +} + +const logFailedBlob = async (db: MigrateDb, did: string, cid: string) => { + await db + .insertInto('failed_blob') + .values({ did, cid }) + .onConflict((oc) => oc.doNothing()) + .execute() +} + export const repairFailedPrefs = async ( ctx: AppContext, db: MigrateDb,