Skip to content

Commit

Permalink
refactor & add repair script
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Mar 7, 2025
1 parent 6741e42 commit 09d1b8d
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 165 deletions.
4 changes: 2 additions & 2 deletions packages/pds/src/scripts/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { publishIdentity, publishIdentityMany } from './publish-identity'
import { rebuildRepo } from './rebuild-repo'
import { sequencerRecovery } from './sequencer-recovery'
import { rotateKeys, rotateKeysMany } from './rotate-keys'
import { publishIdentity, publishIdentityMany } from './publish-identity'
import { sequencerRecovery } from './sequencer-recovery'

export const scripts = {
'rebuild-repo': rebuildRepo,
Expand Down
41 changes: 41 additions & 0 deletions packages/pds/src/scripts/publish-identity.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import fs from 'node:fs/promises'
import { wait } from '@atproto/common'
import { Sequencer } from '../sequencer'
import { getAndMigrateRecoveryDb } from './sequencer-recovery/recovery-db'
import { parseIntArg } from './util'

type Context = {
Expand All @@ -10,6 +11,7 @@ type Context = {
export const publishIdentity = async (ctx: Context, args: string[]) => {
const dids = args
await publishIdentityEvtForDids(ctx, dids)
console.log('DONE')
}

export const publishIdentityMany = async (ctx: Context, args: string[]) => {
Expand All @@ -26,6 +28,7 @@ export const publishIdentityMany = async (ctx: Context, args: string[]) => {
.filter((did) => did.startsWith('did:plc'))

await publishIdentityEvtForDids(ctx, dids, timeBetween)
console.log('DONE')
}

export const publishIdentityEvtForDids = async (
Expand All @@ -44,5 +47,43 @@ export const publishIdentityEvtForDids = async (
await wait(timeBetween)
}
}
}

export const publishIdentityRecovery = async (ctx: Context, args: string[]) => {
const filepath = args[0]
if (!filepath) {
throw new Error('Expected filepath as argument')
}
const timeBetween = args[1] ? parseIntArg(args[1]) : 5

const db = await getAndMigrateRecoveryDb(filepath)
const rows = await db.db
.selectFrom('new_account')
.select('did')
.where('new_account.published', '=', 0)
.execute()
const dids = rows.map((r) => r.did)

let published = 0
for (const did of dids) {
try {
await ctx.sequencer.sequenceIdentityEvt(did)
await db.db
.updateTable('new_account')
.set({ published: 1 })
.where('did', '=', did)
.execute()
console.log(`published identity evt for ${did}`)
} catch (err) {
console.error(`failed to sequence new identity evt for ${did}: ${err}`)
}
if (timeBetween > 0) {
await wait(timeBetween)
}
published++
if (published % 10 === 0) {
console.log(`${published}/${dids.length}`)
}
}
console.log('DONE')
}
51 changes: 39 additions & 12 deletions packages/pds/src/scripts/rebuild-repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,46 @@ import {
MemoryBlockstore,
signCommit,
} from '@atproto/repo'
import { AppContext } from '../context'
import { AccountManager } from '../account-manager'
import { ActorStore } from '../actor-store/actor-store'
import { Sequencer } from '../sequencer'

export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
export interface RebuildContext {
sequencer: Sequencer
accountManager: AccountManager
actorStore: ActorStore
}

export const rebuildRepoScript = async (
ctx: RebuildContext,
args: string[],
) => {
const did = args[0]
if (!did || !did.startsWith('did:')) {
throw new Error('Expected DID as argument')
}
return rebuildRepo(ctx, did, true)
}

export const rebuildRepo = async (
ctx: RebuildContext,
did: string,
promptUser: boolean,
) => {
const memoryStore = new MemoryBlockstore()
const rev = TID.nextStr()
const commit = await ctx.actorStore.transact(did, async (store) => {
const [records, existingCids] = await Promise.all([
const [rootDetails, records, existingCids] = await Promise.all([
store.repo.storage.getRootDetailed(),
store.record.listAll(),
store.record.listExistingBlocks(),
])
// increment existing rev by 1 ms
const revTid = TID.fromStr(rootDetails.rev)
const rev = TID.fromTime(
revTid.timestamp() + 1,
revTid.clockid(),
).toString()

let mst = await MST.create(memoryStore)
for (const record of records) {
mst = await mst.add(record.path, record.cid)
Expand Down Expand Up @@ -50,14 +75,16 @@ export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
)
const commitCid = await newBlocks.add(newCommit)

console.log('Record count: ', records.length)
console.log('Existing blocks: ', existingCids.toList().length)
console.log('Deleting blocks:', toDelete.toList().length)
console.log('Adding blocks: ', newBlocks.size)
if (promptUser) {
console.log('Record count: ', records.length)
console.log('Existing blocks: ', existingCids.toList().length)
console.log('Deleting blocks:', toDelete.toList().length)
console.log('Adding blocks: ', newBlocks.size)

const shouldContinue = await promptContinue()
if (!shouldContinue) {
throw new Error('Aborted')
const shouldContinue = await promptContinue()
if (!shouldContinue) {
throw new Error('Aborted')
}
}

await store.repo.storage.deleteMany(toDelete.toList())
Expand All @@ -76,7 +103,7 @@ export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
prevData: null,
}
})
await ctx.accountManager.updateRepoRoot(did, commit.cid, rev)
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
await ctx.sequencer.sequenceCommit(did, commit)
}

Expand Down
Loading

0 comments on commit 09d1b8d

Please sign in to comment.