diff --git a/lexicons/com/atproto/temp/upgradeRepoVersion.json b/lexicons/com/atproto/temp/upgradeRepoVersion.json new file mode 100644 index 00000000000..05d8c7197fd --- /dev/null +++ b/lexicons/com/atproto/temp/upgradeRepoVersion.json @@ -0,0 +1,21 @@ +{ + "lexicon": 1, + "id": "com.atproto.temp.upgradeRepoVersion", + "defs": { + "main": { + "type": "procedure", + "description": "Upgrade a repo to v3", + "input": { + "encoding": "application/json", + "schema": { + "type": "object", + "required": ["did"], + "properties": { + "did": { "type": "string", "format": "did" }, + "force": { "type": "boolean" } + } + } + } + } + } +} diff --git a/packages/api/src/client/index.ts b/packages/api/src/client/index.ts index 761097aad7c..dfd5e51ff57 100644 --- a/packages/api/src/client/index.ts +++ b/packages/api/src/client/index.ts @@ -70,6 +70,7 @@ import * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' import * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' +import * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' import * as AppBskyActorDefs from './types/app/bsky/actor/defs' import * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' @@ -194,6 +195,7 @@ export * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' export * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' export * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' export * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' +export * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' export * as AppBskyActorDefs from './types/app/bsky/actor/defs' export * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' export * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' @@ -322,6 +324,7 @@ export class AtprotoNS { repo: RepoNS server: ServerNS sync: SyncNS + temp: TempNS constructor(service: AtpServiceClient) { this._service = service @@ -332,6 +335,7 @@ export class AtprotoNS { this.repo = new RepoNS(service) this.server = new ServerNS(service) this.sync = new SyncNS(service) + this.temp = new TempNS(service) } } @@ -1007,6 +1011,25 @@ export class SyncNS { } } +export class TempNS { + _service: AtpServiceClient + + constructor(service: AtpServiceClient) { + this._service = service + } + + upgradeRepoVersion( + data?: ComAtprotoTempUpgradeRepoVersion.InputSchema, + opts?: ComAtprotoTempUpgradeRepoVersion.CallOptions, + ): Promise { + return this._service.xrpc + .call('com.atproto.temp.upgradeRepoVersion', opts?.qp, data, opts) + .catch((e) => { + throw ComAtprotoTempUpgradeRepoVersion.toKnownErr(e) + }) + } +} + export class AppNS { _service: AtpServiceClient bsky: BskyNS diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index f3c93c5e805..c682f239a3b 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -3517,6 +3517,32 @@ export const schemaDict = { }, }, }, + ComAtprotoTempUpgradeRepoVersion: { + lexicon: 1, + id: 'com.atproto.temp.upgradeRepoVersion', + defs: { + main: { + type: 'procedure', + description: 'Upgrade a repo to v3', + input: { + encoding: 'application/json', + schema: { + type: 'object', + required: ['did'], + properties: { + did: { + type: 'string', + format: 'did', + }, + force: { + type: 'boolean', + }, + }, + }, + }, + }, + }, + }, AppBskyActorDefs: { lexicon: 1, id: 'app.bsky.actor.defs', @@ -6838,6 +6864,7 @@ export const ids = { ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate', ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl', ComAtprotoSyncSubscribeRepos: 'com.atproto.sync.subscribeRepos', + ComAtprotoTempUpgradeRepoVersion: 'com.atproto.temp.upgradeRepoVersion', AppBskyActorDefs: 'app.bsky.actor.defs', AppBskyActorGetPreferences: 'app.bsky.actor.getPreferences', AppBskyActorGetProfile: 'app.bsky.actor.getProfile', diff --git a/packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts b/packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts new file mode 100644 index 00000000000..abaf3a9f1b0 --- /dev/null +++ b/packages/api/src/client/types/com/atproto/temp/upgradeRepoVersion.ts @@ -0,0 +1,33 @@ +/** + * GENERATED CODE - DO NOT MODIFY + */ +import { Headers, XRPCError } from '@atproto/xrpc' +import { ValidationResult, BlobRef } from '@atproto/lexicon' +import { isObj, hasProp } from '../../../../util' +import { lexicons } from '../../../../lexicons' +import { CID } from 'multiformats/cid' + +export interface QueryParams {} + +export interface InputSchema { + did: string + force?: boolean + [k: string]: unknown +} + +export interface CallOptions { + headers?: Headers + qp?: QueryParams + encoding: 'application/json' +} + +export interface Response { + success: boolean + headers: Headers +} + +export function toKnownErr(e: any) { + if (e instanceof XRPCError) { + } + return e +} diff --git a/packages/bsky/src/lexicon/index.ts b/packages/bsky/src/lexicon/index.ts index 93435056503..c0840abea2c 100644 --- a/packages/bsky/src/lexicon/index.ts +++ b/packages/bsky/src/lexicon/index.ts @@ -67,6 +67,7 @@ import * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' import * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' +import * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' import * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles' @@ -163,6 +164,7 @@ export class AtprotoNS { repo: RepoNS server: ServerNS sync: SyncNS + temp: TempNS constructor(server: Server) { this._server = server @@ -173,6 +175,7 @@ export class AtprotoNS { this.repo = new RepoNS(server) this.server = new ServerNS(server) this.sync = new SyncNS(server) + this.temp = new TempNS(server) } } @@ -870,6 +873,25 @@ export class SyncNS { } } +export class TempNS { + _server: Server + + constructor(server: Server) { + this._server = server + } + + upgradeRepoVersion( + cfg: ConfigOf< + AV, + ComAtprotoTempUpgradeRepoVersion.Handler>, + ComAtprotoTempUpgradeRepoVersion.HandlerReqCtx> + >, + ) { + const nsid = 'com.atproto.temp.upgradeRepoVersion' // @ts-ignore + return this._server.xrpc.method(nsid, cfg) + } +} + export class AppNS { _server: Server bsky: BskyNS diff --git a/packages/bsky/src/lexicon/lexicons.ts b/packages/bsky/src/lexicon/lexicons.ts index f3c93c5e805..c682f239a3b 100644 --- a/packages/bsky/src/lexicon/lexicons.ts +++ b/packages/bsky/src/lexicon/lexicons.ts @@ -3517,6 +3517,32 @@ export const schemaDict = { }, }, }, + ComAtprotoTempUpgradeRepoVersion: { + lexicon: 1, + id: 'com.atproto.temp.upgradeRepoVersion', + defs: { + main: { + type: 'procedure', + description: 'Upgrade a repo to v3', + input: { + encoding: 'application/json', + schema: { + type: 'object', + required: ['did'], + properties: { + did: { + type: 'string', + format: 'did', + }, + force: { + type: 'boolean', + }, + }, + }, + }, + }, + }, + }, AppBskyActorDefs: { lexicon: 1, id: 'app.bsky.actor.defs', @@ -6838,6 +6864,7 @@ export const ids = { ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate', ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl', ComAtprotoSyncSubscribeRepos: 'com.atproto.sync.subscribeRepos', + ComAtprotoTempUpgradeRepoVersion: 'com.atproto.temp.upgradeRepoVersion', AppBskyActorDefs: 'app.bsky.actor.defs', AppBskyActorGetPreferences: 'app.bsky.actor.getPreferences', AppBskyActorGetProfile: 'app.bsky.actor.getProfile', diff --git a/packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts b/packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts new file mode 100644 index 00000000000..c5b77876fec --- /dev/null +++ b/packages/bsky/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts @@ -0,0 +1,39 @@ +/** + * GENERATED CODE - DO NOT MODIFY + */ +import express from 'express' +import { ValidationResult, BlobRef } from '@atproto/lexicon' +import { lexicons } from '../../../../lexicons' +import { isObj, hasProp } from '../../../../util' +import { CID } from 'multiformats/cid' +import { HandlerAuth } from '@atproto/xrpc-server' + +export interface QueryParams {} + +export interface InputSchema { + did: string + force?: boolean + [k: string]: unknown +} + +export interface HandlerInput { + encoding: 'application/json' + body: InputSchema +} + +export interface HandlerError { + status: number + message?: string +} + +export type HandlerOutput = HandlerError | void +export type HandlerReqCtx = { + auth: HA + params: QueryParams + input: HandlerInput + req: express.Request + res: express.Response +} +export type Handler = ( + ctx: HandlerReqCtx, +) => Promise | HandlerOutput diff --git a/packages/pds/src/api/com/atproto/index.ts b/packages/pds/src/api/com/atproto/index.ts index a5c26c80495..59fca0fd5a9 100644 --- a/packages/pds/src/api/com/atproto/index.ts +++ b/packages/pds/src/api/com/atproto/index.ts @@ -6,6 +6,7 @@ import moderation from './moderation' import repo from './repo' import serverMethods from './server' import sync from './sync' +import upgradeRepoVersion from './upgradeRepoVersion' export default function (server: Server, ctx: AppContext) { admin(server, ctx) @@ -14,4 +15,5 @@ export default function (server: Server, ctx: AppContext) { repo(server, ctx) serverMethods(server, ctx) sync(server, ctx) + upgradeRepoVersion(server, ctx) } diff --git a/packages/pds/src/api/com/atproto/upgradeRepoVersion.ts b/packages/pds/src/api/com/atproto/upgradeRepoVersion.ts new file mode 100644 index 00000000000..dd20943be3e --- /dev/null +++ b/packages/pds/src/api/com/atproto/upgradeRepoVersion.ts @@ -0,0 +1,140 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { TID, chunkArray, wait } from '@atproto/common' +import { Server } from '../../../lexicon' +import SqlRepoStorage from '../../../sql-repo-storage' +import AppContext from '../../../context' +import { + BlockMap, + CidSet, + DataDiff, + MST, + MemoryBlockstore, + def, + signCommit, +} from '@atproto/repo' +import { CID } from 'multiformats/cid' +import { formatSeqCommit, sequenceEvt } from '../../../sequencer' +import { httpLogger as log } from '../../../logger' + +export default function (server: Server, ctx: AppContext) { + server.com.atproto.temp.upgradeRepoVersion({ + auth: ctx.roleVerifier, + handler: async ({ input, auth }) => { + if (!auth.credentials.admin) { + throw new InvalidRequestError('must be admin') + } + const { did, force } = input.body + + await ctx.db.transaction(async (dbTxn) => { + const storage = new SqlRepoStorage(dbTxn, did) + await obtainLock(storage) + const prevCid = await storage.getRoot() + if (!prevCid) { + throw new InvalidRequestError('Could not find repo') + } + const prev = await storage.readObj(prevCid, def.versionedCommit) + const records = await dbTxn.db + .selectFrom('record') + .select(['collection', 'rkey', 'cid']) + .where('did', '=', did) + .execute() + const memoryStore = new MemoryBlockstore() + let data = await MST.create(memoryStore) + for (const record of records) { + const dataKey = record.collection + '/' + record.rkey + const cid = CID.parse(record.cid) + data = await data.add(dataKey, cid) + } + const dataCid = await data.getPointer() + if (!force && !dataCid.equals(prev.data)) { + throw new InvalidRequestError('Data cid did not match') + } + const recordCids = records.map((r) => r.cid) + const diff = await DataDiff.of(data, null) + const cidsToKeep = [...recordCids, ...diff.newMstBlocks.cids()] + const rev = TID.nextStr(prev.rev) + if (force) { + const got = await storage.getBlocks(diff.newMstBlocks.cids()) + const toAdd = diff.newMstBlocks.getMany(got.missing) + log.info( + { missing: got.missing.length }, + 'force added missing blocks', + ) + // puts any new blocks & no-ops for already existing + await storage.putMany(toAdd.blocks, rev) + } + for (const chunk of chunkArray(cidsToKeep, 500)) { + const cidStrs = chunk.map((c) => c.toString()) + await dbTxn.db + .updateTable('ipld_block') + .set({ repoRev: rev }) + .where('creator', '=', did) + .where('cid', 'in', cidStrs) + .execute() + } + await dbTxn.db + .deleteFrom('ipld_block') + .where('creator', '=', did) + .where((qb) => + qb.where('repoRev', 'is', null).orWhere('repoRev', '!=', rev), + ) + .execute() + await dbTxn.db + .updateTable('repo_blob') + .set({ repoRev: rev }) + .where('did', '=', did) + .execute() + await dbTxn.db + .updateTable('record') + .set({ repoRev: rev }) + .where('did', '=', did) + .execute() + const commit = await signCommit( + { + did, + version: 3, + rev: TID.nextStr(), + prev: prevCid, + data: dataCid, + }, + ctx.repoSigningKey, + ) + const newBlocks = new BlockMap() + const commitCid = await newBlocks.add(commit) + await storage.putMany(newBlocks, rev) + await dbTxn.db + .updateTable('repo_root') + .set({ + root: commitCid.toString(), + rev, + indexedAt: storage.getTimestamp(), + }) + .where('did', '=', did) + .execute() + + const commitData = { + cid: commitCid, + rev, + prev: prevCid, + since: null, + newBlocks, + removedCids: new CidSet(), + } + const seqEvt = await formatSeqCommit(did, commitData, []) + await sequenceEvt(dbTxn, seqEvt) + }) + }, + }) +} + +const obtainLock = async (storage: SqlRepoStorage, tries = 20) => { + const obtained = await storage.lockRepo() + if (obtained) { + return + } + if (tries < 1) { + throw new InvalidRequestError('could not obtain lock') + } + await wait(50) + return obtainLock(storage, tries - 1) +} diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 71af4ae62dc..466bc9a5613 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -180,6 +180,7 @@ export class AppContext { repoSigningKey, blobstore, appViewAgent, + pdsHostname: cfg.service.hostname, appViewDid: cfg.bskyAppView.did, appViewCdnUrlPattern: cfg.bskyAppView.cdnUrlPattern, backgroundQueue, diff --git a/packages/pds/src/lexicon/index.ts b/packages/pds/src/lexicon/index.ts index 93435056503..c0840abea2c 100644 --- a/packages/pds/src/lexicon/index.ts +++ b/packages/pds/src/lexicon/index.ts @@ -67,6 +67,7 @@ import * as ComAtprotoSyncListRepos from './types/com/atproto/sync/listRepos' import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate' import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl' import * as ComAtprotoSyncSubscribeRepos from './types/com/atproto/sync/subscribeRepos' +import * as ComAtprotoTempUpgradeRepoVersion from './types/com/atproto/temp/upgradeRepoVersion' import * as AppBskyActorGetPreferences from './types/app/bsky/actor/getPreferences' import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile' import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles' @@ -163,6 +164,7 @@ export class AtprotoNS { repo: RepoNS server: ServerNS sync: SyncNS + temp: TempNS constructor(server: Server) { this._server = server @@ -173,6 +175,7 @@ export class AtprotoNS { this.repo = new RepoNS(server) this.server = new ServerNS(server) this.sync = new SyncNS(server) + this.temp = new TempNS(server) } } @@ -870,6 +873,25 @@ export class SyncNS { } } +export class TempNS { + _server: Server + + constructor(server: Server) { + this._server = server + } + + upgradeRepoVersion( + cfg: ConfigOf< + AV, + ComAtprotoTempUpgradeRepoVersion.Handler>, + ComAtprotoTempUpgradeRepoVersion.HandlerReqCtx> + >, + ) { + const nsid = 'com.atproto.temp.upgradeRepoVersion' // @ts-ignore + return this._server.xrpc.method(nsid, cfg) + } +} + export class AppNS { _server: Server bsky: BskyNS diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index f3c93c5e805..c682f239a3b 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -3517,6 +3517,32 @@ export const schemaDict = { }, }, }, + ComAtprotoTempUpgradeRepoVersion: { + lexicon: 1, + id: 'com.atproto.temp.upgradeRepoVersion', + defs: { + main: { + type: 'procedure', + description: 'Upgrade a repo to v3', + input: { + encoding: 'application/json', + schema: { + type: 'object', + required: ['did'], + properties: { + did: { + type: 'string', + format: 'did', + }, + force: { + type: 'boolean', + }, + }, + }, + }, + }, + }, + }, AppBskyActorDefs: { lexicon: 1, id: 'app.bsky.actor.defs', @@ -6838,6 +6864,7 @@ export const ids = { ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate', ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl', ComAtprotoSyncSubscribeRepos: 'com.atproto.sync.subscribeRepos', + ComAtprotoTempUpgradeRepoVersion: 'com.atproto.temp.upgradeRepoVersion', AppBskyActorDefs: 'app.bsky.actor.defs', AppBskyActorGetPreferences: 'app.bsky.actor.getPreferences', AppBskyActorGetProfile: 'app.bsky.actor.getProfile', diff --git a/packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts b/packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts new file mode 100644 index 00000000000..c5b77876fec --- /dev/null +++ b/packages/pds/src/lexicon/types/com/atproto/temp/upgradeRepoVersion.ts @@ -0,0 +1,39 @@ +/** + * GENERATED CODE - DO NOT MODIFY + */ +import express from 'express' +import { ValidationResult, BlobRef } from '@atproto/lexicon' +import { lexicons } from '../../../../lexicons' +import { isObj, hasProp } from '../../../../util' +import { CID } from 'multiformats/cid' +import { HandlerAuth } from '@atproto/xrpc-server' + +export interface QueryParams {} + +export interface InputSchema { + did: string + force?: boolean + [k: string]: unknown +} + +export interface HandlerInput { + encoding: 'application/json' + body: InputSchema +} + +export interface HandlerError { + status: number + message?: string +} + +export type HandlerOutput = HandlerError | void +export type HandlerReqCtx = { + auth: HA + params: QueryParams + input: HandlerInput + req: express.Request + res: express.Response +} +export type Handler = ( + ctx: HandlerReqCtx, +) => Promise | HandlerOutput diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index 30dffedf061..954a5544e6e 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -14,6 +14,7 @@ import { LocalService } from './local' export function createServices(resources: { repoSigningKey: crypto.Keypair blobstore: BlobStore + pdsHostname: string appViewAgent?: AtpAgent appViewDid?: string appViewCdnUrlPattern?: string @@ -23,6 +24,7 @@ export function createServices(resources: { const { repoSigningKey, blobstore, + pdsHostname, appViewAgent, appViewDid, appViewCdnUrlPattern, @@ -41,6 +43,7 @@ export function createServices(resources: { ), local: LocalService.creator( repoSigningKey, + pdsHostname, appViewAgent, appViewDid, appViewCdnUrlPattern, diff --git a/packages/pds/src/services/local/index.ts b/packages/pds/src/services/local/index.ts index 867f3baf4e7..7af82ee4173 100644 --- a/packages/pds/src/services/local/index.ts +++ b/packages/pds/src/services/local/index.ts @@ -39,6 +39,7 @@ export class LocalService { constructor( public db: Database, public signingKey: Keypair, + public pdsHostname: string, public appviewAgent?: AtpAgent, public appviewDid?: string, public appviewCdnUrlPattern?: string, @@ -46,6 +47,7 @@ export class LocalService { static creator( signingKey: Keypair, + pdsHostname: string, appviewAgent?: AtpAgent, appviewDid?: string, appviewCdnUrlPattern?: string, @@ -54,6 +56,7 @@ export class LocalService { new LocalService( db, signingKey, + pdsHostname, appviewAgent, appviewDid, appviewCdnUrlPattern, @@ -62,7 +65,7 @@ export class LocalService { getImageUrl(pattern: CommonSignedUris, did: string, cid: string) { if (!this.appviewCdnUrlPattern) { - return '' + return `https://${this.pdsHostname}/xrpc/${ids.ComAtprotoSyncGetBlob}?did=${did}&cid=${cid}` } return util.format(this.appviewCdnUrlPattern, pattern, did, cid) }