Skip to content

Commit

Permalink
concurrent repair blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Nov 9, 2023
1 parent f94accf commit 09e46d9
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions packages/pds/src/migrate-script/repair-blobs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import PQueue from 'p-queue'
import { getPds, repairBlob, setupEnv } from './util'

type FailedBlob = {
did: string
cid: string
pdsId: number
}

export const runScript = async () => {
const { db, ctx, adminHeaders, pdsInfos } = await setupEnv()
const failed = await db
Expand All @@ -8,15 +15,26 @@ export const runScript = async () => {
.selectAll()
.execute()
let count = 0
for (const blob of failed) {
const pdsInfo = getPds(pdsInfos, blob.pdsId ?? -1)
try {
await repairBlob(ctx, db, pdsInfo, blob.did, blob.cid, adminHeaders)
} catch (err) {
console.log(err)
}
count++
console.log(`${count}/${failed.length}`)
const failedByDid = failed.reduce((acc, cur) => {
acc[cur.did] ??= []
acc[cur.did].push({ did: cur.did, cid: cur.cid, pdsId: cur.pdsId ?? -1 })
return acc
}, {} as Record<string, FailedBlob[]>)
const blobQueue = new PQueue({ concurrency: 20 })
for (const did of Object.keys(failedByDid)) {
const failedBlobs = failedByDid[did] ?? []
blobQueue.add(async () => {
for (const blob of failedBlobs) {
const pdsInfo = getPds(pdsInfos, blob.pdsId ?? -1)
try {
await repairBlob(ctx, db, pdsInfo, blob.did, blob.cid, adminHeaders)
} catch (err) {
console.log(err)
}
count++
console.log(`${count}/${failed.length}`)
}
})
}
console.log('DONE WITH ALL')
}
Expand Down

0 comments on commit 09e46d9

Please sign in to comment.