Skip to content

Commit

Permalink
script tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Nov 3, 2023
1 parent 5477f20 commit e036de4
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 39 deletions.
1 change: 1 addition & 0 deletions packages/pds/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"compression": "^1.7.4",
"cors": "^2.8.5",
"disposable-email": "^0.2.3",
"dotenv": "^16.0.3",
"express": "^4.17.2",
"express-async-errors": "^3.1.1",
"file-type": "^16.5.4",
Expand Down
25 changes: 13 additions & 12 deletions packages/pds/src/migrate-script/db.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
import { Kysely, MigrationProvider, Migrator, SqliteDialect } from 'kysely'
import { Kysely, MigrationProvider, SqliteDialect } from 'kysely'
import SqliteDB from 'better-sqlite3'

export const getDb = async (loc: string): Promise<MigrateDb> => {
const db = new Kysely<Schema>({
const LOCATION = 'migrate.db'

export const getDb = (): MigrateDb => {
return new Kysely<Schema>({
dialect: new SqliteDialect({
database: new SqliteDB(loc),
database: new SqliteDB(LOCATION),
}),
})
const migrator = new Migrator({ db, provider: dbMigrationProvider })
const { error } = await migrator.migrateToLatest()
if (error) throw error
return db
}

const dbMigrationProvider: MigrationProvider = {
export const dbMigrationProvider: MigrationProvider = {
async getMigrations() {
return {
'1': {
Expand All @@ -23,8 +21,10 @@ const dbMigrationProvider: MigrationProvider = {
.addColumn('did', 'varchar', (col) => col.primaryKey())
.addColumn('pdsId', 'integer')
.addColumn('signingKey', 'varchar')
.addColumn('phase', 'integer')
.addColumn('importedRev', 'varchar')
.addColumn('phase', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('importedRev', 'varchar', (col) =>
col.notNull().defaultTo(0),
)
.addColumn('failed', 'integer')
.execute()
await db.schema
Expand Down Expand Up @@ -64,7 +64,8 @@ export enum TransferPhase {
notStarted = 0,
reservedKey = 1,
initImport = 2,
transferred = 4,
transferredPds = 3,
transferredEntryway = 4,
preferences = 5,
takedowns = 6,
completed = 7,
Expand Down
22 changes: 22 additions & 0 deletions packages/pds/src/migrate-script/load-dids.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import fs from 'fs/promises'
import { getDb } from './db'

const run = async () => {
const db = getDb()
const didsFile = await fs.readFile('dids.txt')
const dids = didsFile
.toString()
.split('\n')
.map((did) => ({
did: did.trim(),
phase: 0,
failed: 0 as const,
}))
await db
.insertInto('status')
.values(dids)
.onConflict((oc) => oc.doNothing())
.execute()
}

run()
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import assert from 'node:assert'
import dotenv from 'dotenv'
import axios from 'axios'
import * as ui8 from 'uint8arrays'
import AtpAgent from '@atproto/api'
Expand All @@ -8,6 +10,8 @@ import { envToCfg, envToSecrets, readEnv } from '../config'
import AppContext from '../context'
import { FailedTakedown, MigrateDb, Status, TransferPhase, getDb } from './db'

dotenv.config()

type PdsInfo = {
id: number
did: string
Expand All @@ -16,7 +20,7 @@ type PdsInfo = {
}

export const runScript = async () => {
const db = await getDb('/data/migrate.db')
const db = getDb()
const env = readEnv()
const cfg = envToCfg(env)
const secrets = envToSecrets(env)
Expand All @@ -36,16 +40,21 @@ export const runScript = async () => {
.selectFrom('status')
.where('status.phase', '<', 7)
.where('failed', '!=', 1)
.orderBy('phase', 'desc')
.orderBy('did')
.selectAll()
.execute()
let pdsCounter = 0
for (const status of todo) {
let pdsId = status.pdsId
if (!pdsId) {
pdsId = pdsCounter % pdsInfos.length
pdsId = pdsInfos[pdsCounter % pdsInfos.length].id
pdsCounter++
}
const pdsInfo = pdsInfos[pdsId]
const pdsInfo = pdsInfos.find((info) => info.id === pdsId)
if (!pdsInfo) {
throw new Error(`could not find pds with id: ${pdsId}`)
}
try {
await migrateRepo(ctx, db, pdsInfo, status, adminToken)
console.log(`completed migrating: ${status.did}`)
Expand All @@ -57,7 +66,6 @@ export const runScript = async () => {
.execute()
console.error(`failed to migrate: ${status.did}`, err)
}
pdsCounter++
}
console.log('DONE WITH ALL')
}
Expand All @@ -69,15 +77,13 @@ const migrateRepo = async (
status: Status,
adminToken: string,
) => {
console.log(`reserve (${status.did}, ${status.phase})`)
if (status.phase < TransferPhase.reservedKey) {
const signingKey = await reserveSigningKey(pds, status.did)
status.signingKey = signingKey
status.phase = TransferPhase.reservedKey
await updateStatus(db, status)
}

console.log(`import (${status.did}, ${status.phase})`)
if (status.phase < TransferPhase.initImport) {
const importedRev = await doImport(ctx, db, pds, status.did)
if (importedRev) {
Expand All @@ -87,15 +93,13 @@ const migrateRepo = async (
await updateStatus(db, status)
}

console.log(`transfer (${status.did}, ${status.phase})`)
if (status.phase < TransferPhase.transferred) {
if (status.phase < TransferPhase.transferredPds) {
const importedRev = await lockAndTransfer(ctx, db, pds, status)
status.importedRev = importedRev
status.phase = TransferPhase.transferred
status.phase = TransferPhase.transferredEntryway
await updateStatus(db, status)
}

console.log(`prefs (${status.did}, ${status.phase})`)
if (status.phase < TransferPhase.preferences) {
try {
await transferPreferences(ctx, pds, status.did)
Expand All @@ -111,7 +115,6 @@ const migrateRepo = async (
}
}

console.log(`takedowns (${status.did}, ${status.phase})`)
if (status.phase < TransferPhase.takedowns) {
await transferTakedowns(ctx, db, pds, status.did, adminToken)
status.phase = TransferPhase.completed
Expand Down Expand Up @@ -168,7 +171,7 @@ const doImport = async (
if (typeof log === 'string') {
const lines = log.split('\n')
for (const line of lines) {
if (line.indexOf('failed to import blob') > 0) {
if (line.includes('failed to import blob')) {
const cid = line.split(':')[1].trim()
await logFailedBlob(db, did, cid)
}
Expand All @@ -185,17 +188,23 @@ const lockAndTransfer = async (
pds: PdsInfo,
status: Status,
) => {
const defer = createDeferrable()
const repoLockedDefer = createDeferrable()
const transferDefer = createDeferrable()
let txFinished = false
ctx.db
.transaction(async (dbTxn) => {
const storage = new SqlRepoStorage(dbTxn, status.did)
await storage.lockRepo()
await defer.complete
repoLockedDefer.resolve()
await transferDefer.complete
})
.catch((err) => {
console.error(`error in repo lock tx for did: ${status.did}`, err)
txFinished = true
})

await repoLockedDefer.complete

let importedRev
try {
importedRev = await doImport(
Expand Down Expand Up @@ -231,29 +240,35 @@ const lockAndTransfer = async (
}
},
)

assert(!txFinished)
const accountRes = await getUserAccount(ctx, status.did)
await axios.post(`${pds.url}/xrpc/com.atproto.temp.transferAccount`, {
did: status.did,
handle: accountRes.handle,
plcOp,
})

defer.resolve()
status.phase = TransferPhase.transferredPds
await updateStatus(db, status)

transferDefer.resolve()

await ctx.db.transaction(async (dbTxn) => {
await dbTxn.db
.updateTable('user_account')
.where('did', '=', status.did)
.set({ pdsId: pds.id })
.execute()
await dbTxn.db
.updateTable('repo_root')
.where('did', '=', status.did)
.set({ did: `migrated-${status.did}` })
.execute()
})

await ctx.db.db
.updateTable('user_account')
.where('did', '=', status.did)
.set({ pdsId: pds.id })
.execute()
await ctx.db.db
.updateTable('repo_root')
.where('did', '=', status.did)
.set({ did: `migrated-${status.did}` })
.execute()
return importedRev
} finally {
defer.resolve()
transferDefer.resolve()
}
}

Expand Down
13 changes: 13 additions & 0 deletions packages/pds/src/migrate-script/setup-db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Migrator } from 'kysely'
import { dbMigrationProvider, getDb } from './db'

const run = async () => {
const db = getDb()

const migrator = new Migrator({ db, provider: dbMigrationProvider })
const { error } = await migrator.migrateToLatest()
if (error) throw error
return db
}

run()

0 comments on commit e036de4

Please sign in to comment.