diff --git a/packages/pds/src/migrate-script/repair-blobs.ts b/packages/pds/src/migrate-script/repair-blobs.ts index 7574a05a6e5..d0bc610d812 100644 --- a/packages/pds/src/migrate-script/repair-blobs.ts +++ b/packages/pds/src/migrate-script/repair-blobs.ts @@ -1,5 +1,12 @@ +import PQueue from 'p-queue' import { getPds, repairBlob, setupEnv } from './util' +type FailedBlob = { + did: string + cid: string + pdsId: number +} + export const runScript = async () => { const { db, ctx, adminHeaders, pdsInfos } = await setupEnv() const failed = await db @@ -8,15 +15,26 @@ export const runScript = async () => { .selectAll() .execute() let count = 0 - for (const blob of failed) { - const pdsInfo = getPds(pdsInfos, blob.pdsId ?? -1) - try { - await repairBlob(ctx, db, pdsInfo, blob.did, blob.cid, adminHeaders) - } catch (err) { - console.log(err) - } - count++ - console.log(`${count}/${failed.length}`) + const failedByDid = failed.reduce((acc, cur) => { + acc[cur.did] ??= [] + acc[cur.did].push({ did: cur.did, cid: cur.cid, pdsId: cur.pdsId ?? -1 }) + return acc + }, {} as Record) + const blobQueue = new PQueue({ concurrency: 20 }) + for (const did of Object.keys(failedByDid)) { + const failedBlobs = failedByDid[did] ?? [] + blobQueue.add(async () => { + for (const blob of failedBlobs) { + const pdsInfo = getPds(pdsInfos, blob.pdsId ?? -1) + try { + await repairBlob(ctx, db, pdsInfo, blob.did, blob.cid, adminHeaders) + } catch (err) { + console.log(err) + } + count++ + console.log(`${count}/${failed.length}`) + } + }) } console.log('DONE WITH ALL') }