
Pds v2 migrate script #1800

Closed
wants to merge 82 commits into from

82 commits
32ddcf2
first pass
dholms Nov 1, 2023
a0437c3
tweak
dholms Nov 1, 2023
f414a4e
script edits
dholms Nov 2, 2023
d662a6e
reorg
dholms Nov 2, 2023
98b1c17
some more tidy
dholms Nov 2, 2023
9edc55f
read dids as maps
dholms Nov 2, 2023
66e334d
entrypoint for migrate script
devinivy Nov 2, 2023
0f8248b
Merge branch 'pds-v2-migrate-script' of github.com:bluesky-social/atp…
devinivy Nov 2, 2023
3e16cee
build branch
dholms Nov 2, 2023
b614a78
export script & change filenames
dholms Nov 2, 2023
21937f2
fix ambiguous column
dholms Nov 2, 2023
ec8ef4e
tidy script
dholms Nov 2, 2023
c491782
revamp migrate script
dholms Nov 2, 2023
e93e2f1
move db to data folder
dholms Nov 2, 2023
fcf8151
delete old script
dholms Nov 2, 2023
537e2c3
more logs
dholms Nov 2, 2023
c9532e0
missing executes
dholms Nov 2, 2023
5477f20
tweak log parsing
dholms Nov 2, 2023
e036de4
script tweaks
dholms Nov 3, 2023
008be35
actually run script
dholms Nov 3, 2023
093f86f
tweak logging
dholms Nov 3, 2023
69d863f
tweak
dholms Nov 3, 2023
8aa20ef
im dumb dumb
dholms Nov 3, 2023
f2201eb
patch script
dholms Nov 4, 2023
ee47798
patch
dholms Nov 4, 2023
de4f8d3
failed blobs script
dholms Nov 4, 2023
03ec698
run script
dholms Nov 4, 2023
de61594
tweak
dholms Nov 4, 2023
4a85750
better err handling
dholms Nov 4, 2023
f13b2b9
log import output
dholms Nov 6, 2023
709ec8f
small tweaks
dholms Nov 6, 2023
76c9699
tweak
dholms Nov 6, 2023
9c604c8
add concurrency
dholms Nov 6, 2023
1d1aa73
error tracking in script
dholms Nov 7, 2023
6226fe0
allow large bodies
dholms Nov 7, 2023
2525cfd
add admin auth headers
dholms Nov 7, 2023
604f2b7
mark not-failed once done
dholms Nov 7, 2023
1081ace
repair blobs that have been taken down
dholms Nov 7, 2023
bb51f9b
send admin headers
dholms Nov 7, 2023
f1dde36
rm colon
dholms Nov 7, 2023
4a99d2d
less logs
dholms Nov 7, 2023
a4e4ae6
check failures script
dholms Nov 7, 2023
d01d887
chunk load dids
dholms Nov 8, 2023
80110be
smaller chunks
dholms Nov 8, 2023
315138a
im dumb dumb
dholms Nov 8, 2023
3a018d8
increase concurrency
dholms Nov 8, 2023
a7cbea3
split out utils
dholms Nov 8, 2023
596d9c5
repair blobs on main script
dholms Nov 8, 2023
b6c48de
concurrent blob repair
dholms Nov 8, 2023
40d1660
tidy
dholms Nov 8, 2023
867c997
concurrency 1
dholms Nov 8, 2023
1a99165
not concurrent
dholms Nov 8, 2023
0bc2542
tweak script
dholms Nov 8, 2023
dc6b7f6
log only after failure to repair blob
dholms Nov 8, 2023
aa7a337
graceful shutdown
dholms Nov 8, 2023
c806ebb
repair prefs script
dholms Nov 8, 2023
47dfe36
cleanup
dholms Nov 8, 2023
fd4f44d
log err
dholms Nov 8, 2023
93dd22e
fix repair blob
dholms Nov 8, 2023
ad7a3b0
automate repair prefs
dholms Nov 8, 2023
fd4c745
increase concurrency
dholms Nov 8, 2023
375d44d
add a retry on migration
dholms Nov 8, 2023
5480515
refactor + better load-dids
dholms Nov 8, 2023
704a9c3
round robin dids
dholms Nov 9, 2023
f0c6f05
script per pds
dholms Nov 9, 2023
b109825
lower concurrency
dholms Nov 9, 2023
a7cbd0a
wal mode
dholms Nov 9, 2023
41e7e10
get rev from ipld_block
dholms Nov 9, 2023
86ca769
repair takedowns script
dholms Nov 9, 2023
9bf2fa9
tweak load dids
dholms Nov 9, 2023
f94accf
tweak
dholms Nov 9, 2023
09e46d9
concurrent repair blobs
dholms Nov 9, 2023
f842835
final await
dholms Nov 9, 2023
d9680bc
revert load dids
dholms Nov 9, 2023
24d2191
require pds id
dholms Nov 10, 2023
eb92915
script tweaks
dholms Nov 10, 2023
5976564
save borked
dholms Nov 10, 2023
525303f
keep alive http client
dholms Nov 16, 2023
e35f684
allow did:web transter
dholms Nov 20, 2023
3446b7e
repair imports script
dholms Nov 20, 2023
b95fb0e
tweaks
dholms Nov 20, 2023
7c50f02
load failed blobs script
dholms Nov 21, 2023
1 change: 1 addition & 0 deletions .github/workflows/build-and-push-pds-ghcr.yaml
@@ -3,6 +3,7 @@ on:
push:
branches:
- main
- pds-v2-migrate-script
env:
REGISTRY: ghcr.io
USERNAME: ${{ github.actor }}
1 change: 1 addition & 0 deletions packages/pds/package.json
@@ -48,6 +48,7 @@
"compression": "^1.7.4",
"cors": "^2.8.5",
"disposable-email": "^0.2.3",
"dotenv": "^16.0.3",
"express": "^4.17.2",
"express-async-errors": "^3.1.1",
"file-type": "^16.5.4",
1 change: 1 addition & 0 deletions packages/pds/src/index.ts
@@ -31,6 +31,7 @@ export { Database } from './db'
export { DiskBlobStore, MemoryBlobStore } from './storage'
export { AppContext } from './context'
export { httpLogger } from './logger'
export { runScript } from './migrate-script'

export class PDS {
public ctx: AppContext
63 changes: 63 additions & 0 deletions packages/pds/src/migrate-script/check-failures.ts
@@ -0,0 +1,63 @@
import { countAll } from '../db/util'
import { MigrateDb, getDb } from './db'

const run = async () => {
const db = getDb()
const results = await Promise.all([
totalCount(db),
failureCount(db),
failedBlobs(db),
failedPrefs(db),
failedTakedowns(db),
])
console.log(`
Total migrations: ${results[0]}
Failed migrations: ${results[1]}
Failed blobs: ${results[2]}
Failed prefs: ${results[3]}
Failed takedowns: ${results[4]}
`)
}

const totalCount = async (db: MigrateDb) => {
const res = await db
.selectFrom('status')
.select(countAll.as('count'))
.executeTakeFirst()
return res?.count
}

const failureCount = async (db: MigrateDb) => {
const res = await db
.selectFrom('status')
.select(countAll.as('count'))
.where('failed', '=', 1)
.executeTakeFirst()
return res?.count
}

const failedBlobs = async (db: MigrateDb) => {
const res = await db
.selectFrom('failed_blob')
.select(countAll.as('count'))
.executeTakeFirst()
return res?.count
}

const failedPrefs = async (db: MigrateDb) => {
const res = await db
.selectFrom('failed_pref')
.select(countAll.as('count'))
.executeTakeFirst()
return res?.count
}

const failedTakedowns = async (db: MigrateDb) => {
const res = await db
.selectFrom('failed_takedown')
.select(countAll.as('count'))
.executeTakeFirst()
return res?.count
}

run()
139 changes: 139 additions & 0 deletions packages/pds/src/migrate-script/db.ts
@@ -0,0 +1,139 @@
import { Kysely, MigrationProvider, SqliteDialect } from 'kysely'
import SqliteDB from 'better-sqlite3'

const LOCATION = 'migrate.db'

export const getDb = (): MigrateDb => {
const sqliteDb = new SqliteDB(LOCATION)
sqliteDb.pragma('journal_mode = WAL')
sqliteDb.pragma('busy_timeout = 5000')
return new Kysely<Schema>({
dialect: new SqliteDialect({
database: sqliteDb,
}),
})
}

export const dbMigrationProvider: MigrationProvider = {
async getMigrations() {
return {
'1': {
async up(db: Kysely<unknown>) {
await db.schema
.createTable('status')
.addColumn('did', 'varchar', (col) => col.primaryKey())
.addColumn('pdsId', 'integer')
.addColumn('signingKey', 'varchar')
.addColumn('phase', 'integer', (col) => col.notNull().defaultTo(0))
.addColumn('importedRev', 'varchar')
.addColumn('failed', 'integer', (col) => col.notNull().defaultTo(0))
.execute()
await db.schema
.createTable('failed_pref')
.addColumn('did', 'varchar', (col) => col.primaryKey())
.execute()
await db.schema
.createTable('failed_blob')
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('cid', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint('failed_blob_pkey', ['did', 'cid'])
.execute()
await db.schema
.createTable('failed_takedown')
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('recordUri', 'varchar')
.addColumn('recordCid', 'varchar')
.addColumn('blobCid', 'varchar')
.execute()
},
async down() {},
},
'2': {
async up(db: Kysely<unknown>) {
await db.schema
.alterTable('status')
.addColumn('err', 'varchar')
.execute()
await db.schema
.alterTable('failed_pref')
.addColumn('err', 'varchar')
.execute()
await db.schema
.alterTable('failed_blob')
.addColumn('err', 'varchar')
.execute()
await db.schema
.alterTable('failed_takedown')
.addColumn('err', 'varchar')
.execute()
},
async down() {},
},
'3': {
async up(db: Kysely<unknown>) {
await db.schema
.createTable('failed_import')
.addColumn('did', 'varchar', (col) => col.primaryKey())
.addColumn('err', 'varchar')
.execute()
},
async down() {},
},
}
},
}

export type MigrateDb = Kysely<Schema>

type Schema = {
status: Status
failed_pref: FailedPreference
failed_blob: FailedBlob
failed_takedown: FailedTakedown
failed_import: FailedImport
}

export enum TransferPhase {
notStarted = 0,
reservedKey = 1,
initImport = 2,
transferredPds = 3,
transferredEntryway = 4,
preferences = 5,
takedowns = 6,
completed = 7,
}

export type Status = {
did: string
pdsId: number | null
signingKey: string | null
phase: TransferPhase
importedRev: string | null
failed: 0 | 1
err: string | null
}

export type FailedPreference = {
did: string
err: string | null
}

export type FailedBlob = {
did: string
cid: string
err: string | null
}

export type FailedTakedown = {
did: string
blobCid?: string
recordUri?: string
recordCid?: string
err: string | null
}

export type FailedImport = {
did: string
err: string | null
}
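The `TransferPhase` enum in `db.ts` above tracks each DID's progress through the migration, and the script resumes any row whose phase is below `completed`. A minimal self-contained sketch of that progression, reproducing the enum from the diff; the `advancePhase` and `isDone` helpers are hypothetical illustrations, not part of the PR:

```typescript
// TransferPhase reproduced from the db.ts diff above: each status row
// advances monotonically from notStarted (0) to completed (7).
enum TransferPhase {
  notStarted = 0,
  reservedKey = 1,
  initImport = 2,
  transferredPds = 3,
  transferredEntryway = 4,
  preferences = 5,
  takedowns = 6,
  completed = 7,
}

// Hypothetical helper: a migration is finished once it reaches `completed`.
const isDone = (phase: TransferPhase): boolean =>
  phase >= TransferPhase.completed

// Hypothetical helper: step a row to the next phase, clamping at `completed`.
const advancePhase = (phase: TransferPhase): TransferPhase =>
  isDone(phase) ? TransferPhase.completed : ((phase + 1) as TransferPhase)
```

Because the phase is an ordered integer, a crashed run can be resumed by selecting rows with `phase < completed` and re-entering at the recorded step.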
35 changes: 35 additions & 0 deletions packages/pds/src/migrate-script/load-dids.ts
@@ -0,0 +1,35 @@
import { chunkArray } from '@atproto/common'
import { setupEnv } from './util'

const run = async () => {
const amount = parseInt(process.argv[2])
const pdsId = parseInt(process.argv[3])
console.log(`loading next ${amount} dids`)
const { db, ctx } = await setupEnv()

const didsRes = await ctx.db.db
.selectFrom('user_account')
.select('did')
.where('pdsId', 'is', null)
.orderBy('did', 'asc')
.limit(amount)
.execute()
const dids = didsRes.map((row) => ({
did: row.did,
phase: 0,
pdsId,
failed: 0 as const,
}))

await Promise.all(
chunkArray(dids, 50).map((chunk) =>
db
.insertInto('status')
.values(chunk)
.onConflict((oc) => oc.doNothing())
.execute(),
),
)
}

run()
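`load-dids.ts` batches its `status` inserts through `chunkArray` from `@atproto/common`. Assuming that helper is the usual fixed-size splitter, a self-contained sketch of its presumed behavior:

```typescript
// Sketch of a fixed-size chunker, assumed to match the behavior of
// chunkArray from @atproto/common: splits arr into slices of at most `size`.
const chunkArray = <T>(arr: T[], size: number): T[][] => {
  const chunks: T[][] = []
  for (let i = 0; i < arr.length; i += size) {
    chunks.push(arr.slice(i, i + size))
  }
  return chunks
}
```

Inserting in chunks of 50 keeps each SQLite statement's bound-parameter count small while still allowing the chunks to be written concurrently via `Promise.all`.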
31 changes: 31 additions & 0 deletions packages/pds/src/migrate-script/load-failed-blobs.ts
@@ -0,0 +1,31 @@
import fs from 'fs/promises'
import { chunkArray } from '@atproto/common'
import { getDb } from './db'

const run = async () => {
const file = await fs.readFile('missing-blobs.txt')
const rows = file
.toString()
.split('\n')
.filter((row) => row.length > 5)
.map((row) => {
const [did, cid] = row.split(' ')
return {
did: did.trim(),
cid: cid.trim(),
}
})
const db = getDb()

await Promise.all(
chunkArray(rows, 500).map((chunk) =>
db
.insertInto('failed_blob')
.values(chunk)
.onConflict((oc) => oc.doNothing())
.execute(),
),
)
}

run()
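`load-failed-blobs.ts` expects each line of `missing-blobs.txt` to hold a DID and a blob CID separated by a space, skipping blank or too-short lines. The parsing step extracted into a standalone function for illustration (the name `parseMissingBlobs` is mine, not from the PR):

```typescript
// Parse logic mirrored from load-failed-blobs.ts: each useful line of
// missing-blobs.txt is "<did> <cid>"; lines of length <= 5 are skipped.
const parseMissingBlobs = (contents: string): { did: string; cid: string }[] =>
  contents
    .split('\n')
    .filter((row) => row.length > 5)
    .map((row) => {
      const [did, cid] = row.split(' ')
      return { did: did.trim(), cid: cid.trim() }
    })
```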