diff --git a/packages/pds/src/migrate-script/migrate-all.ts b/packages/pds/src/migrate-script/migrate-all.ts index 2fc2b82a6c2..e2ccd5c3d37 100644 --- a/packages/pds/src/migrate-script/migrate-all.ts +++ b/packages/pds/src/migrate-script/migrate-all.ts @@ -10,21 +10,11 @@ import { envToCfg, envToSecrets, readEnv } from '../config' import AppContext from '../context' import { FailedTakedown, MigrateDb, Status, TransferPhase, getDb } from './db' import PQueue from 'p-queue' +import { AdminHeaders, PdsInfo } from './util' dotenv.config() - -type PdsInfo = { - id: number - did: string - url: string - agent: AtpAgent -} - -type AdminHeaders = { - authorization: string -} - export const runScript = async () => { + console.log('starting') const db = getDb() const env = readEnv() const cfg = envToCfg(env) @@ -47,7 +37,6 @@ export const runScript = async () => { const todo = await db .selectFrom('status') .where('status.phase', '<', 7) - // .where('failed', '!=', 1) .orderBy('phase', 'desc') .orderBy('did') .selectAll() @@ -55,6 +44,7 @@ export const runScript = async () => { let pdsCounter = 0 let completed = 0 let failed = 0 + console.log('migrating: ', todo.length) const migrateQueue = new PQueue({ concurrency: 40 }) for (const status of todo) { if (!status.pdsId) { diff --git a/packages/pds/src/migrate-script/repair-blobs.ts b/packages/pds/src/migrate-script/repair-blobs.ts index 3c1c258cd1c..68516c883a0 100644 --- a/packages/pds/src/migrate-script/repair-blobs.ts +++ b/packages/pds/src/migrate-script/repair-blobs.ts @@ -1,21 +1,13 @@ import dotenv from 'dotenv' -import axios from 'axios' import * as ui8 from 'uint8arrays' import AtpAgent from '@atproto/api' import { envToCfg, envToSecrets, readEnv } from '../config' import AppContext from '../context' -import { MigrateDb, getDb } from './db' -import { CID } from 'multiformats/cid' +import { getDb } from './db' +import { repairBlob } from './util' dotenv.config() -type PdsInfo = { - id: number - did: string - url: string - agent: AtpAgent -} - export const runScript = async () => { const db = getDb() const env = readEnv() @@ -55,49 +47,4 @@ export const runScript = async () => { console.log('DONE WITH ALL') } -const repairBlob = async ( - ctx: AppContext, - db: MigrateDb, - pds: PdsInfo, - did: string, - cid: string, - adminToken: string, -) => { - const blob = await ctx.db.db - .selectFrom('blob') - .where('cid', '=', cid) - .selectAll() - .executeTakeFirst() - if (!blob) return - let blobStream - try { - blobStream = await ctx.blobstore.getStream(CID.parse(blob.cid)) - } catch (err) { - const hasTakedown = await ctx.db.db - .selectFrom('repo_blob') - .where('did', '=', did) - .where('cid', '=', cid) - .where('takedownRef', 'is not', null) - .executeTakeFirst() - if (hasTakedown) { - return - } - throw err - } - await axios.post(`${pds.url}/xrpc/com.atproto.temp.pushBlob`, blobStream, { - params: { did }, - headers: { - 'content-type': blob.mimeType, - authorization: `Basic ${adminToken}`, - }, - decompress: true, - responseType: 'stream', - }) - await db - .deleteFrom('failed_blob') - .where('did', '=', did) - .where('cid', '=', blob.cid) - .execute() -} - runScript() diff --git a/packages/pds/src/migrate-script/util.ts b/packages/pds/src/migrate-script/util.ts new file mode 100644 index 00000000000..510af62b776 --- /dev/null +++ b/packages/pds/src/migrate-script/util.ts @@ -0,0 +1,62 @@ +import axios from 'axios' +import AtpAgent from '@atproto/api' +import AppContext from '../context' +import { MigrateDb } from './db' +import { CID } from 'multiformats/cid' + +export type PdsInfo = { + id: number + did: string + url: string + agent: AtpAgent +} + +export type AdminHeaders = { + authorization: string +} + +export const repairBlob = async ( + ctx: AppContext, + db: MigrateDb, + pds: PdsInfo, + did: string, + cid: string, + adminToken: string, +) => { + const blob = await ctx.db.db + .selectFrom('blob') + .where('cid', '=', cid) + .where('creator', '=', did) + .selectAll() + .executeTakeFirst() + if (!blob) return + let blobStream + try { + blobStream = await ctx.blobstore.getStream(CID.parse(blob.cid)) + } catch (err) { + const hasTakedown = await ctx.db.db + .selectFrom('repo_blob') + .where('did', '=', did) + .where('cid', '=', cid) + .where('takedownRef', 'is not', null) + .executeTakeFirst() + if (hasTakedown) { + return + } + throw err + } + await axios.post(`${pds.url}/xrpc/com.atproto.temp.pushBlob`, blobStream, { + params: { did }, + headers: { + 'content-type': blob.mimeType, + authorization: `Basic ${adminToken}`, + }, + decompress: true, + responseType: 'stream', + }) + await db + .deleteFrom('failed_blob') + .where('did', '=', did) + .where('cid', '=', blob.cid) + .execute() +}