Skip to content

Commit

Permalink
split out utils
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Nov 8, 2023
1 parent 3a018d8 commit a7cbea3
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 68 deletions.
16 changes: 3 additions & 13 deletions packages/pds/src/migrate-script/migrate-all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,11 @@ import { envToCfg, envToSecrets, readEnv } from '../config'
import AppContext from '../context'
import { FailedTakedown, MigrateDb, Status, TransferPhase, getDb } from './db'
import PQueue from 'p-queue'
import { AdminHeaders, PdsInfo } from './util'

dotenv.config()

type PdsInfo = {
id: number
did: string
url: string
agent: AtpAgent
}

type AdminHeaders = {
authorization: string
}

export const runScript = async () => {
console.log('starting')
const db = getDb()
const env = readEnv()
const cfg = envToCfg(env)
Expand All @@ -47,14 +37,14 @@ export const runScript = async () => {
const todo = await db
.selectFrom('status')
.where('status.phase', '<', 7)
// .where('failed', '!=', 1)
.orderBy('phase', 'desc')
.orderBy('did')
.selectAll()
.execute()
let pdsCounter = 0
let completed = 0
let failed = 0
console.log('migrating: ', todo.length)
const migrateQueue = new PQueue({ concurrency: 40 })
for (const status of todo) {
if (!status.pdsId) {
Expand Down
57 changes: 2 additions & 55 deletions packages/pds/src/migrate-script/repair-blobs.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
import dotenv from 'dotenv'
import axios from 'axios'
import * as ui8 from 'uint8arrays'
import AtpAgent from '@atproto/api'
import { envToCfg, envToSecrets, readEnv } from '../config'
import AppContext from '../context'
import { MigrateDb, getDb } from './db'
import { CID } from 'multiformats/cid'
import { getDb } from './db'
import { repairBlob } from './util'

dotenv.config()

type PdsInfo = {
id: number
did: string
url: string
agent: AtpAgent
}

export const runScript = async () => {
const db = getDb()
const env = readEnv()
Expand Down Expand Up @@ -55,49 +47,4 @@ export const runScript = async () => {
console.log('DONE WITH ALL')
}

const repairBlob = async (
ctx: AppContext,
db: MigrateDb,
pds: PdsInfo,
did: string,
cid: string,
adminToken: string,
) => {
const blob = await ctx.db.db
.selectFrom('blob')
.where('cid', '=', cid)
.selectAll()
.executeTakeFirst()
if (!blob) return
let blobStream
try {
blobStream = await ctx.blobstore.getStream(CID.parse(blob.cid))
} catch (err) {
const hasTakedown = await ctx.db.db
.selectFrom('repo_blob')
.where('did', '=', did)
.where('cid', '=', cid)
.where('takedownRef', 'is not', null)
.executeTakeFirst()
if (hasTakedown) {
return
}
throw err
}
await axios.post(`${pds.url}/xrpc/com.atproto.temp.pushBlob`, blobStream, {
params: { did },
headers: {
'content-type': blob.mimeType,
authorization: `Basic ${adminToken}`,
},
decompress: true,
responseType: 'stream',
})
await db
.deleteFrom('failed_blob')
.where('did', '=', did)
.where('cid', '=', blob.cid)
.execute()
}

runScript()
62 changes: 62 additions & 0 deletions packages/pds/src/migrate-script/util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import axios from 'axios'
import AtpAgent from '@atproto/api'
import AppContext from '../context'
import { MigrateDb } from './db'
import { CID } from 'multiformats/cid'

export type PdsInfo = {
id: number
did: string
url: string
agent: AtpAgent
}

export type AdminHeaders = {
authorization: string
}

export const repairBlob = async (
ctx: AppContext,
db: MigrateDb,
pds: PdsInfo,
did: string,
cid: string,
adminToken: string,
) => {
const blob = await ctx.db.db
.selectFrom('blob')
.where('cid', '=', cid)
.where('creator', '=', did)
.selectAll()
.executeTakeFirst()
if (!blob) return
let blobStream
try {
blobStream = await ctx.blobstore.getStream(CID.parse(blob.cid))
} catch (err) {
const hasTakedown = await ctx.db.db
.selectFrom('repo_blob')
.where('did', '=', did)
.where('cid', '=', cid)
.where('takedownRef', 'is not', null)
.executeTakeFirst()
if (hasTakedown) {
return
}
throw err
}
await axios.post(`${pds.url}/xrpc/com.atproto.temp.pushBlob`, blobStream, {
params: { did },
headers: {
'content-type': blob.mimeType,
authorization: `Basic ${adminToken}`,
},
decompress: true,
responseType: 'stream',
})
await db
.deleteFrom('failed_blob')
.where('did', '=', did)
.where('cid', '=', blob.cid)
.execute()
}

0 comments on commit a7cbea3

Please sign in to comment.