From 13207a248e9b7b02f862a41ea8d3f7cf767ef87b Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Tue, 12 Sep 2023 12:37:57 -0500 Subject: [PATCH] Remove temp.upgradeRepoVersion (#1588) remove temp.upgradeRepoVersion --- .../com/atproto/temp/upgradeRepoVersion.json | 21 --- packages/api/src/client/index.ts | 23 --- packages/api/src/client/lexicons.ts | 27 --- .../com/atproto/temp/upgradeRepoVersion.ts | 33 ---- packages/bsky/src/lexicon/index.ts | 22 --- packages/bsky/src/lexicon/lexicons.ts | 27 --- .../com/atproto/temp/upgradeRepoVersion.ts | 39 ---- packages/pds/src/api/com/atproto/index.ts | 2 - .../src/api/com/atproto/upgradeRepoVersion.ts | 140 -------------- packages/pds/src/lexicon/index.ts | 22 --- packages/pds/src/lexicon/lexicons.ts | 27 --- .../com/atproto/temp/upgradeRepoVersion.ts | 39 ---- .../migrations/repo-version-upgrade.test.ts | 173 ------------------ 13 files changed, 595 deletions(-) delete mode 100644 lexicons/com/atproto/temp/upgradeRepoVersion.json delete mode 100644 packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts delete mode 100644 packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts delete mode 100644 packages/pds/src/api/com/atproto/upgradeRepoVersion.ts delete mode 100644 packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts delete mode 100644 packages/pds/tests/migrations/repo-version-upgrade.test.ts diff --git a/lexicons/com/atproto/temp/upgradeRepoVersion.json b/lexicons/com/atproto/temp/upgradeRepoVersion.json deleted file mode 100644 index 05d8c7197fd..00000000000 --- a/lexicons/com/atproto/temp/upgradeRepoVersion.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "lexicon": 1, - "id": "com.atproto.temp.upgradeRepoVersion", - "defs": { - "main": { - "type": "procedure", - "description": "Upgrade a repo to v3", - "input": { - "encoding": "application/json", - "schema": { - "type": "object", - "required": ["did"], - "properties": { - "did": { "type": "string", "format": "did" }, - "force": { "type": "boolean" } - } - } - } - } - } -} diff --git a/packages/api/src/client/index.ts b/packages/api/src/client/index.ts index f2921d3e5a5..16728348374 100644 --- a/packages/api/src/client/index.ts +++ b/packages/api/src/client/index.ts @@ -70,7 +70,6 @@ import * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' import * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' -import * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' import * as AppBskyActorDefs from './types/app/bsky/actor/defs' import * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' @@ -194,7 +193,6 @@ export * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' export * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' export * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' export * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' -export * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' export * as AppBskyActorDefs from './types/app/bsky/actor/defs' export * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' export * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' @@ -322,7 +320,6 @@ export class AtprotoNS { repo: RepoNS server: ServerNS sync: SyncNS - temp: TempNS constructor(service: AtpServiceClient) { this._service = service @@ -333,7 +330,6 @@ export class AtprotoNS { this.repo = new RepoNS(service) this.server = new ServerNS(service) this.sync = new SyncNS(service) - this.temp = new TempNS(service) } } @@ -1009,25 +1005,6 @@ export class SyncNS { } } -export class TempNS { - _service: AtpServiceClient - - constructor(service: AtpServiceClient) { - this._service = service - } - - upgradeRepoVersion( - data?: ComAtprotoTempUpgradeRepoVersion.InputSchema, - opts?: ComAtprotoTempUpgradeRepoVersion.CallOptions, - ): Promise { - return this._service.xrpc - .call('com.atproto.temp.upgradeRepoVersion', opts?.qp, data, opts) - .catch((e) => { - throw ComAtprotoTempUpgradeRepoVersion.toKnownErr(e) - }) - } -} - export class AppNS { _service: AtpServiceClient bsky: BskyNS diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 6f1cef89925..30da3464cb2 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -3517,32 +3517,6 @@ export const schemaDict = { }, }, }, - ComAtprotoTempUpgradeRepoVersion: { - lexicon: 1, - id: 'com.atproto.temp.upgradeRepoVersion', - defs: { - main: { - type: 'procedure', - description: 'Upgrade a repo to v3', - input: { - encoding: 'application/json', - schema: { - type: 'object', - required: ['did'], - properties: { - did: { - type: 'string', - format: 'did', - }, - force: { - type: 'boolean', - }, - }, - }, - }, - }, - }, - }, AppBskyActorDefs: { lexicon: 1, id: 'app.bsky.actor.defs', @@ -6828,7 +6802,6 @@ export const ids = { ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate', ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl', ComAtprotoSyncSubscribeRepos: 'com.atproto.sync.subscribeRepos', - ComAtprotoTempUpgradeRepoVersion: 'com.atproto.temp.upgradeRepoVersion', AppBskyActorDefs: 'app.bsky.actor.defs', AppBskyActorGetPreferences: 'app.bsky.actor.getPreferences', AppBskyActorGetProfile: 'app.bsky.actor.getProfile', diff --git a/packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts b/packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts deleted file mode 100644 index abaf3a9f1b0..00000000000 --- a/packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts +++ /dev/null @@ -1,33 +0,0 @@ -/** - * GENERATED CODE - DO NOT MODIFY - */ -import { Headers, XRPCError } from '@atproto/xrpc' -import { ValidationResult, BlobRef } from '@atproto/lexicon' -import { isObj, hasProp } from '../../../../util' -import { lexicons } from '../../../../lexicons' -import { CID } from 'multiformats/cid' - -export interface QueryParams {} - -export interface InputSchema { - did: string - force?: boolean - [k: string]: unknown -} - -export interface CallOptions { - headers?: Headers - qp?: QueryParams - encoding: 'application/json' -} - -export interface Response { - success: boolean - headers: Headers -} - -export function toKnownErr(e: any) { - if (e instanceof XRPCError) { - } - return e -} diff --git a/packages/bsky/src/lexicon/index.ts b/packages/bsky/src/lexicon/index.ts index df15a497c63..028b3cbf397 100644 --- a/packages/bsky/src/lexicon/index.ts +++ b/packages/bsky/src/lexicon/index.ts @@ -67,7 +67,6 @@ import * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' import * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' -import * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' import * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles' @@ -163,7 +162,6 @@ export class AtprotoNS { repo: RepoNS server: ServerNS sync: SyncNS - temp: TempNS constructor(server: Server) { this._server = server @@ -174,7 +172,6 @@ export class AtprotoNS { this.repo = new RepoNS(server) this.server = new ServerNS(server) this.sync = new SyncNS(server) - this.temp = new TempNS(server) } } @@ -872,25 +869,6 @@ export class SyncNS { } } -export class TempNS { - _server: Server - - constructor(server: Server) { - this._server = server - } - - upgradeRepoVersion( - cfg: ConfigOf< - AV, - ComAtprotoTempUpgradeRepoVersion.Handler>, - ComAtprotoTempUpgradeRepoVersion.HandlerReqCtx> - >, - ) { - const nsid = 'com.atproto.temp.upgradeRepoVersion' // @ts-ignore - return this._server.xrpc.method(nsid, cfg) - } -} - export class AppNS { _server: Server bsky: BskyNS diff --git a/packages/bsky/src/lexicon/lexicons.ts b/packages/bsky/src/lexicon/lexicons.ts index 6f1cef89925..30da3464cb2 100644 --- a/packages/bsky/src/lexicon/lexicons.ts +++ b/packages/bsky/src/lexicon/lexicons.ts @@ -3517,32 +3517,6 @@ export const schemaDict = { }, }, }, - ComAtprotoTempUpgradeRepoVersion: { - lexicon: 1, - id: 'com.atproto.temp.upgradeRepoVersion', - defs: { - main: { - type: 'procedure', - description: 'Upgrade a repo to v3', - input: { - encoding: 'application/json', - schema: { - type: 'object', - required: ['did'], - properties: { - did: { - type: 'string', - format: 'did', - }, - force: { - type: 'boolean', - }, - }, - }, - }, - }, - }, - }, AppBskyActorDefs: { lexicon: 1, id: 'app.bsky.actor.defs', @@ -6828,7 +6802,6 @@ export const ids = { ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate', ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl', ComAtprotoSyncSubscribeRepos: 'com.atproto.sync.subscribeRepos', - ComAtprotoTempUpgradeRepoVersion: 'com.atproto.temp.upgradeRepoVersion', AppBskyActorDefs: 'app.bsky.actor.defs', AppBskyActorGetPreferences: 'app.bsky.actor.getPreferences', AppBskyActorGetProfile: 'app.bsky.actor.getProfile', diff --git a/packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts b/packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts deleted file mode 100644 index c5b77876fec..00000000000 --- a/packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts +++ /dev/null @@ -1,39 +0,0 @@ -/** - * GENERATED CODE - DO NOT MODIFY - */ -import express from 'express' -import { ValidationResult, BlobRef } from '@atproto/lexicon' -import { lexicons } from '../../../../lexicons' -import { isObj, hasProp } from '../../../../util' -import { CID } from 'multiformats/cid' -import { HandlerAuth } from '@atproto/xrpc-server' - -export interface QueryParams {} - -export interface InputSchema { - did: string - force?: boolean - [k: string]: unknown -} - -export interface HandlerInput { - encoding: 'application/json' - body: InputSchema -} - -export interface HandlerError { - status: number - message?: string -} - -export type HandlerOutput = HandlerError | void -export type HandlerReqCtx = { - auth: HA - params: QueryParams - input: HandlerInput - req: express.Request - res: express.Response -} -export type Handler = ( - ctx: HandlerReqCtx, -) => Promise | HandlerOutput diff --git a/packages/pds/src/api/com/atproto/index.ts b/packages/pds/src/api/com/atproto/index.ts index 59fca0fd5a9..a5c26c80495 100644 --- a/packages/pds/src/api/com/atproto/index.ts +++ b/packages/pds/src/api/com/atproto/index.ts @@ -6,7 +6,6 @@ import moderation from './moderation' import repo from './repo' import serverMethods from './server' import sync from './sync' -import upgradeRepoVersion from './upgradeRepoVersion' export default function (server: Server, ctx: AppContext) { admin(server, ctx) @@ -15,5 +14,4 @@ export default function (server: Server, ctx: AppContext) { repo(server, ctx) serverMethods(server, ctx) sync(server, ctx) - upgradeRepoVersion(server, ctx) } diff --git a/packages/pds/src/api/com/atproto/upgradeRepoVersion.ts b/packages/pds/src/api/com/atproto/upgradeRepoVersion.ts deleted file mode 100644 index dd20943be3e..00000000000 --- a/packages/pds/src/api/com/atproto/upgradeRepoVersion.ts +++ /dev/null @@ -1,140 +0,0 @@ -import { InvalidRequestError } from '@atproto/xrpc-server' -import { TID, chunkArray, wait } from '@atproto/common' -import { Server } from '../../../lexicon' -import SqlRepoStorage from '../../../sql-repo-storage' -import AppContext from '../../../context' -import { - BlockMap, - CidSet, - DataDiff, - MST, - MemoryBlockstore, - def, - signCommit, -} from '@atproto/repo' -import { CID } from 'multiformats/cid' -import { formatSeqCommit, sequenceEvt } from '../../../sequencer' -import { httpLogger as log } from '../../../logger' - -export default function (server: Server, ctx: AppContext) { - server.com.atproto.temp.upgradeRepoVersion({ - auth: ctx.roleVerifier, - handler: async ({ input, auth }) => { - if (!auth.credentials.admin) { - throw new InvalidRequestError('must be admin') - } - const { did, force } = input.body - - await ctx.db.transaction(async (dbTxn) => { - const storage = new SqlRepoStorage(dbTxn, did) - await obtainLock(storage) - const prevCid = await storage.getRoot() - if (!prevCid) { - throw new InvalidRequestError('Could not find repo') - } - const prev = await storage.readObj(prevCid, def.versionedCommit) - const records = await dbTxn.db - .selectFrom('record') - .select(['collection', 'rkey', 'cid']) - .where('did', '=', did) - .execute() - const memoryStore = new MemoryBlockstore() - let data = await MST.create(memoryStore) - for (const record of records) { - const dataKey = record.collection + '/' + record.rkey - const cid = CID.parse(record.cid) - data = await data.add(dataKey, cid) - } - const dataCid = await data.getPointer() - if (!force && !dataCid.equals(prev.data)) { - throw new InvalidRequestError('Data cid did not match') - } - const recordCids = records.map((r) => r.cid) - const diff = await DataDiff.of(data, null) - const cidsToKeep = [...recordCids, ...diff.newMstBlocks.cids()] - const rev = TID.nextStr(prev.rev) - if (force) { - const got = await storage.getBlocks(diff.newMstBlocks.cids()) - const toAdd = diff.newMstBlocks.getMany(got.missing) - log.info( - { missing: got.missing.length }, - 'force added missing blocks', - ) - // puts any new blocks & no-ops for already existing - await storage.putMany(toAdd.blocks, rev) - } - for (const chunk of chunkArray(cidsToKeep, 500)) { - const cidStrs = chunk.map((c) => c.toString()) - await dbTxn.db - .updateTable('ipld_block') - .set({ repoRev: rev }) - .where('creator', '=', did) - .where('cid', 'in', cidStrs) - .execute() - } - await dbTxn.db - .deleteFrom('ipld_block') - .where('creator', '=', did) - .where((qb) => - qb.where('repoRev', 'is', null).orWhere('repoRev', '!=', rev), - ) - .execute() - await dbTxn.db - .updateTable('repo_blob') - .set({ repoRev: rev }) - .where('did', '=', did) - .execute() - await dbTxn.db - .updateTable('record') - .set({ repoRev: rev }) - .where('did', '=', did) - .execute() - const commit = await signCommit( - { - did, - version: 3, - rev: TID.nextStr(), - prev: prevCid, - data: dataCid, - }, - ctx.repoSigningKey, - ) - const newBlocks = new BlockMap() - const commitCid = await newBlocks.add(commit) - await storage.putMany(newBlocks, rev) - await dbTxn.db - .updateTable('repo_root') - .set({ - root: commitCid.toString(), - rev, - indexedAt: storage.getTimestamp(), - }) - .where('did', '=', did) - .execute() - - const commitData = { - cid: commitCid, - rev, - prev: prevCid, - since: null, - newBlocks, - removedCids: new CidSet(), - } - const seqEvt = await formatSeqCommit(did, commitData, []) - await sequenceEvt(dbTxn, seqEvt) - }) - }, - }) -} - -const obtainLock = async (storage: SqlRepoStorage, tries = 20) => { - const obtained = await storage.lockRepo() - if (obtained) { - return - } - if (tries < 1) { - throw new InvalidRequestError('could not obtain lock') - } - await wait(50) - return obtainLock(storage, tries - 1) -} diff --git a/packages/pds/src/lexicon/index.ts b/packages/pds/src/lexicon/index.ts index df15a497c63..028b3cbf397 100644 --- a/packages/pds/src/lexicon/index.ts +++ b/packages/pds/src/lexicon/index.ts @@ -67,7 +67,6 @@ import * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' import * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' -import * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' import * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles' @@ -163,7 +162,6 @@ export class AtprotoNS { repo: RepoNS server: ServerNS sync: SyncNS - temp: TempNS constructor(server: Server) { this._server = server @@ -174,7 +172,6 @@ export class AtprotoNS { this.repo = new RepoNS(server) this.server = new ServerNS(server) this.sync = new SyncNS(server) - this.temp = new TempNS(server) } } @@ -872,25 +869,6 @@ export class SyncNS { } } -export class TempNS { - _server: Server - - constructor(server: Server) { - this._server = server - } - - upgradeRepoVersion( - cfg: ConfigOf< - AV, - ComAtprotoTempUpgradeRepoVersion.Handler>, - ComAtprotoTempUpgradeRepoVersion.HandlerReqCtx> - >, - ) { - const nsid = 'com.atproto.temp.upgradeRepoVersion' // @ts-ignore - return this._server.xrpc.method(nsid, cfg) - } -} - export class AppNS { _server: Server bsky: BskyNS diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 6f1cef89925..30da3464cb2 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -3517,32 +3517,6 @@ export const schemaDict = { }, }, }, - ComAtprotoTempUpgradeRepoVersion: { - lexicon: 1, - id: 'com.atproto.temp.upgradeRepoVersion', - defs: { - main: { - type: 'procedure', - description: 'Upgrade a repo to v3', - input: { - encoding: 'application/json', - schema: { - type: 'object', - required: ['did'], - properties: { - did: { - type: 'string', - format: 'did', - }, - force: { - type: 'boolean', - }, - }, - }, - }, - }, - }, - }, AppBskyActorDefs: { lexicon: 1, id: 'app.bsky.actor.defs', @@ -6828,7 +6802,6 @@ export const ids = { ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate', ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl', ComAtprotoSyncSubscribeRepos: 'com.atproto.sync.subscribeRepos', - ComAtprotoTempUpgradeRepoVersion: 'com.atproto.temp.upgradeRepoVersion', AppBskyActorDefs: 'app.bsky.actor.defs', AppBskyActorGetPreferences: 'app.bsky.actor.getPreferences', AppBskyActorGetProfile: 'app.bsky.actor.getProfile', diff --git a/packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts b/packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts deleted file mode 100644 index c5b77876fec..00000000000 --- a/packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts +++ /dev/null @@ -1,39 +0,0 @@ -/** - * GENERATED CODE - DO NOT MODIFY - */ -import express from 'express' -import { ValidationResult, BlobRef } from '@atproto/lexicon' -import { lexicons } from '../../../../lexicons' -import { isObj, hasProp } from '../../../../util' -import { CID } from 'multiformats/cid' -import { HandlerAuth } from '@atproto/xrpc-server' - -export interface QueryParams {} - -export interface InputSchema { - did: string - force?: boolean - [k: string]: unknown -} - -export interface HandlerInput { - encoding: 'application/json' - body: InputSchema -} - -export interface HandlerError { - status: number - message?: string -} - -export type HandlerOutput = HandlerError | void -export type HandlerReqCtx = { - auth: HA - params: QueryParams - input: HandlerInput - req: express.Request - res: express.Response -} -export type Handler = ( - ctx: HandlerReqCtx, -) => Promise | HandlerOutput diff --git a/packages/pds/tests/migrations/repo-version-upgrade.test.ts b/packages/pds/tests/migrations/repo-version-upgrade.test.ts deleted file mode 100644 index d73bc8cc7ae..00000000000 --- a/packages/pds/tests/migrations/repo-version-upgrade.test.ts +++ /dev/null @@ -1,173 +0,0 @@ -import AtpAgent from '@atproto/api' -import { SeedClient } from '../seeds/client' -import basicSeed from '../seeds/basic' -import { TestNetworkNoAppView } from '@atproto/dev-env' -import { randomBytes } from 'crypto' -import { TID, cidForCbor } from '@atproto/common' -import { IpldBlock } from '../../src/db/tables/ipld-block' -import { readCarWithRoot, verifyRepo } from '@atproto/repo' -import { Database } from '../../src' - -describe('repo version upgrade', () => { - let network: TestNetworkNoAppView - let db: Database - let agent: AtpAgent - let sc: SeedClient - - let alice: string - - beforeAll(async () => { - network = await TestNetworkNoAppView.create({ - dbPostgresSchema: 'repo_version_upgrade', - }) - db = network.pds.ctx.db - agent = network.pds.getClient() - sc = new SeedClient(agent) - await basicSeed(sc) - alice = sc.dids.alice - }) - - afterAll(async () => { - await network.close() - }) - - const getNonAliceData = async () => { - const ipldBlocksQb = db.db - .selectFrom('ipld_block') - .where('creator', '!=', alice) - .selectAll() - .orderBy('creator') - .orderBy('cid') - const recordsQb = db.db - .selectFrom('record') - .where('did', '!=', alice) - .selectAll() - .orderBy('did') - .orderBy('uri') - const repoBlobsQb = db.db - .selectFrom('repo_blob') - .where('did', '!=', alice) - .selectAll() - .orderBy('did') - .orderBy('cid') - const repoRootsQb = db.db - .selectFrom('repo_root') - .where('did', '!=', alice) - .selectAll() - .orderBy('did') - const [ipldBlocks, records, repoBlobs, repoRoots] = await Promise.all([ - ipldBlocksQb.execute(), - recordsQb.execute(), - repoBlobsQb.execute(), - repoRootsQb.execute(), - ]) - return { - ipldBlocks, - records, - repoBlobs, - repoRoots, - } - } - - const addCruft = async (did: string) => { - const cruft: IpldBlock[] = [] - for (let i = 0; i < 1000; i++) { - const bytes = randomBytes(128) - const cid = await cidForCbor(bytes) - cruft.push({ - cid: cid.toString(), - creator: did, - repoRev: Math.random() > 0.5 ? TID.nextStr() : null, - size: 128, - content: bytes, - }) - } - await db.db.insertInto('ipld_block').values(cruft).execute() - return cruft - } - - const fetchAndVerifyRepo = async (did: string) => { - const res = await agent.api.com.atproto.sync.getRepo({ - did, - }) - const car = await readCarWithRoot(res.data) - return verifyRepo( - car.blocks, - car.root, - alice, - network.pds.ctx.repoSigningKey.did(), - ) - } - - it('upgrades a repo', async () => { - const nonAliceDataBefore = await getNonAliceData() - const aliceRepoBefore = await fetchAndVerifyRepo(alice) - - const cruft = await addCruft(alice) - - await agent.api.com.atproto.temp.upgradeRepoVersion( - { did: alice }, - { - headers: network.pds.adminAuthHeaders('admin'), - encoding: 'application/json', - }, - ) - - const nonAliceDataAfter = await getNonAliceData() - - // does not affect other users - expect(nonAliceDataAfter).toEqual(nonAliceDataBefore) - - // cleans up cruft - const res = await db.db - .selectFrom('ipld_block') - .selectAll() - .where('creator', '=', alice) - .execute() - const cidSet = new Set(res.map((row) => row.cid)) - for (const row of cruft) { - expect(cidSet.has(row.cid)).toBe(false) - } - - const aliceRepoAfter = await fetchAndVerifyRepo(alice) - expect(aliceRepoAfter.creates).toEqual(aliceRepoBefore.creates) - - // it updated the repo rev on all blocks/records/blobs - const root = await db.db - .selectFrom('repo_root') - .where('did', '=', alice) - .selectAll() - .executeTakeFirst() - if (!root || !root.rev) { - throw new Error('did not set rev') - } - expect(root.root).toEqual(aliceRepoAfter.commit.cid.toString()) - const nonUpgradedRecords = await db.db - .selectFrom('record') - .where('did', '=', alice) - .where((qb) => - qb.where('repoRev', '!=', root.rev).orWhere('repoRev', 'is', null), - ) - .selectAll() - .execute() - expect(nonUpgradedRecords.length).toBe(0) - const nonUpgradedBlocks = await db.db - .selectFrom('ipld_block') - .where('creator', '=', alice) - .where((qb) => - qb.where('repoRev', '!=', root.rev).orWhere('repoRev', 'is', null), - ) - .selectAll() - .execute() - expect(nonUpgradedBlocks.length).toBe(0) - const nonUpgradedBlobs = await db.db - .selectFrom('repo_blob') - .where('did', '=', alice) - .where((qb) => - qb.where('repoRev', '!=', root.rev).orWhere('repoRev', 'is', null), - ) - .selectAll() - .execute() - expect(nonUpgradedBlobs.length).toBe(0) - }) -})