diff --git a/packages/pds/package.json b/packages/pds/package.json index 57c90869c1e..a859805eb58 100644 --- a/packages/pds/package.json +++ b/packages/pds/package.json @@ -48,6 +48,7 @@ "compression": "^1.7.4", "cors": "^2.8.5", "disposable-email": "^0.2.3", + "dotenv": "^16.0.3", "express": "^4.17.2", "express-async-errors": "^3.1.1", "file-type": "^16.5.4", diff --git a/packages/pds/src/migrate-script/db.ts b/packages/pds/src/migrate-script/db.ts index 289005ec7f1..9f8c81ca548 100644 --- a/packages/pds/src/migrate-script/db.ts +++ b/packages/pds/src/migrate-script/db.ts @@ -1,19 +1,17 @@ -import { Kysely, MigrationProvider, Migrator, SqliteDialect } from 'kysely' +import { Kysely, MigrationProvider, SqliteDialect } from 'kysely' import SqliteDB from 'better-sqlite3' -export const getDb = async (loc: string): Promise => { - const db = new Kysely({ +const LOCATION = 'migrate.db' + +export const getDb = (): MigrateDb => { + return new Kysely({ dialect: new SqliteDialect({ - database: new SqliteDB(loc), + database: new SqliteDB(LOCATION), }), }) - const migrator = new Migrator({ db, provider: dbMigrationProvider }) - const { error } = await migrator.migrateToLatest() - if (error) throw error - return db } -const dbMigrationProvider: MigrationProvider = { +export const dbMigrationProvider: MigrationProvider = { async getMigrations() { return { '1': { @@ -23,8 +21,10 @@ const dbMigrationProvider: MigrationProvider = { .addColumn('did', 'varchar', (col) => col.primaryKey()) .addColumn('pdsId', 'integer') .addColumn('signingKey', 'varchar') - .addColumn('phase', 'integer') - .addColumn('importedRev', 'varchar') + .addColumn('phase', 'integer', (col) => col.notNull().defaultTo(0)) + .addColumn('importedRev', 'varchar', (col) => + col.notNull().defaultTo(0), + ) .addColumn('failed', 'integer') .execute() await db.schema @@ -64,7 +64,8 @@ export enum TransferPhase { notStarted = 0, reservedKey = 1, initImport = 2, - transferred = 4, + transferredPds = 3, + transferredEntryway = 4, preferences = 5, takedowns = 6, completed = 7, diff --git a/packages/pds/src/migrate-script/load-dids.ts b/packages/pds/src/migrate-script/load-dids.ts new file mode 100644 index 00000000000..7a5c400cd04 --- /dev/null +++ b/packages/pds/src/migrate-script/load-dids.ts @@ -0,0 +1,22 @@ +import fs from 'fs/promises' +import { getDb } from './db' + +const run = async () => { + const db = getDb() + const didsFile = await fs.readFile('dids.txt') + const dids = didsFile + .toString() + .split('\n') + .map((did) => ({ + did: did.trim(), + phase: 0, + failed: 0 as const, + })) + await db + .insertInto('status') + .values(dids) + .onConflict((oc) => oc.doNothing()) + .execute() +} + +run() diff --git a/packages/pds/src/migrate-script/index.ts b/packages/pds/src/migrate-script/migrate-all.ts similarity index 88% rename from packages/pds/src/migrate-script/index.ts rename to packages/pds/src/migrate-script/migrate-all.ts index bb7ac0091b9..2ab7c0ea24c 100644 --- a/packages/pds/src/migrate-script/index.ts +++ b/packages/pds/src/migrate-script/migrate-all.ts @@ -1,3 +1,5 @@ +import assert from 'node:assert' +import dotenv from 'dotenv' import axios from 'axios' import * as ui8 from 'uint8arrays' import AtpAgent from '@atproto/api' @@ -8,6 +10,8 @@ import { envToCfg, envToSecrets, readEnv } from '../config' import AppContext from '../context' import { FailedTakedown, MigrateDb, Status, TransferPhase, getDb } from './db' +dotenv.config() + type PdsInfo = { id: number did: string @@ -16,7 +20,7 @@ type PdsInfo = { } export const runScript = async () => { - const db = await getDb('/data/migrate.db') + const db = getDb() const env = readEnv() const cfg = envToCfg(env) const secrets = envToSecrets(env) @@ -36,16 +40,21 @@ export const runScript = async () => { .selectFrom('status') .where('status.phase', '<', 7) .where('failed', '!=', 1) + .orderBy('phase', 'desc') + .orderBy('did') .selectAll() .execute() let pdsCounter = 0 for (const status of todo) { let pdsId = status.pdsId if (!pdsId) { - pdsId = pdsCounter % pdsInfos.length + pdsId = pdsInfos[pdsCounter % pdsInfos.length].id pdsCounter++ } - const pdsInfo = pdsInfos[pdsId] + const pdsInfo = pdsInfos.find((info) => info.id === pdsId) + if (!pdsInfo) { + throw new Error(`could not find pds with id: ${pdsId}`) + } try { await migrateRepo(ctx, db, pdsInfo, status, adminToken) console.log(`completed migrating: ${status.did}`) @@ -57,7 +66,6 @@ export const runScript = async () => { .execute() console.error(`failed to migrate: ${status.did}`, err) } - pdsCounter++ } console.log('DONE WITH ALL') } @@ -69,7 +77,6 @@ const migrateRepo = async ( status: Status, adminToken: string, ) => { - console.log(`reserve (${status.did}, ${status.phase})`) if (status.phase < TransferPhase.reservedKey) { const signingKey = await reserveSigningKey(pds, status.did) status.signingKey = signingKey @@ -77,7 +84,6 @@ const migrateRepo = async ( await updateStatus(db, status) } - console.log(`import (${status.did}, ${status.phase})`) if (status.phase < TransferPhase.initImport) { const importedRev = await doImport(ctx, db, pds, status.did) if (importedRev) { @@ -87,15 +93,13 @@ const migrateRepo = async ( await updateStatus(db, status) } - console.log(`transfer (${status.did}, ${status.phase})`) - if (status.phase < TransferPhase.transferred) { + if (status.phase < TransferPhase.transferredPds) { const importedRev = await lockAndTransfer(ctx, db, pds, status) status.importedRev = importedRev - status.phase = TransferPhase.transferred + status.phase = TransferPhase.transferredEntryway await updateStatus(db, status) } - console.log(`prefs (${status.did}, ${status.phase})`) if (status.phase < TransferPhase.preferences) { try { await transferPreferences(ctx, pds, status.did) @@ -111,7 +115,6 @@ const migrateRepo = async ( } } - console.log(`takedowns (${status.did}, ${status.phase})`) if (status.phase < TransferPhase.takedowns) { await transferTakedowns(ctx, db, pds, status.did, adminToken) status.phase = TransferPhase.completed @@ -168,7 +171,7 @@ const doImport = async ( if (typeof log === 'string') { const lines = log.split('\n') for (const line of lines) { - if (line.indexOf('failed to import blob') > 0) { + if (line.includes('failed to import blob')) { const cid = line.split(':')[1].trim() await logFailedBlob(db, did, cid) } @@ -185,17 +188,23 @@ const lockAndTransfer = async ( pds: PdsInfo, status: Status, ) => { - const defer = createDeferrable() + const repoLockedDefer = createDeferrable() + const transferDefer = createDeferrable() + let txFinished = false ctx.db .transaction(async (dbTxn) => { const storage = new SqlRepoStorage(dbTxn, status.did) await storage.lockRepo() - await defer.complete + repoLockedDefer.resolve() + await transferDefer.complete }) .catch((err) => { console.error(`error in repo lock tx for did: ${status.did}`, err) + txFinished = true }) + await repoLockedDefer.complete + let importedRev try { importedRev = await doImport( @@ -231,7 +240,7 @@ const lockAndTransfer = async ( } }, ) - + assert(!txFinished) const accountRes = await getUserAccount(ctx, status.did) await axios.post(`${pds.url}/xrpc/com.atproto.temp.transferAccount`, { did: status.did, @@ -239,21 +248,27 @@ const lockAndTransfer = async ( plcOp, }) - defer.resolve() + status.phase = TransferPhase.transferredPds + await updateStatus(db, status) + + transferDefer.resolve() + + await ctx.db.transaction(async (dbTxn) => { + await dbTxn.db + .updateTable('user_account') + .where('did', '=', status.did) + .set({ pdsId: pds.id }) + .execute() + await dbTxn.db + .updateTable('repo_root') + .where('did', '=', status.did) + .set({ did: `migrated-${status.did}` }) + .execute() + }) - await ctx.db.db - .updateTable('user_account') - .where('did', '=', status.did) - .set({ pdsId: pds.id }) - .execute() - await ctx.db.db - .updateTable('repo_root') - .where('did', '=', status.did) - .set({ did: `migrated-${status.did}` }) - .execute() return importedRev } finally { - defer.resolve() + transferDefer.resolve() } } diff --git a/packages/pds/src/migrate-script/setup-db.ts b/packages/pds/src/migrate-script/setup-db.ts new file mode 100644 index 00000000000..798874738fe --- /dev/null +++ b/packages/pds/src/migrate-script/setup-db.ts @@ -0,0 +1,13 @@ +import { Migrator } from 'kysely' +import { dbMigrationProvider, getDb } from './db' + +const run = async () => { + const db = getDb() + + const migrator = new Migrator({ db, provider: dbMigrationProvider }) + const { error } = await migrator.migrateToLatest() + if (error) throw error + return db +} + +run()