From f9e90967e303cfce348270bc3037e794c261be30 Mon Sep 17 00:00:00 2001 From: dholms Date: Tue, 3 Oct 2023 15:15:04 -0500 Subject: [PATCH] wip --- .../actor-db}/index.ts | 35 ++-- .../migrations/20230613T164932261Z-init.ts | 0 .../actor-db}/migrations/index.ts | 0 .../actor-db}/migrations/provider.ts | 0 .../actor-db}/schema/backlink.ts | 0 .../actor-db}/schema/blob.ts | 0 .../actor-db}/schema/index.ts | 8 + .../actor-db}/schema/ipld-block.ts | 0 .../actor-db}/schema/record.ts | 0 .../actor-db}/schema/repo-blob.ts | 0 .../actor-db}/schema/repo-root.ts | 2 +- .../actor-db}/schema/user-pref.ts | 0 packages/pds/src/actor-store/index.ts | 94 +++++++++++ .../src/{user-store => actor-store}/local.ts | 10 +- .../preference.ts} | 8 +- packages/pds/src/actor-store/reader.ts | 1 + .../src/{user-store => actor-store}/record.ts | 153 ++++-------------- .../blobs.ts => actor-store/repo/blob.ts} | 73 +++------ packages/pds/src/actor-store/repo/index.ts | 4 + .../src/actor-store/repo/record-transactor.ts | 96 +++++++++++ .../{user-store => actor-store/repo}/repo.ts | 136 ++++------------ .../repo}/sql-repo-storage.ts | 9 +- .../src/api/com/atproto/repo/applyWrites.ts | 23 ++- .../src/api/com/atproto/repo/createRecord.ts | 81 ++-------- .../src/api/com/atproto/repo/deleteRecord.ts | 42 ++--- .../src/api/com/atproto/repo/describeRepo.ts | 6 +- .../pds/src/api/com/atproto/repo/getRecord.ts | 6 +- .../src/api/com/atproto/repo/listRecords.ts | 19 +-- .../pds/src/api/com/atproto/repo/putRecord.ts | 72 ++++----- .../src/api/com/atproto/repo/uploadBlob.ts | 7 +- packages/pds/src/context.ts | 22 ++- 31 files changed, 433 insertions(+), 474 deletions(-) rename packages/pds/src/{user-db => actor-store/actor-db}/index.ts (77%) rename packages/pds/src/{user-db => actor-store/actor-db}/migrations/20230613T164932261Z-init.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/migrations/index.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/migrations/provider.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/backlink.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/blob.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/index.ts (60%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/ipld-block.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/record.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/repo-blob.ts (100%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/repo-root.ts (82%) rename packages/pds/src/{user-db => actor-store/actor-db}/schema/user-pref.ts (100%) create mode 100644 packages/pds/src/actor-store/index.ts rename packages/pds/src/{user-store => actor-store}/local.ts (98%) rename packages/pds/src/{user-store/preferences.ts => actor-store/preference.ts} (91%) create mode 100644 packages/pds/src/actor-store/reader.ts rename packages/pds/src/{user-store => actor-store}/record.ts (55%) rename packages/pds/src/{user-store/blobs.ts => actor-store/repo/blob.ts} (81%) create mode 100644 packages/pds/src/actor-store/repo/index.ts create mode 100644 packages/pds/src/actor-store/repo/record-transactor.ts rename packages/pds/src/{user-store => actor-store/repo}/repo.ts (63%) rename packages/pds/src/{user-store => actor-store/repo}/sql-repo-storage.ts (97%) diff --git a/packages/pds/src/user-db/index.ts b/packages/pds/src/actor-store/actor-db/index.ts similarity index 77% rename from packages/pds/src/user-db/index.ts rename to packages/pds/src/actor-store/actor-db/index.ts index 0be4d682929..9c23a7b096f 100644 --- a/packages/pds/src/user-db/index.ts +++ b/packages/pds/src/actor-store/actor-db/index.ts @@ -1,4 +1,5 @@ import assert from 'assert' +import path from 'path' import { Kysely, SqliteDialect, @@ -14,38 +15,38 @@ import SqliteDB from 'better-sqlite3' import { DatabaseSchema } from './schema' import * as migrations from './migrations' import { CtxMigrationProvider } from './migrations/provider' +export * from './schema' -export class UserDb { +type CommitHook = () => void + +export class ActorDb { migrator: Migrator destroyed = false + commitHooks: CommitHook[] = [] - constructor(public db: Kysely) { + constructor(public did: string, public db: Kysely) { this.migrator = new Migrator({ db, provider: new CtxMigrationProvider(migrations), }) } - static sqlite(location: string): UserDb { + static sqlite(location: string, did: string): ActorDb { const db = new Kysely({ dialect: new SqliteDialect({ - database: new SqliteDB(location), + database: new SqliteDB(path.join(location, did)), }), }) - return new UserDb(db) - } - - static memory(): UserDb { - return UserDb.sqlite(':memory:') + return new ActorDb(did, db) } - async transaction(fn: (db: UserDb) => Promise): Promise { + async transaction(fn: (db: ActorDb) => Promise): Promise { const leakyTxPlugin = new LeakyTxPlugin() - return this.db + const { hooks, txRes } = await this.db .withPlugin(leakyTxPlugin) .transaction() .execute(async (txn) => { - const dbTxn = new UserDb(txn) + const dbTxn = new ActorDb(this.did, txn) const txRes = await fn(dbTxn) .catch(async (err) => { leakyTxPlugin.endTx() @@ -54,8 +55,16 @@ export class UserDb { throw err }) .finally(() => leakyTxPlugin.endTx()) - return txRes + const hooks = dbTxn.commitHooks + return { hooks, txRes } }) + hooks.map((hook) => hook()) + return txRes + } + + onCommit(fn: () => void) { + this.assertTransaction() + this.commitHooks.push(fn) } get isTransaction() { diff --git a/packages/pds/src/user-db/migrations/20230613T164932261Z-init.ts b/packages/pds/src/actor-store/actor-db/migrations/20230613T164932261Z-init.ts similarity index 100% rename from packages/pds/src/user-db/migrations/20230613T164932261Z-init.ts rename to packages/pds/src/actor-store/actor-db/migrations/20230613T164932261Z-init.ts diff --git a/packages/pds/src/user-db/migrations/index.ts b/packages/pds/src/actor-store/actor-db/migrations/index.ts similarity index 100% rename from packages/pds/src/user-db/migrations/index.ts rename to packages/pds/src/actor-store/actor-db/migrations/index.ts diff --git a/packages/pds/src/user-db/migrations/provider.ts b/packages/pds/src/actor-store/actor-db/migrations/provider.ts similarity index 100% rename from packages/pds/src/user-db/migrations/provider.ts rename to packages/pds/src/actor-store/actor-db/migrations/provider.ts diff --git a/packages/pds/src/user-db/schema/backlink.ts b/packages/pds/src/actor-store/actor-db/schema/backlink.ts similarity index 100% rename from packages/pds/src/user-db/schema/backlink.ts rename to packages/pds/src/actor-store/actor-db/schema/backlink.ts diff --git a/packages/pds/src/user-db/schema/blob.ts b/packages/pds/src/actor-store/actor-db/schema/blob.ts similarity index 100% rename from packages/pds/src/user-db/schema/blob.ts rename to packages/pds/src/actor-store/actor-db/schema/blob.ts diff --git a/packages/pds/src/user-db/schema/index.ts b/packages/pds/src/actor-store/actor-db/schema/index.ts similarity index 60% rename from packages/pds/src/user-db/schema/index.ts rename to packages/pds/src/actor-store/actor-db/schema/index.ts index bebaf302b6e..1d527873d5d 100644 --- a/packages/pds/src/user-db/schema/index.ts +++ b/packages/pds/src/actor-store/actor-db/schema/index.ts @@ -6,6 +6,14 @@ import * as ipldBlock from './ipld-block' import * as blob from './blob' import * as repoBlob from './repo-blob' +export type { UserPref } from './user-pref' +export type { RepoRoot } from './repo-root' +export type { Record } from './record' +export type { Backlink } from './backlink' +export type { IpldBlock } from './ipld-block' +export type { Blob } from './blob' +export type { RepoBlob } from './repo-blob' + export type DatabaseSchema = userPref.PartialDB & repoRoot.PartialDB & record.PartialDB & diff --git a/packages/pds/src/user-db/schema/ipld-block.ts b/packages/pds/src/actor-store/actor-db/schema/ipld-block.ts similarity index 100% rename from packages/pds/src/user-db/schema/ipld-block.ts rename to packages/pds/src/actor-store/actor-db/schema/ipld-block.ts diff --git a/packages/pds/src/user-db/schema/record.ts b/packages/pds/src/actor-store/actor-db/schema/record.ts similarity index 100% rename from packages/pds/src/user-db/schema/record.ts rename to packages/pds/src/actor-store/actor-db/schema/record.ts diff --git a/packages/pds/src/user-db/schema/repo-blob.ts b/packages/pds/src/actor-store/actor-db/schema/repo-blob.ts similarity index 100% rename from packages/pds/src/user-db/schema/repo-blob.ts rename to packages/pds/src/actor-store/actor-db/schema/repo-blob.ts diff --git a/packages/pds/src/user-db/schema/repo-root.ts b/packages/pds/src/actor-store/actor-db/schema/repo-root.ts similarity index 82% rename from packages/pds/src/user-db/schema/repo-root.ts rename to packages/pds/src/actor-store/actor-db/schema/repo-root.ts index 8f4f1122059..afe41ef0e30 100644 --- a/packages/pds/src/user-db/schema/repo-root.ts +++ b/packages/pds/src/actor-store/actor-db/schema/repo-root.ts @@ -5,6 +5,6 @@ export interface RepoRoot { indexedAt: string } -export const tableName = 'repo_root' +const tableName = 'repo_root' export type PartialDB = { [tableName]: RepoRoot } diff --git a/packages/pds/src/user-db/schema/user-pref.ts b/packages/pds/src/actor-store/actor-db/schema/user-pref.ts similarity index 100% rename from packages/pds/src/user-db/schema/user-pref.ts rename to packages/pds/src/actor-store/actor-db/schema/user-pref.ts diff --git a/packages/pds/src/actor-store/index.ts b/packages/pds/src/actor-store/index.ts new file mode 100644 index 00000000000..04d2bbc5957 --- /dev/null +++ b/packages/pds/src/actor-store/index.ts @@ -0,0 +1,94 @@ +import { AtpAgent } from '@atproto/api' +import * as crypto from '@atproto/crypto' +import { BlobStore } from '@atproto/repo' +import { ActorDb } from './actor-db' +import { ActorRepo } from './repo' +import { ActorRecord } from './record' +import { ActorLocal } from './local' +import { ActorPreference } from './preference' +import { BackgroundQueue } from '../background' + +type ActorStoreReaderResources = { + repoSigningKey: crypto.Keypair + pdsHostname: string + appViewAgent?: AtpAgent + appViewDid?: string + appViewCdnUrlPattern?: string +} + +type ActorStoreResources = ActorStoreReaderResources & { + blobstore: BlobStore + backgroundQueue: BackgroundQueue +} + +export const createActorStore = ( + resources: ActorStoreResources, +): ActorStore => { + return { + reader: (did: string) => { + const db = ActorDb.sqlite('', did) + return createActorReader(db, resources) + }, + transact: (did: string, fn: ActorStoreTransactFn) => { + const db = ActorDb.sqlite('', did) + return db.transaction((dbTxn) => { + const store = createActorTransactor(dbTxn, resources) + return fn(store) + }) + }, + } +} + +const createActorTransactor = ( + db: ActorDb, + resources: ActorStoreResources, +): ActorStoreTransactor => { + const { repoSigningKey, blobstore, backgroundQueue } = resources + const reader = createActorReader(db, resources) + return { + ...reader, + repo: new ActorRepo(db, repoSigningKey, blobstore, backgroundQueue), + } +} + +const createActorReader = ( + db: ActorDb, + resources: ActorStoreReaderResources, +): ActorStoreReader => { + const { + repoSigningKey, + pdsHostname, + appViewAgent, + appViewDid, + appViewCdnUrlPattern, + } = resources + return { + record: new ActorRecord(db), + local: new ActorLocal( + db, + repoSigningKey, + pdsHostname, + appViewAgent, + appViewDid, + appViewCdnUrlPattern, + ), + pref: new ActorPreference(db), + } +} + +export type ActorStore = { + reader: (did: string) => ActorStoreReader + transact: (did: string, store: ActorStoreTransactFn) => Promise +} + +export type ActorStoreTransactFn = (fn: ActorStoreTransactor) => Promise + +export type ActorStoreTransactor = ActorStoreReader & { + repo: ActorRepo +} + +export type ActorStoreReader = { + record: ActorRecord + local: ActorLocal + pref: ActorPreference +} diff --git a/packages/pds/src/user-store/local.ts b/packages/pds/src/actor-store/local.ts similarity index 98% rename from packages/pds/src/user-store/local.ts rename to packages/pds/src/actor-store/local.ts index 4286db6425d..754e14deacb 100644 --- a/packages/pds/src/user-store/local.ts +++ b/packages/pds/src/actor-store/local.ts @@ -31,13 +31,13 @@ import { import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' import { createServiceAuthHeaders } from '@atproto/xrpc-server' -import { UserDb } from '../user-db' +import { ActorDb } from './actor-db' type CommonSignedUris = 'avatar' | 'banner' | 'feed_thumbnail' | 'feed_fullsize' -export class LocalService { +export class ActorLocal { constructor( - public db: UserDb, + public db: ActorDb, public signingKey: Keypair, public pdsHostname: string, public appViewAgent?: AtpAgent, @@ -52,8 +52,8 @@ export class LocalService { appviewDid?: string, appviewCdnUrlPattern?: string, ) { - return (db: UserDb) => - new LocalService( + return (db: ActorDb) => + new ActorLocal( db, signingKey, pdsHostname, diff --git a/packages/pds/src/user-store/preferences.ts b/packages/pds/src/actor-store/preference.ts similarity index 91% rename from packages/pds/src/user-store/preferences.ts rename to packages/pds/src/actor-store/preference.ts index 6c0f520f49f..de2c5cb2a9c 100644 --- a/packages/pds/src/user-store/preferences.ts +++ b/packages/pds/src/actor-store/preference.ts @@ -1,11 +1,11 @@ import { InvalidRequestError } from '@atproto/xrpc-server' -import { UserDb } from '../user-db' +import { ActorDb } from './actor-db' -export class PreferencesService { - constructor(public db: UserDb) {} +export class ActorPreference { + constructor(public db: ActorDb) {} static creator() { - return (db: UserDb) => new PreferencesService(db) + return (db: ActorDb) => new ActorPreference(db) } async getPreferences( diff --git a/packages/pds/src/actor-store/reader.ts b/packages/pds/src/actor-store/reader.ts new file mode 100644 index 00000000000..64740930424 --- /dev/null +++ b/packages/pds/src/actor-store/reader.ts @@ -0,0 +1 @@ +export class ActorStoreReader {} diff --git a/packages/pds/src/user-store/record.ts b/packages/pds/src/actor-store/record.ts similarity index 55% rename from packages/pds/src/user-store/record.ts rename to packages/pds/src/actor-store/record.ts index a1a36417037..bd119412a40 100644 --- a/packages/pds/src/user-store/record.ts +++ b/packages/pds/src/actor-store/record.ts @@ -1,91 +1,22 @@ -import { CID } from 'multiformats/cid' import { AtUri, ensureValidAtUri } from '@atproto/syntax' -import * as ident from '@atproto/syntax' -import { cborToLexRecord, WriteOpAction } from '@atproto/repo' -import { dbLogger as log } from '../logger' -import { notSoftDeletedClause } from '../user-db/util' -import { Backlink } from '../user-db/tables/backlink' +import * as syntax from '@atproto/syntax' +import { cborToLexRecord } from '@atproto/repo' +import { notSoftDeletedClause } from '../db/util' import { ids } from '../lexicon/lexicons' -import { UserDb } from '../user-db' +import { ActorDb, Backlink } from './actor-db' +import { prepareDelete } from '../repo' -export class RecordService { - constructor(public db: UserDb) {} +export class ActorRecord { + constructor(public db: ActorDb) {} static creator() { - return (db: UserDb) => new RecordService(db) + return (db: ActorDb) => new ActorRecord(db) } - async indexRecord( - uri: AtUri, - cid: CID, - obj: unknown, - action: WriteOpAction.Create | WriteOpAction.Update = WriteOpAction.Create, - repoRev?: string, - timestamp?: string, - ) { - this.db.assertTransaction() - log.debug({ uri }, 'indexing record') - const record = { - uri: uri.toString(), - cid: cid.toString(), - did: uri.host, - collection: uri.collection, - rkey: uri.rkey, - repoRev: repoRev ?? null, - indexedAt: timestamp || new Date().toISOString(), - } - if (!record.did.startsWith('did:')) { - throw new Error('Expected indexed URI to contain DID') - } else if (record.collection.length < 1) { - throw new Error('Expected indexed URI to contain a collection') - } else if (record.rkey.length < 1) { - throw new Error('Expected indexed URI to contain a record key') - } - - // Track current version of record - await this.db.db - .insertInto('record') - .values(record) - .onConflict((oc) => - oc.column('uri').doUpdateSet({ - cid: record.cid, - repoRev: repoRev ?? null, - indexedAt: record.indexedAt, - }), - ) - .execute() - - // Maintain backlinks - const backlinks = getBacklinks(uri, obj) - if (action === WriteOpAction.Update) { - // On update just recreate backlinks from scratch for the record, so we can clear out - // the old ones. E.g. for weird cases like updating a follow to be for a different did. - await this.removeBacklinksByUri(uri) - } - await this.addBacklinks(backlinks) - - log.info({ uri }, 'indexed record') - } - - async deleteRecord(uri: AtUri) { - this.db.assertTransaction() - log.debug({ uri }, 'deleting indexed record') - const deleteQuery = this.db.db - .deleteFrom('record') - .where('uri', '=', uri.toString()) - const backlinkQuery = this.db.db - .deleteFrom('backlink') - .where('uri', '=', uri.toString()) - await Promise.all([deleteQuery.execute(), backlinkQuery.execute()]) - - log.info({ uri }, 'deleted indexed record') - } - - async listCollectionsForDid(did: string): Promise { + async listCollections(): Promise { const collections = await this.db.db .selectFrom('record') .select('collection') - .where('did', '=', did) .groupBy('collection') .execute() @@ -93,7 +24,6 @@ export class RecordService { } async listRecordsForCollection(opts: { - did: string collection: string limit: number reverse: boolean @@ -103,7 +33,6 @@ export class RecordService { includeSoftDeleted?: boolean }): Promise<{ uri: string; cid: string; value: object }[]> { const { - did, collection, limit, reverse, @@ -116,12 +45,7 @@ export class RecordService { const { ref } = this.db.db.dynamic let builder = this.db.db .selectFrom('record') - .innerJoin('ipld_block', (join) => - join - .onRef('ipld_block.cid', '=', 'record.cid') - .on('ipld_block.creator', '=', did), - ) - .where('record.did', '=', did) + .innerJoin('ipld_block', 'ipld_block.cid', 'record.cid') .where('record.collection', '=', collection) .if(!includeSoftDeleted, (qb) => qb.where(notSoftDeletedClause(ref('record'))), @@ -169,11 +93,7 @@ export class RecordService { const { ref } = this.db.db.dynamic let builder = this.db.db .selectFrom('record') - .innerJoin('ipld_block', (join) => - join - .onRef('ipld_block.cid', '=', 'record.cid') - .on('ipld_block.creator', '=', uri.host), - ) + .innerJoin('ipld_block', 'ipld_block.cid', 'record.cid') .where('record.uri', '=', uri.toString()) .selectAll() .if(!includeSoftDeleted, (qb) => @@ -213,35 +133,12 @@ export class RecordService { return !!record } - async deleteForActor(did: string) { - // Not done in transaction because it would be too long, prone to contention. - // Also, this can safely be run multiple times if it fails. - await this.db.db.deleteFrom('record').where('did', '=', did).execute() - } - - async removeBacklinksByUri(uri: AtUri) { - await this.db.db - .deleteFrom('backlink') - .where('uri', '=', uri.toString()) - .execute() - } - - async addBacklinks(backlinks: Backlink[]) { - if (backlinks.length === 0) return - await this.db.db - .insertInto('backlink') - .values(backlinks) - .onConflict((oc) => oc.doNothing()) - .execute() - } - async getRecordBacklinks(opts: { - did: string collection: string path: string linkTo: string }) { - const { did, collection, path, linkTo } = opts + const { collection, path, linkTo } = opts return await this.db.db .selectFrom('record') .innerJoin('backlink', 'backlink.uri', 'record.uri') @@ -252,17 +149,37 @@ export class RecordService { .if(!linkTo.startsWith('at://'), (q) => q.where('backlink.linkToDid', '=', linkTo), ) - .where('record.did', '=', did) .where('record.collection', '=', collection) .selectAll('record') .execute() } + + // @NOTE this logic a placeholder until we allow users to specify these constraints themselves. + // Ensures that we don't end-up with duplicate likes, reposts, and follows from race conditions. + + async getBacklinkDeletions(uri: AtUri, record: unknown) { + const recordBacklinks = getBacklinks(uri, record) + const conflicts = await Promise.all( + recordBacklinks.map((backlink) => + this.getRecordBacklinks({ + collection: uri.collection, + path: backlink.path, + linkTo: backlink.linkToDid ?? backlink.linkToUri ?? '', + }), + ), + ) + return conflicts + .flat() + .map(({ rkey }) => + prepareDelete({ did: this.db.did, collection: uri.collection, rkey }), + ) + } } // @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs. // For now we just want to ensure we're tracking links from follows, blocks, likes, and reposts. -function getBacklinks(uri: AtUri, record: unknown): Backlink[] { +export const getBacklinks = (uri: AtUri, record: unknown): Backlink[] => { if ( record?.['$type'] === ids.AppBskyGraphFollow || record?.['$type'] === ids.AppBskyGraphBlock @@ -272,7 +189,7 @@ function getBacklinks(uri: AtUri, record: unknown): Backlink[] { return [] } try { - ident.ensureValidDid(subject) + syntax.ensureValidDid(subject) } catch { return [] } diff --git a/packages/pds/src/user-store/blobs.ts b/packages/pds/src/actor-store/repo/blob.ts similarity index 81% rename from packages/pds/src/user-store/blobs.ts rename to packages/pds/src/actor-store/repo/blob.ts index fc445778f1f..c7d4020924a 100644 --- a/packages/pds/src/user-store/blobs.ts +++ b/packages/pds/src/actor-store/repo/blob.ts @@ -3,25 +3,24 @@ import crypto from 'crypto' import { CID } from 'multiformats/cid' import bytes from 'bytes' import { fromStream as fileTypeFromStream } from 'file-type' -import { BlobStore, CidSet, WriteOpAction } from '@atproto/repo' +import { BlobStore, WriteOpAction } from '@atproto/repo' import { AtUri } from '@atproto/syntax' import { cloneStream, sha256RawToCid, streamSize } from '@atproto/common' import { InvalidRequestError } from '@atproto/xrpc-server' import { BlobRef } from '@atproto/lexicon' -import { UserDb } from '../user-db' +import { ActorDb, Blob as BlobTable } from '../actor-db' import { PreparedBlobRef, PreparedWrite, PreparedDelete, PreparedUpdate, -} from '../repo/types' -import { Blob as BlobTable } from '../user-db/tables/blob' -import * as img from '../image' -import { BackgroundQueue } from '../background' +} from '../../repo/types' +import * as img from '../../image' +import { BackgroundQueue } from '../../background' -export class Blobs { +export class ActorBlob { constructor( - public db: UserDb, + public db: ActorDb, public blobstore: BlobStore, public backgroundQueue: BackgroundQueue, ) {} @@ -62,8 +61,8 @@ export class Blobs { return new BlobRef(cid, mimeType, size) } - async processWriteBlobs(did: string, rev: string, writes: PreparedWrite[]) { - await this.deleteDereferencedBlobs(did, writes) + async processWriteBlobs(rev: string, writes: PreparedWrite[]) { + await this.deleteDereferencedBlobs(writes) const blobPromises: Promise[] = [] for (const write of writes) { @@ -72,15 +71,15 @@ export class Blobs { write.action === WriteOpAction.Update ) { for (const blob of write.blobs) { - blobPromises.push(this.verifyBlobAndMakePermanent(did, blob)) - blobPromises.push(this.associateBlob(blob, write.uri, rev, did)) + blobPromises.push(this.verifyBlobAndMakePermanent(blob)) + blobPromises.push(this.associateBlob(blob, write.uri, rev)) } } } await Promise.all(blobPromises) } - async deleteDereferencedBlobs(did: string, writes: PreparedWrite[]) { + async deleteDereferencedBlobs(writes: PreparedWrite[]) { const deletes = writes.filter( (w) => w.action === WriteOpAction.Delete, ) as PreparedDelete[] @@ -92,7 +91,6 @@ export class Blobs { const deletedRepoBlobs = await this.db.db .deleteFrom('repo_blob') - .where('did', '=', did) .where('recordUri', 'in', uris) .returningAll() .execute() @@ -101,7 +99,6 @@ export class Blobs { const deletedRepoBlobCids = deletedRepoBlobs.map((row) => row.cid) const duplicateCids = await this.db.db .selectFrom('repo_blob') - .where('did', '=', did) .where('cid', 'in', deletedRepoBlobCids) .select('cid') .execute() @@ -135,25 +132,18 @@ export class Blobs { const stillUsed = stillUsedRes.map((row) => row.cid) const blobsToDelete = cidsToDelete.filter((cid) => !stillUsed.includes(cid)) - - // @TODO FIX ME - - // move actual blob deletion to the background queue - // if (blobsToDelete.length > 0) { - // this.db.onCommit(() => { - // this.backgroundQueue.add(async () => { - // await Promise.allSettled( - // blobsToDelete.map((cid) => this.blobstore.delete(CID.parse(cid))), - // ) - // }) - // }) - // } + if (blobsToDelete.length > 0) { + this.db.onCommit(() => { + this.backgroundQueue.add(async () => { + await Promise.allSettled( + blobsToDelete.map((cid) => this.blobstore.delete(CID.parse(cid))), + ) + }) + }) + } } - async verifyBlobAndMakePermanent( - creator: string, - blob: PreparedBlobRef, - ): Promise { + async verifyBlobAndMakePermanent(blob: PreparedBlobRef): Promise { const { ref } = this.db.db.dynamic const found = await this.db.db .selectFrom('blob') @@ -189,7 +179,6 @@ export class Blobs { blob: PreparedBlobRef, recordUri: AtUri, repoRev: string, - did: string, ): Promise { await this.db.db .insertInto('repo_blob') @@ -197,30 +186,16 @@ export class Blobs { cid: blob.cid.toString(), recordUri: recordUri.toString(), repoRev, - did, }) .onConflict((oc) => oc.doNothing()) .execute() } - async listSinceRev(did: string, rev?: string): Promise { - let builder = this.db.db - .selectFrom('repo_blob') - .where('did', '=', did) - .select('cid') - if (rev) { - builder = builder.where('repoRev', '>', rev) - } - const res = await builder.execute() - const cids = res.map((row) => CID.parse(row.cid)) - return new CidSet(cids).toList() - } - - async deleteForUser(did: string): Promise { + async deleteAll(): Promise { // Not done in transaction because it would be too long, prone to contention. // Also, this can safely be run multiple times if it fails. const deleted = await this.db.db.deleteFrom('blob').returningAll().execute() - await this.db.db.deleteFrom('repo_blob').where('did', '=', did).execute() + await this.db.db.deleteFrom('repo_blob').execute() const deletedCids = deleted.map((d) => d.cid) let duplicateCids: string[] = [] if (deletedCids.length > 0) { diff --git a/packages/pds/src/actor-store/repo/index.ts b/packages/pds/src/actor-store/repo/index.ts new file mode 100644 index 00000000000..1e1d1c43d32 --- /dev/null +++ b/packages/pds/src/actor-store/repo/index.ts @@ -0,0 +1,4 @@ +export * from './repo' +export * from './record-transactor' +export * from './sql-repo-storage' +export * from './blob' diff --git a/packages/pds/src/actor-store/repo/record-transactor.ts b/packages/pds/src/actor-store/repo/record-transactor.ts new file mode 100644 index 00000000000..fcafb5b4807 --- /dev/null +++ b/packages/pds/src/actor-store/repo/record-transactor.ts @@ -0,0 +1,96 @@ +import { CID } from 'multiformats/cid' +import { AtUri } from '@atproto/syntax' +import { WriteOpAction } from '@atproto/repo' +import { dbLogger as log } from '../../logger' +import { Backlink } from '../actor-db' +import { ActorRecord, getBacklinks } from '../record' + +export class ActorRecordTransactor extends ActorRecord { + async indexRecord( + uri: AtUri, + cid: CID, + obj: unknown, + action: WriteOpAction.Create | WriteOpAction.Update = WriteOpAction.Create, + repoRev?: string, + timestamp?: string, + ) { + this.db.assertTransaction() + log.debug({ uri }, 'indexing record') + const record = { + uri: uri.toString(), + cid: cid.toString(), + did: uri.host, + collection: uri.collection, + rkey: uri.rkey, + repoRev: repoRev ?? null, + indexedAt: timestamp || new Date().toISOString(), + } + if (!record.did.startsWith('did:')) { + throw new Error('Expected indexed URI to contain DID') + } else if (record.collection.length < 1) { + throw new Error('Expected indexed URI to contain a collection') + } else if (record.rkey.length < 1) { + throw new Error('Expected indexed URI to contain a record key') + } + + // Track current version of record + await this.db.db + .insertInto('record') + .values(record) + .onConflict((oc) => + oc.column('uri').doUpdateSet({ + cid: record.cid, + repoRev: repoRev ?? null, + indexedAt: record.indexedAt, + }), + ) + .execute() + + // Maintain backlinks + const backlinks = getBacklinks(uri, obj) + if (action === WriteOpAction.Update) { + // On update just recreate backlinks from scratch for the record, so we can clear out + // the old ones. E.g. for weird cases like updating a follow to be for a different did. + await this.removeBacklinksByUri(uri) + } + await this.addBacklinks(backlinks) + + log.info({ uri }, 'indexed record') + } + + async deleteRecord(uri: AtUri) { + this.db.assertTransaction() + log.debug({ uri }, 'deleting indexed record') + const deleteQuery = this.db.db + .deleteFrom('record') + .where('uri', '=', uri.toString()) + const backlinkQuery = this.db.db + .deleteFrom('backlink') + .where('uri', '=', uri.toString()) + await Promise.all([deleteQuery.execute(), backlinkQuery.execute()]) + + log.info({ uri }, 'deleted indexed record') + } + + async deleteForActor(_did: string) { + // Not done in transaction because it would be too long, prone to contention. + // Also, this can safely be run multiple times if it fails. + await this.db.db.deleteFrom('record').execute() + } + + async removeBacklinksByUri(uri: AtUri) { + await this.db.db + .deleteFrom('backlink') + .where('uri', '=', uri.toString()) + .execute() + } + + async addBacklinks(backlinks: Backlink[]) { + if (backlinks.length === 0) return + await this.db.db + .insertInto('backlink') + .values(backlinks) + .onConflict((oc) => oc.doNothing()) + .execute() + } +} diff --git a/packages/pds/src/user-store/repo.ts b/packages/pds/src/actor-store/repo/repo.ts similarity index 63% rename from packages/pds/src/user-store/repo.ts rename to packages/pds/src/actor-store/repo/repo.ts index d440ae76c99..cf26bc01804 100644 --- a/packages/pds/src/user-store/repo.ts +++ b/packages/pds/src/actor-store/repo/repo.ts @@ -9,130 +9,73 @@ import { BadRecordSwapError, PreparedCreate, PreparedWrite, -} from '../repo/types' -import { Blobs } from './blobs' -import { createWriteToOp, writeToOp } from '../repo' -import { wait } from '@atproto/common' -import { BackgroundQueue } from '../background' -import { Crawlers } from '../crawlers' -import { UserDb } from '../user-db' -import { RecordService } from './record' +} from '../../repo/types' +import { ActorBlob } from './blob' +import { createWriteToOp, writeToOp } from '../../repo' +import { BackgroundQueue } from '../../background' +import { ActorDb } from '../actor-db' +import { ActorRecordTransactor } from './record-transactor' -export class RepoService { - blobs: Blobs - record: RecordService +export class ActorRepo { + blobs: ActorBlob + record: ActorRecordTransactor constructor( - public db: UserDb, + public db: ActorDb, public repoSigningKey: crypto.Keypair, public blobstore: BlobStore, public backgroundQueue: BackgroundQueue, - public crawlers: Crawlers, ) { - this.blobs = new Blobs(db, blobstore, backgroundQueue) - this.record = new RecordService(db) + this.blobs = new ActorBlob(db, blobstore, backgroundQueue) + this.record = new ActorRecordTransactor(db) } static creator( keypair: crypto.Keypair, blobstore: BlobStore, backgroundQueue: BackgroundQueue, - crawlers: Crawlers, ) { - return (db: UserDb) => - new RepoService(db, keypair, blobstore, backgroundQueue, crawlers) + return (db: ActorDb) => + new ActorRepo(db, keypair, blobstore, backgroundQueue) } - private async serviceTx( - fn: (srvc: RepoService) => Promise, - ): Promise { - this.db.assertNotTransaction() - return this.db.transaction((dbTxn) => { - const srvc = new RepoService( - dbTxn, - this.repoSigningKey, - this.blobstore, - this.backgroundQueue, - this.crawlers, - ) - return fn(srvc) - }) - } - - async createRepo(did: string, writes: PreparedCreate[], now: string) { + async createRepo(writes: PreparedCreate[], now: string) { this.db.assertTransaction() - const storage = new SqlRepoStorage(this.db, did, now) + const storage = new SqlRepoStorage(this.db, now) const writeOps = writes.map(createWriteToOp) const commit = await Repo.formatInitCommit( storage, - did, + this.db.did, this.repoSigningKey, writeOps, ) await Promise.all([ storage.applyCommit(commit), this.indexWrites(writes, now), - this.blobs.processWriteBlobs(did, commit.rev, writes), + this.blobs.processWriteBlobs(commit.rev, writes), ]) // await this.afterWriteProcessing(did, commit, writes) } - async processCommit( - did: string, - writes: PreparedWrite[], - commitData: CommitData, - now: string, - ) { + async processWrites(writes: PreparedWrite[], swapCommitCid?: CID) { this.db.assertTransaction() - const storage = new SqlRepoStorage(this.db, did, now) + const now = new Date().toISOString() + const storage = new SqlRepoStorage(this.db, now) + const commit = await this.formatCommit(storage, writes, swapCommitCid) await Promise.all([ // persist the commit to repo storage - storage.applyCommit(commitData), + storage.applyCommit(commit), // & send to indexing - this.indexWrites(writes, now, commitData.rev), + this.indexWrites(writes, now, commit.rev), // process blobs - this.blobs.processWriteBlobs(did, commitData.rev, writes), + this.blobs.processWriteBlobs(commit.rev, writes), // do any other processing needed after write ]) // await this.afterWriteProcessing(did, commitData, writes) } - async processWrites( - toWrite: { did: string; writes: PreparedWrite[]; swapCommitCid?: CID }, - times: number, - timeout = 100, - prevStorage?: SqlRepoStorage, - ) { - this.db.assertNotTransaction() - const { did, writes, swapCommitCid } = toWrite - // we may have some useful cached blocks in the storage, so re-use the previous instance - const storage = prevStorage ?? new SqlRepoStorage(this.db, did) - try { - const commit = await this.formatCommit( - storage, - did, - writes, - swapCommitCid, - ) - await this.serviceTx(async (srvcTx) => - srvcTx.processCommit(did, writes, commit, new Date().toISOString()), - ) - } catch (err) { - if (err instanceof ConcurrentWriteError) { - if (times <= 1) { - throw err - } - await wait(timeout) - return this.processWrites(toWrite, times - 1, timeout, storage) - } else { - throw err - } - } - } - async formatCommit( storage: SqlRepoStorage, - did: string, writes: PreparedWrite[], swapCommit?: CID, ): Promise { @@ -140,9 +83,7 @@ export class RepoService { // we just check if it is currently held by another txn const currRoot = await storage.getRootDetailed() if (!currRoot) { - throw new InvalidRequestError( - `${did} is not a registered repo on this server`, - ) + throw new InvalidRequestError(`No repo root found for ${this.db.did}`) } if (swapCommit && !currRoot.cid.equals(swapCommit)) { throw new BadCommitSwapError(currRoot.cid) @@ -178,24 +119,12 @@ export class RepoService { } } - let commit: CommitData - try { - const repo = await Repo.load(storage, currRoot.cid) - const writeOps = writes.map(writeToOp) - commit = await repo.formatCommit(writeOps, this.repoSigningKey) - } catch (err) { - // if an error occurs, check if it is attributable to a concurrent write - const curr = await storage.getRoot() - if (!currRoot.cid.equals(curr)) { - throw new ConcurrentWriteError() - } else { - throw err - } - } + const repo = await Repo.load(storage, currRoot.cid) + const writeOps = writes.map(writeToOp) + const commit = await repo.formatCommit(writeOps, this.repoSigningKey) // find blocks that would be deleted but are referenced by another record const dupeRecordCids = await this.getDuplicateRecordCids( - did, commit.removedCids.toList(), delAndUpdateUris, ) @@ -237,7 +166,6 @@ export class RepoService { } async getDuplicateRecordCids( - did: string, cids: CID[], touchedUris: AtUri[], ): Promise { @@ -280,9 +208,3 @@ export class RepoService { // await this.blobs.deleteForUser(did) } } - -export class ConcurrentWriteError extends Error { - constructor() { - super('too many concurrent writes') - } -} diff --git a/packages/pds/src/user-store/sql-repo-storage.ts b/packages/pds/src/actor-store/repo/sql-repo-storage.ts similarity index 97% rename from packages/pds/src/user-store/sql-repo-storage.ts rename to packages/pds/src/actor-store/repo/sql-repo-storage.ts index 4a12686b782..42ec07c9acf 100644 --- a/packages/pds/src/user-store/sql-repo-storage.ts +++ b/packages/pds/src/actor-store/repo/sql-repo-storage.ts @@ -8,18 +8,13 @@ import { } from '@atproto/repo' import { chunkArray } from '@atproto/common' import { CID } from 'multiformats/cid' -import { UserDb } from '../user-db' -import { IpldBlock } from '../user-db/tables/ipld-block' +import { ActorDb, IpldBlock } from '../actor-db' import { sql } from 'kysely' export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage { cache: BlockMap = new BlockMap() - constructor( - public db: UserDb, - public did: string, - public timestamp?: string, - ) { + constructor(public db: ActorDb, public timestamp?: string) { super() } diff --git a/packages/pds/src/api/com/atproto/repo/applyWrites.ts b/packages/pds/src/api/com/atproto/repo/applyWrites.ts index d5be8bb720d..2e1dd77e500 100644 --- a/packages/pds/src/api/com/atproto/repo/applyWrites.ts +++ b/packages/pds/src/api/com/atproto/repo/applyWrites.ts @@ -14,7 +14,6 @@ import { PreparedWrite, } from '../../../../repo' import AppContext from '../../../../context' -import { ConcurrentWriteError } from '../../../../services/repo' const ratelimitPoints = ({ input }: { input: HandlerInput }) => { let points = 0 @@ -108,19 +107,17 @@ export default function (server: Server, ctx: AppContext) { const swapCommitCid = swapCommit ? CID.parse(swapCommit) : undefined - try { - await ctx.services - .repo(ctx.db) - .processWrites({ did, writes, swapCommitCid }, 10) - } catch (err) { - if (err instanceof BadCommitSwapError) { - throw new InvalidRequestError(err.message, 'InvalidSwap') - } else if (err instanceof ConcurrentWriteError) { - throw new InvalidRequestError(err.message, 'ConcurrentWrites') - } else { - throw err + await ctx.actorStore.transact(did, async (actorTxn) => { + try { + await actorTxn.repo.processWrites(writes, swapCommitCid) + } catch (err) { + if (err instanceof BadCommitSwapError) { + throw new InvalidRequestError(err.message, 'InvalidSwap') + } else { + throw err + } } - } + }) }, }) } diff --git a/packages/pds/src/api/com/atproto/repo/createRecord.ts b/packages/pds/src/api/com/atproto/repo/createRecord.ts index 26bc5614785..58bbe411d02 100644 --- a/packages/pds/src/api/com/atproto/repo/createRecord.ts +++ b/packages/pds/src/api/com/atproto/repo/createRecord.ts @@ -6,12 +6,8 @@ import { BadCommitSwapError, InvalidRecordError, PreparedCreate, - prepareDelete, } from '../../../../repo' import AppContext from '../../../../context' -import { ids } from '../../../../lexicon/lexicons' -import Database from '../../../../db' -import { ConcurrentWriteError } from '../../../../services/repo' export default function (server: Server, ctx: AppContext) { server.com.atproto.repo.createRecord({ @@ -62,24 +58,20 @@ export default function (server: Server, ctx: AppContext) { throw err } - const backlinkDeletions = validate - ? await getBacklinkDeletions(ctx.db, ctx, write) - : [] - - const writes = [...backlinkDeletions, write] - - try { - await ctx.services - .repo(ctx.db) - .processWrites({ did, writes, swapCommitCid }, 10) - } catch (err) { - if (err instanceof BadCommitSwapError) { - throw new InvalidRequestError(err.message, 'InvalidSwap') - } else if (err instanceof ConcurrentWriteError) { - throw new InvalidRequestError(err.message, 'ConcurrentWrites') + await ctx.actorStore.transact(did, async (actorTxn) => { + const backlinkDeletions = validate + ? await actorTxn.record.getBacklinkDeletions(write.uri, write.record) + : [] + const writes = [...backlinkDeletions, write] + try { + await actorTxn.repo.processWrites(writes, swapCommitCid) + } catch (err) { + if (err instanceof BadCommitSwapError) { + throw new InvalidRequestError(err.message, 'InvalidSwap') + } + throw err } - throw err - } + }) return { encoding: 'application/json', @@ -88,50 +80,3 @@ export default function (server: Server, ctx: AppContext) { }, }) } - -// @NOTE this logic a placeholder until we allow users to specify these constraints themselves. -// Ensures that we don't end-up with duplicate likes, reposts, and follows from race conditions. - -async function getBacklinkDeletions( - tx: Database, - ctx: AppContext, - write: PreparedCreate, -) { - const recordTxn = ctx.services.record(tx) - const { - record, - uri: { host: did, collection }, - } = write - const toDelete = ({ rkey }: { rkey: string }) => - prepareDelete({ did, collection, rkey }) - - if ( - (collection === ids.AppBskyGraphFollow || - collection === ids.AppBskyGraphBlock) && - typeof record['subject'] === 'string' - ) { - const backlinks = await recordTxn.getRecordBacklinks({ - did, - collection, - path: 'subject', - linkTo: record['subject'], - }) - return backlinks.map(toDelete) - } - - if ( - (collection === ids.AppBskyFeedLike || - collection === ids.AppBskyFeedRepost) && - typeof record['subject']?.['uri'] === 'string' - ) { - const backlinks = await recordTxn.getRecordBacklinks({ - did, - collection, - path: 'subject.uri', - linkTo: record['subject']['uri'], - }) - return backlinks.map(toDelete) - } - - return [] -} diff --git a/packages/pds/src/api/com/atproto/repo/deleteRecord.ts b/packages/pds/src/api/com/atproto/repo/deleteRecord.ts index 99f171e0849..0137b5910b2 100644 --- a/packages/pds/src/api/com/atproto/repo/deleteRecord.ts +++ b/packages/pds/src/api/com/atproto/repo/deleteRecord.ts @@ -4,7 +4,6 @@ import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import { BadCommitSwapError, BadRecordSwapError } from '../../../../repo' import { CID } from 'multiformats/cid' -import { ConcurrentWriteError } from '../../../../services/repo' export default function (server: Server, ctx: AppContext) { server.com.atproto.repo.deleteRecord({ @@ -41,31 +40,24 @@ export default function (server: Server, ctx: AppContext) { rkey, swapCid: swapRecordCid, }) - const record = await ctx.services - .record(ctx.db) - .getRecord(write.uri, null, true) - if (!record) { - return // No-op if record already doesn't exist - } - - const writes = [write] - - try { - await ctx.services - .repo(ctx.db) - .processWrites({ did, writes, swapCommitCid }, 10) - } catch (err) { - if ( - err instanceof BadCommitSwapError || - err instanceof BadRecordSwapError - ) { - throw new InvalidRequestError(err.message, 'InvalidSwap') - } else if (err instanceof ConcurrentWriteError) { - throw new InvalidRequestError(err.message, 'ConcurrentWrites') - } else { - throw err + await ctx.actorStore.transact(did, async (actorTxn) => { + const record = await actorTxn.record.getRecord(write.uri, null, true) + if (!record) { + return // No-op if record already doesn't exist } - } + try { + await actorTxn.repo.processWrites([write], swapCommitCid) + } catch (err) { + if ( + err instanceof BadCommitSwapError || + err instanceof BadRecordSwapError + ) { + throw new InvalidRequestError(err.message, 'InvalidSwap') + } else { + throw err + } + } + }) }, }) } diff --git a/packages/pds/src/api/com/atproto/repo/describeRepo.ts b/packages/pds/src/api/com/atproto/repo/describeRepo.ts index b340314ef77..a9d48e1c0b6 100644 --- a/packages/pds/src/api/com/atproto/repo/describeRepo.ts +++ b/packages/pds/src/api/com/atproto/repo/describeRepo.ts @@ -22,9 +22,9 @@ export default function (server: Server, ctx: AppContext) { const handle = id.getHandle(didDoc) const handleIsCorrect = handle === account.handle - const collections = await ctx.services - .record(ctx.db) - .listCollectionsForDid(account.did) + const collections = await ctx.actorStore + .reader(account.did) + .record.listCollections() return { encoding: 'application/json', diff --git a/packages/pds/src/api/com/atproto/repo/getRecord.ts b/packages/pds/src/api/com/atproto/repo/getRecord.ts index 5c99a7226c1..5b90eb8c00e 100644 --- a/packages/pds/src/api/com/atproto/repo/getRecord.ts +++ b/packages/pds/src/api/com/atproto/repo/getRecord.ts @@ -11,9 +11,9 @@ export default function (server: Server, ctx: AppContext) { // fetch from pds if available, if not then fetch from appview if (did) { const uri = AtUri.make(did, collection, rkey) - const record = await ctx.services - .record(ctx.db) - .getRecord(uri, cid || null) + const record = await ctx.actorStore + .reader(did) + .record.getRecord(uri, cid || null) if (!record || record.takedownId !== null) { throw new InvalidRequestError(`Could not locate record: ${uri}`) } diff --git a/packages/pds/src/api/com/atproto/repo/listRecords.ts b/packages/pds/src/api/com/atproto/repo/listRecords.ts index 8c2669ff010..7741fed3d36 100644 --- a/packages/pds/src/api/com/atproto/repo/listRecords.ts +++ b/packages/pds/src/api/com/atproto/repo/listRecords.ts @@ -20,15 +20,16 @@ export default function (server: Server, ctx: AppContext) { throw new InvalidRequestError(`Could not find repo: ${repo}`) } - const records = await ctx.services.record(ctx.db).listRecordsForCollection({ - did, - collection, - limit, - reverse, - cursor, - rkeyStart, - rkeyEnd, - }) + const records = await ctx.actorStore + .reader(did) + .record.listRecordsForCollection({ + collection, + limit, + reverse, + cursor, + rkeyStart, + rkeyEnd, + }) const lastRecord = records.at(-1) const lastUri = lastRecord && new AtUri(lastRecord?.uri) diff --git a/packages/pds/src/api/com/atproto/repo/putRecord.ts b/packages/pds/src/api/com/atproto/repo/putRecord.ts index 8fdcc776bb9..f86cf6635a1 100644 --- a/packages/pds/src/api/com/atproto/repo/putRecord.ts +++ b/packages/pds/src/api/com/atproto/repo/putRecord.ts @@ -11,7 +11,6 @@ import { PreparedCreate, PreparedUpdate, } from '../../../../repo' -import { ConcurrentWriteError } from '../../../../services/repo' export default function (server: Server, ctx: AppContext) { server.com.atproto.repo.putRecord({ @@ -57,48 +56,43 @@ export default function (server: Server, ctx: AppContext) { const swapRecordCid = typeof swapRecord === 'string' ? CID.parse(swapRecord) : swapRecord - const current = await ctx.services - .record(ctx.db) - .getRecord(uri, null, true) - const writeInfo = { - did, - collection, - rkey, - record, - swapCid: swapRecordCid, - validate, - } - - let write: PreparedCreate | PreparedUpdate - try { - write = current - ? await prepareUpdate(writeInfo) - : await prepareCreate(writeInfo) - } catch (err) { - if (err instanceof InvalidRecordError) { - throw new InvalidRequestError(err.message) + const write = await ctx.actorStore.transact(did, async (actorTxn) => { + const current = await actorTxn.record.getRecord(uri, null, true) + const writeInfo = { + did, + collection, + rkey, + record, + swapCid: swapRecordCid, + validate, } - throw err - } - - const writes = [write] - try { - await ctx.services - .repo(ctx.db) - .processWrites({ did, writes, swapCommitCid }, 10) - } catch (err) { - if ( - err instanceof BadCommitSwapError || - err instanceof BadRecordSwapError - ) { - throw new InvalidRequestError(err.message, 'InvalidSwap') - } else if (err instanceof ConcurrentWriteError) { - throw new InvalidRequestError(err.message, 'ConcurrentWrites') - } else { + let write: PreparedCreate | PreparedUpdate + try { + write = current + ? await prepareUpdate(writeInfo) + : await prepareCreate(writeInfo) + } catch (err) { + if (err instanceof InvalidRecordError) { + throw new InvalidRequestError(err.message) + } throw err } - } + + try { + await actorTxn.repo.processWrites([write], swapCommitCid) + } catch (err) { + if ( + err instanceof BadCommitSwapError || + err instanceof BadRecordSwapError + ) { + throw new InvalidRequestError(err.message, 'InvalidSwap') + } else { + throw err + } + } + return write + }) return { encoding: 'application/json', diff --git a/packages/pds/src/api/com/atproto/repo/uploadBlob.ts b/packages/pds/src/api/com/atproto/repo/uploadBlob.ts index b5a6eaecaef..f6e2addb6b2 100644 --- a/packages/pds/src/api/com/atproto/repo/uploadBlob.ts +++ b/packages/pds/src/api/com/atproto/repo/uploadBlob.ts @@ -6,9 +6,10 @@ export default function (server: Server, ctx: AppContext) { auth: ctx.accessVerifierCheckTakedown, handler: async ({ auth, input }) => { const requester = auth.credentials.did - const blob = await ctx.services - .repo(ctx.db) - .blobs.addUntetheredBlob(requester, input.encoding, input.body) + + const blob = await ctx.actorStore.transact(requester, (actorTxn) => { + return actorTxn.repo.blobs.addUntetheredBlob(input.encoding, input.body) + }) return { encoding: 'application/json', diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index a8f1e2003fe..82107f6f36d 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -21,11 +21,11 @@ import { Crawlers } from './crawlers' import { DiskBlobStore } from './storage' import { getRedisClient } from './redis' import { RuntimeFlags } from './runtime-flags' -import { UserDbCoordinator } from './user-db' +import { ActorStore, createActorStore } from './actor-store' export type AppContextOptions = { db: Database - userDb: UserDbCoordinator + actorStore: ActorStore blobstore: BlobStore mailer: ServerMailer moderationMailer: ModerationMailer @@ -48,7 +48,7 @@ export type AppContextOptions = { export class AppContext { public db: Database - public userDb: UserDbCoordinator + public actorStore: ActorStore public blobstore: BlobStore public mailer: ServerMailer public moderationMailer: ModerationMailer @@ -70,6 +70,7 @@ export class AppContext { constructor(opts: AppContextOptions) { this.db = opts.db + this.actorStore = opts.actorStore this.blobstore = opts.blobstore this.mailer = opts.mailer this.moderationMailer = opts.moderationMailer @@ -105,7 +106,6 @@ export class AppContext { poolMaxUses: cfg.db.pool.maxUses, poolIdleTimeoutMs: cfg.db.pool.idleTimeoutMs, }) - const userDb = new UserDbCoordinator('./users') const blobstore = cfg.blobstore.provider === 's3' ? new S3BlobStore({ bucket: cfg.blobstore.bucket }) @@ -181,7 +181,7 @@ export class AppContext { secrets.plcRotationKey.privateKeyHex, ) - const services = createServices({ + const actorStore = createActorStore({ repoSigningKey, blobstore, appViewAgent, @@ -189,12 +189,20 @@ export class AppContext { appViewDid: cfg.bskyAppView.did, appViewCdnUrlPattern: cfg.bskyAppView.cdnUrlPattern, backgroundQueue, - crawlers, + }) + + const services = createServices({ + repoSigningKey, + blobstore, + appViewAgent, + pdsHostname: cfg.service.hostname, + appViewDid: cfg.bskyAppView.did, + appViewCdnUrlPattern: cfg.bskyAppView.cdnUrlPattern, }) return new AppContext({ db, - userDb, + actorStore, blobstore, mailer, moderationMailer,