diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 328b61893a1..a8f1e2003fe 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -21,9 +21,11 @@ import { Crawlers } from './crawlers' import { DiskBlobStore } from './storage' import { getRedisClient } from './redis' import { RuntimeFlags } from './runtime-flags' +import { UserDbCoordinator } from './user-db' export type AppContextOptions = { db: Database + userDb: UserDbCoordinator blobstore: BlobStore mailer: ServerMailer moderationMailer: ModerationMailer @@ -46,6 +48,7 @@ export type AppContextOptions = { export class AppContext { public db: Database + public userDb: UserDbCoordinator public blobstore: BlobStore public mailer: ServerMailer public moderationMailer: ModerationMailer @@ -102,6 +105,7 @@ export class AppContext { poolMaxUses: cfg.db.pool.maxUses, poolIdleTimeoutMs: cfg.db.pool.idleTimeoutMs, }) + const userDb = new UserDbCoordinator('./users') const blobstore = cfg.blobstore.provider === 's3' ? 
new S3BlobStore({ bucket: cfg.blobstore.bucket }) @@ -190,6 +194,7 @@ export class AppContext { return new AppContext({ db, + userDb, blobstore, mailer, moderationMailer, diff --git a/packages/pds/src/db/database-schema.ts b/packages/pds/src/db/database-schema.ts index 26159418206..0c7d2e43183 100644 --- a/packages/pds/src/db/database-schema.ts +++ b/packages/pds/src/db/database-schema.ts @@ -1,17 +1,11 @@ import { Kysely } from 'kysely' import * as userAccount from './tables/user-account' -import * as userPref from './tables/user-pref' import * as didHandle from './tables/did-handle' import * as repoRoot from './tables/repo-root' import * as didCache from './tables/did-cache' import * as refreshToken from './tables/refresh-token' import * as appPassword from './tables/app-password' -import * as record from './tables/record' -import * as backlink from './tables/backlink' -import * as ipldBlock from './tables/ipld-block' import * as inviteCode from './tables/invite-code' -import * as blob from './tables/blob' -import * as repoBlob from './tables/repo-blob' import * as emailToken from './tables/email-token' import * as moderation from './tables/moderation' import * as repoSeq from './tables/repo-seq' @@ -21,18 +15,12 @@ import * as runtimeFlag from './tables/runtime-flag' export type DatabaseSchemaType = appMigration.PartialDB & runtimeFlag.PartialDB & userAccount.PartialDB & - userPref.PartialDB & didHandle.PartialDB & refreshToken.PartialDB & appPassword.PartialDB & repoRoot.PartialDB & didCache.PartialDB & - record.PartialDB & - backlink.PartialDB & - ipldBlock.PartialDB & inviteCode.PartialDB & - blob.PartialDB & - repoBlob.PartialDB & emailToken.PartialDB & moderation.PartialDB & repoSeq.PartialDB diff --git a/packages/pds/src/db/tables/user-account.ts b/packages/pds/src/db/tables/user-account.ts index 808663ca468..416e62f74b3 100644 --- a/packages/pds/src/db/tables/user-account.ts +++ b/packages/pds/src/db/tables/user-account.ts @@ -2,6 +2,7 @@ import { 
Generated, Selectable } from 'kysely' export interface UserAccount { did: string + handle: string | null email: string passwordScrypt: string createdAt: string diff --git a/packages/pds/src/services/account/index.ts b/packages/pds/src/services/account/index.ts index 9a6910d0e4f..c17d00a29fc 100644 --- a/packages/pds/src/services/account/index.ts +++ b/packages/pds/src/services/account/index.ts @@ -524,74 +524,8 @@ export class AccountService { } return res.did } - - async getPreferences( - did: string, - namespace?: string, - ): Promise { - const prefsRes = await this.db.db - .selectFrom('user_pref') - .where('did', '=', did) - .orderBy('id') - .selectAll() - .execute() - return prefsRes - .filter((pref) => !namespace || matchNamespace(namespace, pref.name)) - .map((pref) => JSON.parse(pref.valueJson)) - } - - async putPreferences( - did: string, - values: UserPreference[], - namespace: string, - ): Promise { - this.db.assertTransaction() - if (!values.every((value) => matchNamespace(namespace, value.$type))) { - throw new InvalidRequestError( - `Some preferences are not in the ${namespace} namespace`, - ) - } - // short-held row lock to prevent races - if (this.db.dialect === 'pg') { - await this.db.db - .selectFrom('user_account') - .selectAll() - .forUpdate() - .where('did', '=', did) - .executeTakeFirst() - } - // get all current prefs for user and prep new pref rows - const allPrefs = await this.db.db - .selectFrom('user_pref') - .where('did', '=', did) - .select(['id', 'name']) - .execute() - const putPrefs = values.map((value) => { - return { - did, - name: value.$type, - valueJson: JSON.stringify(value), - } - }) - const allPrefIdsInNamespace = allPrefs - .filter((pref) => matchNamespace(namespace, pref.name)) - .map((pref) => pref.id) - // replace all prefs in given namespace - if (allPrefIdsInNamespace.length) { - await this.db.db - .deleteFrom('user_pref') - .where('did', '=', did) - .where('id', 'in', allPrefIdsInNamespace) - .execute() - } - if 
(putPrefs.length) { - await this.db.db.insertInto('user_pref').values(putPrefs).execute() - } - } } -export type UserPreference = Record & { $type: string } - export type CodeDetail = { code: string available: number @@ -618,8 +552,4 @@ export class ListKeyset extends TimeCidKeyset<{ } } -const matchNamespace = (namespace: string, fullname: string) => { - return fullname === namespace || fullname.startsWith(`${namespace}.`) -} - export type HandleSequenceToken = { did: string; handle: string } diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index 954a5544e6e..e522e187882 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -4,12 +4,7 @@ import { BlobStore } from '@atproto/repo' import Database from '../db' import { AccountService } from './account' import { AuthService } from './auth' -import { RecordService } from './record' -import { RepoService } from './repo' import { ModerationService } from './moderation' -import { BackgroundQueue } from '../background' -import { Crawlers } from '../crawlers' -import { LocalService } from './local' export function createServices(resources: { repoSigningKey: crypto.Keypair @@ -18,36 +13,11 @@ export function createServices(resources: { appViewAgent?: AtpAgent appViewDid?: string appViewCdnUrlPattern?: string - backgroundQueue: BackgroundQueue - crawlers: Crawlers }): Services { - const { - repoSigningKey, - blobstore, - pdsHostname, - appViewAgent, - appViewDid, - appViewCdnUrlPattern, - backgroundQueue, - crawlers, - } = resources + const { blobstore } = resources return { account: AccountService.creator(), auth: AuthService.creator(), - record: RecordService.creator(), - repo: RepoService.creator( - repoSigningKey, - blobstore, - backgroundQueue, - crawlers, - ), - local: LocalService.creator( - repoSigningKey, - pdsHostname, - appViewAgent, - appViewDid, - appViewCdnUrlPattern, - ), moderation: ModerationService.creator(blobstore), } } @@ -55,9 
+25,6 @@ export function createServices(resources: { export type Services = { account: FromDb auth: FromDb - record: FromDb - repo: FromDb - local: FromDb moderation: FromDb } diff --git a/packages/pds/src/user-db/index.ts b/packages/pds/src/user-db/index.ts new file mode 100644 index 00000000000..0be4d682929 --- /dev/null +++ b/packages/pds/src/user-db/index.ts @@ -0,0 +1,121 @@ +import assert from 'assert' +import { + Kysely, + SqliteDialect, + Migrator, + KyselyPlugin, + PluginTransformQueryArgs, + PluginTransformResultArgs, + RootOperationNode, + QueryResult, + UnknownRow, +} from 'kysely' +import SqliteDB from 'better-sqlite3' +import { DatabaseSchema } from './schema' +import * as migrations from './migrations' +import { CtxMigrationProvider } from './migrations/provider' + +export class UserDb { + migrator: Migrator + destroyed = false + + constructor(public db: Kysely) { + this.migrator = new Migrator({ + db, + provider: new CtxMigrationProvider(migrations), + }) + } + + static sqlite(location: string): UserDb { + const db = new Kysely({ + dialect: new SqliteDialect({ + database: new SqliteDB(location), + }), + }) + return new UserDb(db) + } + + static memory(): UserDb { + return UserDb.sqlite(':memory:') + } + + async transaction(fn: (db: UserDb) => Promise): Promise { + const leakyTxPlugin = new LeakyTxPlugin() + return this.db + .withPlugin(leakyTxPlugin) + .transaction() + .execute(async (txn) => { + const dbTxn = new UserDb(txn) + const txRes = await fn(dbTxn) + .catch(async (err) => { + leakyTxPlugin.endTx() + // ensure that all in-flight queries are flushed & the connection is open + await dbTxn.db.getExecutor().provideConnection(async () => {}) + throw err + }) + .finally(() => leakyTxPlugin.endTx()) + return txRes + }) + } + + get isTransaction() { + return this.db.isTransaction + } + + assertTransaction() { + assert(this.isTransaction, 'Transaction required') + } + + assertNotTransaction() { + assert(!this.isTransaction, 'Cannot be in a 
transaction') + } + + async close(): Promise { + if (this.destroyed) return + await this.db.destroy() + this.destroyed = true + } + + async migrateToOrThrow(migration: string) { + const { error, results } = await this.migrator.migrateTo(migration) + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } + + async migrateToLatestOrThrow() { + const { error, results } = await this.migrator.migrateToLatest() + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } +} + +class LeakyTxPlugin implements KyselyPlugin { + private txOver: boolean + + endTx() { + this.txOver = true + } + + transformQuery(args: PluginTransformQueryArgs): RootOperationNode { + if (this.txOver) { + throw new Error('tx already failed') + } + return args.node + } + + async transformResult( + args: PluginTransformResultArgs, + ): Promise> { + return args.result + } +} diff --git a/packages/pds/src/user-db/migrations/20230613T164932261Z-init.ts b/packages/pds/src/user-db/migrations/20230613T164932261Z-init.ts new file mode 100644 index 00000000000..17105eff6b0 --- /dev/null +++ b/packages/pds/src/user-db/migrations/20230613T164932261Z-init.ts @@ -0,0 +1,5 @@ +import { Kysely } from 'kysely' + +export async function up(db: Kysely): Promise {} + +export async function down(db: Kysely): Promise {} diff --git a/packages/pds/src/user-db/migrations/index.ts b/packages/pds/src/user-db/migrations/index.ts new file mode 100644 index 00000000000..9de245dda96 --- /dev/null +++ b/packages/pds/src/user-db/migrations/index.ts @@ -0,0 +1,5 @@ +// NOTE this file can be edited by hand, but it is also appended to by the migration:create command. +// It's important that every migration is exported from here with the proper name. We'd simplify +// this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process. 
+ +export * as _20230613T164932261Z from './20230613T164932261Z-init' diff --git a/packages/pds/src/user-db/migrations/provider.ts b/packages/pds/src/user-db/migrations/provider.ts new file mode 100644 index 00000000000..b93b01f63ce --- /dev/null +++ b/packages/pds/src/user-db/migrations/provider.ts @@ -0,0 +1,24 @@ +import { Kysely, Migration, MigrationProvider } from 'kysely' + +// @TODO remove/cleanup + +// Passes a context argument to migrations. We use this to thread the dialect into migrations + +export class CtxMigrationProvider implements MigrationProvider { + constructor(private migrations: Record) {} + async getMigrations(): Promise> { + const ctxMigrations: Record = {} + Object.entries(this.migrations).forEach(([name, migration]) => { + ctxMigrations[name] = { + up: async (db) => await migration.up(db), + down: async (db) => await migration.down?.(db), + } + }) + return ctxMigrations + } +} + +export interface CtxMigration { + up(db: Kysely): Promise + down?(db: Kysely): Promise +} diff --git a/packages/pds/src/db/tables/backlink.ts b/packages/pds/src/user-db/schema/backlink.ts similarity index 100% rename from packages/pds/src/db/tables/backlink.ts rename to packages/pds/src/user-db/schema/backlink.ts diff --git a/packages/pds/src/db/tables/blob.ts b/packages/pds/src/user-db/schema/blob.ts similarity index 93% rename from packages/pds/src/db/tables/blob.ts rename to packages/pds/src/user-db/schema/blob.ts index afea699db16..068244d7771 100644 --- a/packages/pds/src/db/tables/blob.ts +++ b/packages/pds/src/user-db/schema/blob.ts @@ -1,5 +1,4 @@ export interface Blob { - creator: string cid: string mimeType: string size: number diff --git a/packages/pds/src/user-db/schema/index.ts b/packages/pds/src/user-db/schema/index.ts new file mode 100644 index 00000000000..bebaf302b6e --- /dev/null +++ b/packages/pds/src/user-db/schema/index.ts @@ -0,0 +1,15 @@ +import * as userPref from './user-pref' +import * as repoRoot from './repo-root' +import * as record from 
'./record' +import * as backlink from './backlink' +import * as ipldBlock from './ipld-block' +import * as blob from './blob' +import * as repoBlob from './repo-blob' + +export type DatabaseSchema = userPref.PartialDB & + repoRoot.PartialDB & + record.PartialDB & + backlink.PartialDB & + ipldBlock.PartialDB & + blob.PartialDB & + repoBlob.PartialDB diff --git a/packages/pds/src/db/tables/ipld-block.ts b/packages/pds/src/user-db/schema/ipld-block.ts similarity index 91% rename from packages/pds/src/db/tables/ipld-block.ts rename to packages/pds/src/user-db/schema/ipld-block.ts index ce7bd30a51a..5151f557867 100644 --- a/packages/pds/src/db/tables/ipld-block.ts +++ b/packages/pds/src/user-db/schema/ipld-block.ts @@ -1,6 +1,5 @@ export interface IpldBlock { cid: string - creator: string repoRev: string | null size: number content: Uint8Array diff --git a/packages/pds/src/db/tables/record.ts b/packages/pds/src/user-db/schema/record.ts similarity index 95% rename from packages/pds/src/db/tables/record.ts rename to packages/pds/src/user-db/schema/record.ts index 03f1008ef0f..00208ee14cc 100644 --- a/packages/pds/src/db/tables/record.ts +++ b/packages/pds/src/user-db/schema/record.ts @@ -2,7 +2,6 @@ export interface Record { uri: string cid: string - did: string collection: string rkey: string repoRev: string | null diff --git a/packages/pds/src/db/tables/repo-blob.ts b/packages/pds/src/user-db/schema/repo-blob.ts similarity index 93% rename from packages/pds/src/db/tables/repo-blob.ts rename to packages/pds/src/user-db/schema/repo-blob.ts index a1fed0877e5..f29e7febc7d 100644 --- a/packages/pds/src/db/tables/repo-blob.ts +++ b/packages/pds/src/user-db/schema/repo-blob.ts @@ -2,7 +2,6 @@ export interface RepoBlob { cid: string recordUri: string repoRev: string | null - did: string takedownId: number | null } diff --git a/packages/pds/src/user-db/schema/repo-root.ts b/packages/pds/src/user-db/schema/repo-root.ts new file mode 100644 index 00000000000..8f4f1122059 --- 
/dev/null +++ b/packages/pds/src/user-db/schema/repo-root.ts @@ -0,0 +1,10 @@ +// @NOTE also used by app-view (moderation) +export interface RepoRoot { + cid: string + rev: string + indexedAt: string +} + +export const tableName = 'repo_root' + +export type PartialDB = { [tableName]: RepoRoot } diff --git a/packages/pds/src/db/tables/user-pref.ts b/packages/pds/src/user-db/schema/user-pref.ts similarity index 94% rename from packages/pds/src/db/tables/user-pref.ts rename to packages/pds/src/user-db/schema/user-pref.ts index d72b902861b..ec74d97f62c 100644 --- a/packages/pds/src/db/tables/user-pref.ts +++ b/packages/pds/src/user-db/schema/user-pref.ts @@ -2,7 +2,6 @@ import { GeneratedAlways } from 'kysely' export interface UserPref { id: GeneratedAlways - did: string name: string valueJson: string // json } diff --git a/packages/pds/src/services/repo/blobs.ts b/packages/pds/src/user-store/blobs.ts similarity index 90% rename from packages/pds/src/services/repo/blobs.ts rename to packages/pds/src/user-store/blobs.ts index 2bedb88ecfd..fc445778f1f 100644 --- a/packages/pds/src/services/repo/blobs.ts +++ b/packages/pds/src/user-store/blobs.ts @@ -8,22 +8,25 @@ import { AtUri } from '@atproto/syntax' import { cloneStream, sha256RawToCid, streamSize } from '@atproto/common' import { InvalidRequestError } from '@atproto/xrpc-server' import { BlobRef } from '@atproto/lexicon' -import { PreparedBlobRef, PreparedWrite } from '../../repo/types' -import Database from '../../db' -import { Blob as BlobTable } from '../../db/tables/blob' -import * as img from '../../image' -import { PreparedDelete, PreparedUpdate } from '../../repo' -import { BackgroundQueue } from '../../background' +import { UserDb } from '../user-db' +import { + PreparedBlobRef, + PreparedWrite, + PreparedDelete, + PreparedUpdate, +} from '../repo/types' +import { Blob as BlobTable } from '../user-db/tables/blob' +import * as img from '../image' +import { BackgroundQueue } from '../background' -export class 
RepoBlobs { +export class Blobs { constructor( - public db: Database, + public db: UserDb, public blobstore: BlobStore, public backgroundQueue: BackgroundQueue, ) {} async addUntetheredBlob( - creator: string, userSuggestedMime: string, blobStream: stream.Readable, ): Promise { @@ -41,7 +44,6 @@ export class RepoBlobs { await this.db.db .insertInto('blob') .values({ - creator, cid: cid.toString(), mimeType, size, @@ -52,7 +54,7 @@ export class RepoBlobs { }) .onConflict((oc) => oc - .columns(['creator', 'cid']) + .column('cid') .doUpdateSet({ tempKey }) .where('blob.tempKey', 'is not', null), ) @@ -120,7 +122,6 @@ export class RepoBlobs { await this.db.db .deleteFrom('blob') - .where('creator', '=', did) .where('cid', 'in', cidsToDelete) .execute() @@ -135,16 +136,18 @@ export class RepoBlobs { const blobsToDelete = cidsToDelete.filter((cid) => !stillUsed.includes(cid)) + // @TODO FIX ME + // move actual blob deletion to the background queue - if (blobsToDelete.length > 0) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await Promise.allSettled( - blobsToDelete.map((cid) => this.blobstore.delete(CID.parse(cid))), - ) - }) - }) - } + // if (blobsToDelete.length > 0) { + // this.db.onCommit(() => { + // this.backgroundQueue.add(async () => { + // await Promise.allSettled( + // blobsToDelete.map((cid) => this.blobstore.delete(CID.parse(cid))), + // ) + // }) + // }) + // } } async verifyBlobAndMakePermanent( @@ -155,7 +158,6 @@ export class RepoBlobs { const found = await this.db.db .selectFrom('blob') .selectAll() - .where('creator', '=', creator) .where('cid', '=', blob.cid.toString()) .whereNotExists( // Check if blob has been taken down @@ -217,11 +219,7 @@ export class RepoBlobs { async deleteForUser(did: string): Promise { // Not done in transaction because it would be too long, prone to contention. // Also, this can safely be run multiple times if it fails. 
- const deleted = await this.db.db - .deleteFrom('blob') - .where('creator', '=', did) - .returningAll() - .execute() + const deleted = await this.db.db.deleteFrom('blob').returningAll().execute() await this.db.db.deleteFrom('repo_blob').where('did', '=', did).execute() const deletedCids = deleted.map((d) => d.cid) let duplicateCids: string[] = [] diff --git a/packages/pds/src/services/local/index.ts b/packages/pds/src/user-store/local.ts similarity index 86% rename from packages/pds/src/services/local/index.ts rename to packages/pds/src/user-store/local.ts index c5cc782357f..4286db6425d 100644 --- a/packages/pds/src/services/local/index.ts +++ b/packages/pds/src/user-store/local.ts @@ -2,42 +2,42 @@ import util from 'util' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' import { cborToLexRecord } from '@atproto/repo' -import Database from '../../db' -import { Record as PostRecord } from '../../lexicon/types/app/bsky/feed/post' -import { Record as ProfileRecord } from '../../lexicon/types/app/bsky/actor/profile' -import { ids } from '../../lexicon/lexicons' +import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post' +import { Record as ProfileRecord } from '../lexicon/types/app/bsky/actor/profile' +import { ids } from '../lexicon/lexicons' import { ProfileViewBasic, ProfileView, ProfileViewDetailed, -} from '../../lexicon/types/app/bsky/actor/defs' -import { FeedViewPost, PostView } from '../../lexicon/types/app/bsky/feed/defs' +} from '../lexicon/types/app/bsky/actor/defs' +import { FeedViewPost, PostView } from '../lexicon/types/app/bsky/feed/defs' import { Main as EmbedImages, isMain as isEmbedImages, -} from '../../lexicon/types/app/bsky/embed/images' +} from '../lexicon/types/app/bsky/embed/images' import { Main as EmbedExternal, isMain as isEmbedExternal, -} from '../../lexicon/types/app/bsky/embed/external' +} from '../lexicon/types/app/bsky/embed/external' import { Main as EmbedRecord, isMain as isEmbedRecord, 
View as EmbedRecordView, -} from '../../lexicon/types/app/bsky/embed/record' +} from '../lexicon/types/app/bsky/embed/record' import { Main as EmbedRecordWithMedia, isMain as isEmbedRecordWithMedia, -} from '../../lexicon/types/app/bsky/embed/recordWithMedia' +} from '../lexicon/types/app/bsky/embed/recordWithMedia' import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' import { createServiceAuthHeaders } from '@atproto/xrpc-server' +import { UserDb } from '../user-db' type CommonSignedUris = 'avatar' | 'banner' | 'feed_thumbnail' | 'feed_fullsize' export class LocalService { constructor( - public db: Database, + public db: UserDb, public signingKey: Keypair, public pdsHostname: string, public appViewAgent?: AtpAgent, @@ -52,7 +52,7 @@ export class LocalService { appviewDid?: string, appviewCdnUrlPattern?: string, ) { - return (db: Database) => + return (db: UserDb) => new LocalService( db, signingKey, @@ -81,21 +81,16 @@ export class LocalService { }) } - async getRecordsSinceRev(did: string, rev: string): Promise { + async getRecordsSinceRev(rev: string): Promise { const res = await this.db.db .selectFrom('record') - .innerJoin('ipld_block', (join) => - join - .onRef('record.did', '=', 'ipld_block.creator') - .onRef('record.cid', '=', 'ipld_block.cid'), - ) + .innerJoin('ipld_block', 'ipld_block.cid', 'record.cid') .select([ 'ipld_block.content', 'uri', 'ipld_block.cid', 'record.indexedAt', ]) - .where('did', '=', did) .where('record.repoRev', '>', rev) .orderBy('record.repoRev', 'asc') .execute() @@ -121,16 +116,10 @@ export class LocalService { ) } - async getProfileBasic(did: string): Promise { + async getProfileBasic(): Promise { const res = await this.db.db - .selectFrom('did_handle') - .leftJoin('record', 'record.did', 'did_handle.did') - .leftJoin('ipld_block', (join) => - join - .onRef('record.did', '=', 'ipld_block.creator') - .onRef('record.cid', '=', 'ipld_block.cid'), - ) - .where('did_handle.did', '=', did) + 
.selectFrom('record') + .leftJoin('ipld_block', 'ipld_block.cid', 'record.cid') .where('record.collection', '=', ids.AppBskyActorProfile) .where('record.rkey', '=', 'self') .selectAll() @@ -139,9 +128,14 @@ export class LocalService { const record = res.content ? (cborToLexRecord(res.content) as ProfileRecord) : null + // @TODO fix + const did = '' + const handle = '' return { did, - handle: res.handle, + handle, + // did, + // handle: res.handle, displayName: record?.displayName, avatar: record?.avatar ? this.getImageUrl('avatar', did, record.avatar.ref.toString()) @@ -178,7 +172,7 @@ export class LocalService { descript: RecordDescript, ): Promise { const { uri, cid, indexedAt, record } = descript - const author = await this.getProfileBasic(uri.hostname) + const author = await this.getProfileBasic() if (!author) return null const embed = record.embed ? await this.formatPostEmbed(author.did, record) diff --git a/packages/pds/src/user-store/preferences.ts b/packages/pds/src/user-store/preferences.ts new file mode 100644 index 00000000000..6c0f520f49f --- /dev/null +++ b/packages/pds/src/user-store/preferences.ts @@ -0,0 +1,68 @@ +import { InvalidRequestError } from '@atproto/xrpc-server' +import { UserDb } from '../user-db' + +export class PreferencesService { + constructor(public db: UserDb) {} + + static creator() { + return (db: UserDb) => new PreferencesService(db) + } + + async getPreferences( + did: string, + namespace?: string, + ): Promise { + const prefsRes = await this.db.db + .selectFrom('user_pref') + .orderBy('id') + .selectAll() + .execute() + return prefsRes + .filter((pref) => !namespace || matchNamespace(namespace, pref.name)) + .map((pref) => JSON.parse(pref.valueJson)) + } + + async putPreferences( + did: string, + values: UserPreference[], + namespace: string, + ): Promise { + this.db.assertTransaction() + if (!values.every((value) => matchNamespace(namespace, value.$type))) { + throw new InvalidRequestError( + `Some preferences are not in the 
${namespace} namespace`, + ) + } + // get all current prefs for user and prep new pref rows + const allPrefs = await this.db.db + .selectFrom('user_pref') + .select(['id', 'name']) + .execute() + const putPrefs = values.map((value) => { + return { + did, + name: value.$type, + valueJson: JSON.stringify(value), + } + }) + const allPrefIdsInNamespace = allPrefs + .filter((pref) => matchNamespace(namespace, pref.name)) + .map((pref) => pref.id) + // replace all prefs in given namespace + if (allPrefIdsInNamespace.length) { + await this.db.db + .deleteFrom('user_pref') + .where('id', 'in', allPrefIdsInNamespace) + .execute() + } + if (putPrefs.length) { + await this.db.db.insertInto('user_pref').values(putPrefs).execute() + } + } +} + +export type UserPreference = Record & { $type: string } + +const matchNamespace = (namespace: string, fullname: string) => { + return fullname === namespace || fullname.startsWith(`${namespace}.`) +} diff --git a/packages/pds/src/services/record/index.ts b/packages/pds/src/user-store/record.ts similarity index 96% rename from packages/pds/src/services/record/index.ts rename to packages/pds/src/user-store/record.ts index 1914d1b8c61..a1a36417037 100644 --- a/packages/pds/src/services/record/index.ts +++ b/packages/pds/src/user-store/record.ts @@ -2,17 +2,17 @@ import { CID } from 'multiformats/cid' import { AtUri, ensureValidAtUri } from '@atproto/syntax' import * as ident from '@atproto/syntax' import { cborToLexRecord, WriteOpAction } from '@atproto/repo' -import { dbLogger as log } from '../../logger' -import Database from '../../db' -import { notSoftDeletedClause } from '../../db/util' -import { Backlink } from '../../db/tables/backlink' -import { ids } from '../../lexicon/lexicons' +import { dbLogger as log } from '../logger' +import { notSoftDeletedClause } from '../user-db/util' +import { Backlink } from '../user-db/tables/backlink' +import { ids } from '../lexicon/lexicons' +import { UserDb } from '../user-db' export class 
RecordService { - constructor(public db: Database) {} + constructor(public db: UserDb) {} static creator() { - return (db: Database) => new RecordService(db) + return (db: UserDb) => new RecordService(db) } async indexRecord( diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/user-store/repo.ts similarity index 79% rename from packages/pds/src/services/repo/index.ts rename to packages/pds/src/user-store/repo.ts index 406635b736d..d440ae76c99 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/user-store/repo.ts @@ -3,33 +3,34 @@ import * as crypto from '@atproto/crypto' import { BlobStore, CommitData, Repo, WriteOpAction } from '@atproto/repo' import { InvalidRequestError } from '@atproto/xrpc-server' import { AtUri } from '@atproto/syntax' -import Database from '../../db' -import SqlRepoStorage from '../../sql-repo-storage' +import SqlRepoStorage from './sql-repo-storage' import { BadCommitSwapError, BadRecordSwapError, PreparedCreate, PreparedWrite, -} from '../../repo/types' -import { RepoBlobs } from './blobs' -import { createWriteToOp, writeToOp } from '../../repo' -import { RecordService } from '../record' -import * as sequencer from '../../sequencer' +} from '../repo/types' +import { Blobs } from './blobs' +import { createWriteToOp, writeToOp } from '../repo' import { wait } from '@atproto/common' -import { BackgroundQueue } from '../../background' -import { Crawlers } from '../../crawlers' +import { BackgroundQueue } from '../background' +import { Crawlers } from '../crawlers' +import { UserDb } from '../user-db' +import { RecordService } from './record' export class RepoService { - blobs: RepoBlobs + blobs: Blobs + record: RecordService constructor( - public db: Database, + public db: UserDb, public repoSigningKey: crypto.Keypair, public blobstore: BlobStore, public backgroundQueue: BackgroundQueue, public crawlers: Crawlers, ) { - this.blobs = new RepoBlobs(db, blobstore, backgroundQueue) + this.blobs = new 
Blobs(db, blobstore, backgroundQueue) + this.record = new RecordService(db) } static creator( @@ -38,14 +39,10 @@ export class RepoService { backgroundQueue: BackgroundQueue, crawlers: Crawlers, ) { - return (db: Database) => + return (db: UserDb) => new RepoService(db, keypair, blobstore, backgroundQueue, crawlers) } - services = { - record: RecordService.creator(), - } - private async serviceTx( fn: (srvc: RepoService) => Promise, ): Promise { @@ -77,7 +74,7 @@ export class RepoService { this.indexWrites(writes, now), this.blobs.processWriteBlobs(did, commit.rev, writes), ]) - await this.afterWriteProcessing(did, commit, writes) + // await this.afterWriteProcessing(did, commit, writes) } async processCommit( @@ -88,10 +85,6 @@ export class RepoService { ) { this.db.assertTransaction() const storage = new SqlRepoStorage(this.db, did, now) - const obtained = await storage.lockRepo() - if (!obtained) { - throw new ConcurrentWriteError() - } await Promise.all([ // persist the commit to repo storage storage.applyCommit(commitData), @@ -101,7 +94,7 @@ export class RepoService { this.blobs.processWriteBlobs(did, commitData.rev, writes), // do any other processing needed after write ]) - await this.afterWriteProcessing(did, commitData, writes) + // await this.afterWriteProcessing(did, commitData, writes) } async processWrites( @@ -145,10 +138,6 @@ export class RepoService { ): Promise { // this is not in a txn, so this won't actually hold the lock, // we just check if it is currently held by another txn - const available = await storage.lockAvailable() - if (!available) { - throw new ConcurrentWriteError() - } const currRoot = await storage.getRootDetailed() if (!currRoot) { throw new InvalidRequestError( @@ -160,7 +149,6 @@ export class RepoService { } // cache last commit since there's likely overlap await storage.cacheRev(currRoot.rev) - const recordTxn = this.services.record(this.db) const newRecordCids: CID[] = [] const delAndUpdateUris: AtUri[] = [] for (const 
write of writes) { @@ -174,7 +162,7 @@ export class RepoService { if (swapCid === undefined) { continue } - const record = await recordTxn.getRecord(uri, null, true) + const record = await this.record.getRecord(uri, null, true) const currRecord = record && CID.parse(record.cid) if (action === WriteOpAction.Create && swapCid !== null) { throw new BadRecordSwapError(currRecord) // There should be no current record for a create @@ -227,14 +215,13 @@ export class RepoService { async indexWrites(writes: PreparedWrite[], now: string, rev?: string) { this.db.assertTransaction() - const recordTxn = this.services.record(this.db) await Promise.all( writes.map(async (write) => { if ( write.action === WriteOpAction.Create || write.action === WriteOpAction.Update ) { - await recordTxn.indexRecord( + await this.record.indexRecord( write.uri, write.cid, write.record, @@ -243,7 +230,7 @@ export class RepoService { now, ) } else if (write.action === WriteOpAction.Delete) { - await recordTxn.deleteRecord(write.uri) + await this.record.deleteRecord(write.uri) } }), ) @@ -261,7 +248,6 @@ export class RepoService { const uriStrs = touchedUris.map((u) => u.toString()) const res = await this.db.db .selectFrom('record') - .where('did', '=', did) .where('cid', 'in', cidStrs) .where('uri', 'not in', uriStrs) .select('cid') @@ -269,32 +255,29 @@ export class RepoService { return res.map((row) => CID.parse(row.cid)) } - async afterWriteProcessing( - did: string, - commitData: CommitData, - writes: PreparedWrite[], - ) { - this.db.onCommit(() => { - this.backgroundQueue.add(async () => { - await this.crawlers.notifyOfUpdate() - }) - }) - - const seqEvt = await sequencer.formatSeqCommit(did, commitData, writes) - await sequencer.sequenceEvt(this.db, seqEvt) - } + // async afterWriteProcessing( + // did: string, + // commitData: CommitData, + // writes: PreparedWrite[], + // ) { + // this.db.onCommit(() => { + // this.backgroundQueue.add(async () => { + // await this.crawlers.notifyOfUpdate() + 
//     })
+  //   })
+  //   const seqEvt = await sequencer.formatSeqCommit(did, commitData, writes)
+  //   await sequencer.sequenceEvt(this.db, seqEvt)
+  // }
 
-  async deleteRepo(did: string) {
+  async deleteRepo(_did: string) {
+    // @TODO DELETE FULL SQLITE FILE
     // Not done in transaction because it would be too long, prone to contention.
     // Also, this can safely be run multiple times if it fails.
     // delete all blocks from this did & no other did
-    await this.db.db.deleteFrom('repo_root').where('did', '=', did).execute()
-    await this.db.db.deleteFrom('repo_seq').where('did', '=', did).execute()
-    await this.db.db
-      .deleteFrom('ipld_block')
-      .where('creator', '=', did)
-      .execute()
-    await this.blobs.deleteForUser(did)
+    // await this.db.db.deleteFrom('repo_root').where('did', '=', did).execute()
+    // await this.db.db.deleteFrom('repo_seq').where('did', '=', did).execute()
+    // await this.db.db.deleteFrom('ipld_block').execute()
+    // await this.blobs.deleteForUser(did)
   }
 }
diff --git a/packages/pds/src/user-store/sql-repo-storage.ts b/packages/pds/src/user-store/sql-repo-storage.ts
new file mode 100644
index 00000000000..4a12686b782
--- /dev/null
+++ b/packages/pds/src/user-store/sql-repo-storage.ts
@@ -0,0 +1,243 @@
+import {
+  CommitData,
+  RepoStorage,
+  BlockMap,
+  CidSet,
+  ReadableBlockstore,
+  writeCarStream,
+} from '@atproto/repo'
+import { chunkArray } from '@atproto/common'
+import { CID } from 'multiformats/cid'
+import { UserDb } from '../user-db'
+import { IpldBlock } from '../user-db/tables/ipld-block'
+import { sql } from 'kysely'
+
+export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage {
+  cache: BlockMap = new BlockMap()
+
+  constructor(
+    public db: UserDb,
+    public did: string,
+    public timestamp?: string,
+  ) {
+    super()
+  }
+
+  async getRoot(): Promise<CID | null> {
+    const root = await this.getRootDetailed()
+    return root?.cid ??
null
+  }
+
+  async getRootDetailed(): Promise<{ cid: CID; rev: string } | null> {
+    const res = await this.db.db
+      .selectFrom('repo_root')
+      .orderBy('repo_root.rev', 'desc')
+      .limit(1)
+      .selectAll()
+      .executeTakeFirst()
+    if (!res) return null
+    return {
+      cid: CID.parse(res.cid),
+      rev: res.rev ?? '', // @TODO add not-null constraint to rev
+    }
+  }
+
+  // proactively cache all blocks from a particular commit (to prevent multiple roundtrips)
+  async cacheRev(rev: string): Promise<void> {
+    const res = await this.db.db
+      .selectFrom('ipld_block')
+      .where('repoRev', '=', rev)
+      .select(['ipld_block.cid', 'ipld_block.content'])
+      .limit(15)
+      .execute()
+    for (const row of res) {
+      this.cache.set(CID.parse(row.cid), row.content)
+    }
+  }
+
+  async getBytes(cid: CID): Promise<Uint8Array | null> {
+    const cached = this.cache.get(cid)
+    if (cached) return cached
+    const found = await this.db.db
+      .selectFrom('ipld_block')
+      .where('ipld_block.cid', '=', cid.toString())
+      .select('content')
+      .executeTakeFirst()
+    if (!found) return null
+    this.cache.set(cid, found.content)
+    return found.content
+  }
+
+  async has(cid: CID): Promise<boolean> {
+    const got = await this.getBytes(cid)
+    return !!got
+  }
+
+  async getBlocks(cids: CID[]): Promise<{ blocks: BlockMap; missing: CID[] }> {
+    const cached = this.cache.getMany(cids)
+    if (cached.missing.length < 1) return cached
+    const missing = new CidSet(cached.missing)
+    const missingStr = cached.missing.map((c) => c.toString())
+    const blocks = new BlockMap()
+    await Promise.all(
+      chunkArray(missingStr, 500).map(async (batch) => {
+        const res = await this.db.db
+          .selectFrom('ipld_block')
+          .where('ipld_block.cid', 'in', batch)
+          .select(['ipld_block.cid as cid', 'ipld_block.content as content'])
+          .execute()
+        for (const row of res) {
+          const cid = CID.parse(row.cid)
+          blocks.set(cid, row.content)
+          missing.delete(cid)
+        }
+      }),
+    )
+    this.cache.addMap(blocks)
+    blocks.addMap(cached.blocks)
+    return { blocks, missing: missing.toList() }
+  }
+
+  async
putBlock(cid: CID, block: Uint8Array, rev: string): Promise<void> {
+    this.db.assertTransaction()
+    await this.db.db
+      .insertInto('ipld_block')
+      .values({
+        cid: cid.toString(),
+        repoRev: rev,
+        size: block.length,
+        content: block,
+      })
+      .onConflict((oc) => oc.doNothing())
+      .execute()
+    this.cache.set(cid, block)
+  }
+
+  async putMany(toPut: BlockMap, rev: string): Promise<void> {
+    this.db.assertTransaction()
+    const blocks: IpldBlock[] = []
+    toPut.forEach((bytes, cid) => {
+      blocks.push({
+        cid: cid.toString(),
+        repoRev: rev,
+        size: bytes.length,
+        content: bytes,
+      })
+      this.cache.addMap(toPut)
+    })
+    await Promise.all(
+      chunkArray(blocks, 500).map((batch) =>
+        this.db.db
+          .insertInto('ipld_block')
+          .values(batch)
+          .onConflict((oc) => oc.doNothing())
+          .execute(),
+      ),
+    )
+  }
+
+  async deleteMany(cids: CID[]) {
+    if (cids.length < 1) return
+    const cidStrs = cids.map((c) => c.toString())
+    await this.db.db
+      .deleteFrom('ipld_block')
+      .where('cid', 'in', cidStrs)
+      .execute()
+  }
+
+  async applyCommit(commit: CommitData) {
+    await Promise.all([
+      this.updateRoot(commit.cid, commit.rev),
+      this.putMany(commit.newBlocks, commit.rev),
+      this.deleteMany(commit.removedCids.toList()),
+    ])
+  }
+
+  async updateRoot(cid: CID, rev: string): Promise<void> {
+    await this.db.db
+      .insertInto('repo_root')
+      .values({
+        cid: cid.toString(),
+        rev: rev,
+        indexedAt: this.getTimestamp(),
+      })
+      .execute()
+  }
+
+  async getCarStream(since?: string) {
+    const root = await this.getRoot()
+    if (!root) {
+      throw new RepoRootNotFoundError()
+    }
+    return writeCarStream(root, async (car) => {
+      let cursor: RevCursor | undefined = undefined
+      const writeRows = async (
+        rows: { cid: string; content: Uint8Array }[],
+      ) => {
+        for (const row of rows) {
+          await car.put({
+            cid: CID.parse(row.cid),
+            bytes: row.content,
+          })
+        }
+      }
+      // allow us to write to car while fetching the next page
+      let writePromise: Promise<void> = Promise.resolve()
+      do {
+        const res = await this.getBlockRange(since,
cursor)
+        await writePromise
+        writePromise = writeRows(res)
+        const lastRow = res.at(-1)
+        if (lastRow && lastRow.repoRev) {
+          cursor = {
+            cid: CID.parse(lastRow.cid),
+            rev: lastRow.repoRev,
+          }
+        } else {
+          cursor = undefined
+        }
+      } while (cursor)
+      // ensure we flush the last page of blocks
+      await writePromise
+    })
+  }
+
+  async getBlockRange(since?: string, cursor?: RevCursor) {
+    const { ref } = this.db.db.dynamic
+    let builder = this.db.db
+      .selectFrom('ipld_block')
+      .select(['cid', 'repoRev', 'content'])
+      .orderBy('repoRev', 'desc')
+      .orderBy('cid', 'desc')
+      .limit(500)
+    if (cursor) {
+      // use this syntax to ensure we hit the index
+      builder = builder.where(
+        sql`((${ref('repoRev')}, ${ref('cid')}) < (${
+          cursor.rev
+        }, ${cursor.cid.toString()}))`,
+      )
+    }
+    if (since) {
+      builder = builder.where('repoRev', '>', since)
+    }
+    return builder.execute()
+  }
+
+  getTimestamp(): string {
+    return this.timestamp || new Date().toISOString()
+  }
+
+  async destroy(): Promise<void> {
+    throw new Error('Destruction of SQL repo storage not allowed at runtime')
+  }
+}
+
+type RevCursor = {
+  cid: CID
+  rev: string
+}
+
+export default SqlRepoStorage
+
+export class RepoRootNotFoundError extends Error {}
diff --git a/packages/repo/src/storage/memory-blockstore.ts b/packages/repo/src/storage/memory-blockstore.ts
index 5f91311c345..1426d962dc0 100644
--- a/packages/repo/src/storage/memory-blockstore.ts
+++ b/packages/repo/src/storage/memory-blockstore.ts
@@ -10,6 +10,7 @@ export class MemoryBlockstore
 {
   blocks: BlockMap
   root: CID | null = null
+  rev: string | null = null
 
   constructor(blocks?: BlockMap) {
     super()
@@ -43,8 +44,9 @@ export class MemoryBlockstore
     this.blocks.addMap(blocks)
   }
 
-  async updateRoot(cid: CID): Promise<void> {
+  async updateRoot(cid: CID, rev: string): Promise<void> {
     this.root = cid
+    this.rev = rev
   }
 
   async applyCommit(commit: CommitData): Promise<void> {
diff --git a/packages/repo/src/storage/types.ts b/packages/repo/src/storage/types.ts
index
804be48cbc8..150dc66f704 100644
--- a/packages/repo/src/storage/types.ts
+++ b/packages/repo/src/storage/types.ts
@@ -10,7 +10,7 @@ export interface RepoStorage {
   getRoot(): Promise<CID | null>
   putBlock(cid: CID, block: Uint8Array, rev: string): Promise<void>
   putMany(blocks: BlockMap, rev: string): Promise<void>
-  updateRoot(cid: CID): Promise<void>
+  updateRoot(cid: CID, rev: string): Promise<void>
   applyCommit(commit: CommitData)
 
   // Readable