From 82fd36a4b12da20154a5c147a8377e1598e7857a Mon Sep 17 00:00:00 2001 From: dholms Date: Tue, 3 Oct 2023 16:17:40 -0500 Subject: [PATCH] rework readers & transactors --- packages/pds/src/actor-store/blob/reader.ts | 61 ++++ .../{repo/blob.ts => blob/transactor.ts} | 7 +- packages/pds/src/actor-store/index.ts | 75 +++-- .../actor-store/{local.ts => local/reader.ts} | 46 +-- .../{ => preference}/preference.ts | 36 +-- .../pds/src/actor-store/preference/reader.ts | 22 ++ packages/pds/src/actor-store/reader.ts | 1 - .../{record.ts => record/reader.ts} | 14 +- .../transactor.ts} | 4 +- packages/pds/src/actor-store/repo/index.ts | 4 - packages/pds/src/actor-store/repo/reader.ts | 17 ++ ...sql-repo-storage.ts => sql-repo-reader.ts} | 94 +----- .../actor-store/repo/sql-repo-transactor.ts | 98 ++++++ .../repo/{repo.ts => transactor.ts} | 68 ++--- .../src/api/com/atproto/repo/uploadBlob.ts | 2 +- .../atproto/sync/deprecated/getCheckout.ts | 6 +- .../com/atproto/sync/deprecated/getHead.ts | 3 +- .../pds/src/api/com/atproto/sync/getBlob.ts | 38 +-- .../pds/src/api/com/atproto/sync/getBlocks.ts | 3 +- .../api/com/atproto/sync/getLatestCommit.ts | 3 +- .../pds/src/api/com/atproto/sync/getRecord.ts | 3 +- .../pds/src/api/com/atproto/sync/getRepo.ts | 6 +- .../pds/src/api/com/atproto/sync/listBlobs.ts | 22 +- packages/pds/src/sql-repo-storage.ts | 286 ------------------ packages/repo/src/sync/provider.ts | 4 +- 25 files changed, 342 insertions(+), 581 deletions(-) create mode 100644 packages/pds/src/actor-store/blob/reader.ts rename packages/pds/src/actor-store/{repo/blob.ts => blob/transactor.ts} (98%) rename packages/pds/src/actor-store/{local.ts => local/reader.ts} (90%) rename packages/pds/src/actor-store/{ => preference}/preference.ts (52%) create mode 100644 packages/pds/src/actor-store/preference/reader.ts delete mode 100644 packages/pds/src/actor-store/reader.ts rename packages/pds/src/actor-store/{record.ts => record/reader.ts} (95%) rename packages/pds/src/actor-store/{repo/record-transactor.ts => record/transactor.ts} (96%) delete mode 100644 packages/pds/src/actor-store/repo/index.ts create mode 100644 packages/pds/src/actor-store/repo/reader.ts rename packages/pds/src/actor-store/repo/{sql-repo-storage.ts => sql-repo-reader.ts} (63%) create mode 100644 packages/pds/src/actor-store/repo/sql-repo-transactor.ts rename packages/pds/src/actor-store/repo/{repo.ts => transactor.ts} (79%) delete mode 100644 packages/pds/src/sql-repo-storage.ts diff --git a/packages/pds/src/actor-store/blob/reader.ts b/packages/pds/src/actor-store/blob/reader.ts new file mode 100644 index 00000000000..0c275818d7a --- /dev/null +++ b/packages/pds/src/actor-store/blob/reader.ts @@ -0,0 +1,61 @@ +import stream from 'stream' +import { CID } from 'multiformats/cid' +import { BlobNotFoundError, BlobStore } from '@atproto/repo' +import { InvalidRequestError } from '@atproto/xrpc-server' +import { ActorDb } from '../actor-db' +import { notSoftDeletedClause } from '../../db/util' + +export class BlobReader { + constructor(public db: ActorDb, public blobstore: BlobStore) {} + + async getBlob( + cid: CID, + ): Promise<{ size: number; mimeType?: string; stream: stream.Readable }> { + const { ref } = this.db.db.dynamic + const found = await this.db.db + .selectFrom('blob') + .selectAll() + .innerJoin('repo_blob', 'repo_blob.cid', 'blob.cid') + .where('blob.cid', '=', cid.toString()) + .where(notSoftDeletedClause(ref('repo_blob'))) + .executeTakeFirst() + if (!found) { + throw new InvalidRequestError('Blob not found') + } + let blobStream + try { + blobStream = await this.blobstore.getStream(cid) + } catch (err) { + if (err instanceof BlobNotFoundError) { + throw new InvalidRequestError('Blob not found') + } + throw err + } + return { + size: found.size, + mimeType: found.mimeType, + stream: blobStream, + } + } + + async listBlobs(opts: { + since?: string + cursor?: string + limit: number + }): Promise { + const { since, cursor, limit } = opts + let builder = this.db.db + .selectFrom('repo_blob') + .select('cid') + .orderBy('cid', 'asc') + .limit(limit) + if (since) { + builder = builder.where('repoRev', '>', since) + } + if (cursor) { + builder = builder.where('cid', '>', cursor) + } + const res = await builder.execute() + return res.map((row) => row.cid) + } +} diff --git a/packages/pds/src/actor-store/repo/blob.ts b/packages/pds/src/actor-store/blob/transactor.ts similarity index 98% rename from packages/pds/src/actor-store/repo/blob.ts rename to packages/pds/src/actor-store/blob/transactor.ts index c7d4020924a..4967ee38cfe 100644 --- a/packages/pds/src/actor-store/repo/blob.ts +++ b/packages/pds/src/actor-store/blob/transactor.ts @@ -17,13 +17,16 @@ import { } from '../../repo/types' import * as img from '../../image' import { BackgroundQueue } from '../../background' +import { BlobReader } from './reader' -export class ActorBlob { +export class BlobTransactor extends BlobReader { constructor( public db: ActorDb, public blobstore: BlobStore, public backgroundQueue: BackgroundQueue, - ) {} + ) { + super(db, blobstore) + } async addUntetheredBlob( userSuggestedMime: string, diff --git a/packages/pds/src/actor-store/index.ts b/packages/pds/src/actor-store/index.ts index 04d2bbc5957..6056b296dce 100644 --- a/packages/pds/src/actor-store/index.ts +++ b/packages/pds/src/actor-store/index.ts @@ -2,29 +2,31 @@ import { AtpAgent } from '@atproto/api' import * as crypto from '@atproto/crypto' import { BlobStore } from '@atproto/repo' import { ActorDb } from './actor-db' -import { ActorRepo } from './repo' -import { ActorRecord } from './record' -import { ActorLocal } from './local' -import { ActorPreference } from './preference' import { BackgroundQueue } from '../background' +import { RecordReader } from './record/reader' +import { LocalReader } from './local/reader' +import { PreferenceReader } from './preference/reader' +import { RepoReader } from './repo/reader' +import { RepoTransactor } from './repo/transactor' +import { PreferenceTransactor } from './preference/preference' -type ActorStoreReaderResources = { +type ActorStoreResources = { repoSigningKey: crypto.Keypair + blobstore: BlobStore + backgroundQueue: BackgroundQueue pdsHostname: string appViewAgent?: AtpAgent appViewDid?: string appViewCdnUrlPattern?: string } -type ActorStoreResources = ActorStoreReaderResources & { - blobstore: BlobStore - backgroundQueue: BackgroundQueue -} - export const createActorStore = ( resources: ActorStoreResources, ): ActorStore => { return { + db: (did: string) => { + return ActorDb.sqlite('', did) + }, reader: (did: string) => { const db = ActorDb.sqlite('', did) return createActorReader(db, resources) @@ -43,28 +45,48 @@ const createActorTransactor = ( db: ActorDb, resources: ActorStoreResources, ): ActorStoreTransactor => { - const { repoSigningKey, blobstore, backgroundQueue } = resources - const reader = createActorReader(db, resources) + const { + repoSigningKey, + blobstore, + backgroundQueue, + pdsHostname, + appViewAgent, + appViewDid, + appViewCdnUrlPattern, + } = resources return { - ...reader, - repo: new ActorRepo(db, repoSigningKey, blobstore, backgroundQueue), + db, + repo: new RepoTransactor(db, repoSigningKey, blobstore, backgroundQueue), + record: new RecordReader(db), + local: new LocalReader( + db, + repoSigningKey, + pdsHostname, + appViewAgent, + appViewDid, + appViewCdnUrlPattern, + ), + pref: new PreferenceTransactor(db), } } const createActorReader = ( db: ActorDb, - resources: ActorStoreReaderResources, + resources: ActorStoreResources, ): ActorStoreReader => { const { repoSigningKey, + blobstore, pdsHostname, appViewAgent, appViewDid, appViewCdnUrlPattern, } = resources return { - record: new ActorRecord(db), - local: new ActorLocal( + db, + repo: new RepoReader(db, blobstore), + record: new RecordReader(db), + local: new LocalReader( db, repoSigningKey, pdsHostname, @@ -72,23 +94,30 @@ const createActorReader = ( appViewDid, appViewCdnUrlPattern, ), - pref: new ActorPreference(db), + pref: new PreferenceReader(db), } } export type ActorStore = { + db: (did: string) => ActorDb reader: (did: string) => ActorStoreReader transact: (did: string, store: ActorStoreTransactFn) => Promise } export type ActorStoreTransactFn = (fn: ActorStoreTransactor) => Promise -export type ActorStoreTransactor = ActorStoreReader & { - repo: ActorRepo +export type ActorStoreTransactor = { + db: ActorDb + repo: RepoTransactor + record: RecordReader + local: LocalReader + pref: PreferenceTransactor } export type ActorStoreReader = { - record: ActorRecord - local: ActorLocal - pref: ActorPreference + db: ActorDb + repo: RepoReader + record: RecordReader + local: LocalReader + pref: PreferenceReader } diff --git a/packages/pds/src/actor-store/local.ts b/packages/pds/src/actor-store/local/reader.ts similarity index 90% rename from packages/pds/src/actor-store/local.ts rename to packages/pds/src/actor-store/local/reader.ts index 754e14deacb..fdbc20521ad 100644 --- a/packages/pds/src/actor-store/local.ts +++ b/packages/pds/src/actor-store/local/reader.ts @@ -2,40 +2,40 @@ import util from 'util' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' import { cborToLexRecord } from '@atproto/repo' -import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post' -import { Record as ProfileRecord } from '../lexicon/types/app/bsky/actor/profile' -import { ids } from '../lexicon/lexicons' +import { AtpAgent } from '@atproto/api' +import { Keypair } from '@atproto/crypto' +import { createServiceAuthHeaders } from '@atproto/xrpc-server' +import { Record as PostRecord } from '../../lexicon/types/app/bsky/feed/post' +import { Record as ProfileRecord } from '../../lexicon/types/app/bsky/actor/profile' +import { ids } from '../../lexicon/lexicons' import { ProfileViewBasic, ProfileView, ProfileViewDetailed, -} from '../lexicon/types/app/bsky/actor/defs' -import { FeedViewPost, PostView } from '../lexicon/types/app/bsky/feed/defs' +} from '../../lexicon/types/app/bsky/actor/defs' +import { FeedViewPost, PostView } from '../../lexicon/types/app/bsky/feed/defs' import { Main as EmbedImages, isMain as isEmbedImages, -} from '../lexicon/types/app/bsky/embed/images' +} from '../../lexicon/types/app/bsky/embed/images' import { Main as EmbedExternal, isMain as isEmbedExternal, -} from '../lexicon/types/app/bsky/embed/external' +} from '../../lexicon/types/app/bsky/embed/external' import { Main as EmbedRecord, isMain as isEmbedRecord, View as EmbedRecordView, -} from '../lexicon/types/app/bsky/embed/record' +} from '../../lexicon/types/app/bsky/embed/record' import { Main as EmbedRecordWithMedia, isMain as isEmbedRecordWithMedia, -} from '../lexicon/types/app/bsky/embed/recordWithMedia' -import { AtpAgent } from '@atproto/api' -import { Keypair } from '@atproto/crypto' -import { createServiceAuthHeaders } from '@atproto/xrpc-server' -import { ActorDb } from './actor-db' +} from '../../lexicon/types/app/bsky/embed/recordWithMedia' +import { ActorDb } from '../actor-db' type CommonSignedUris = 'avatar' | 'banner' | 'feed_thumbnail' | 'feed_fullsize' -export class ActorLocal { +export class LocalReader { constructor( public db: ActorDb, public signingKey: Keypair, @@ -45,24 +45,6 @@ export class ActorLocal { public appviewCdnUrlPattern?: string, ) {} - static creator( - signingKey: Keypair, - pdsHostname: string, - appViewAgent?: AtpAgent, - appviewDid?: string, - appviewCdnUrlPattern?: string, - ) { - return (db: ActorDb) => - new ActorLocal( - db, - signingKey, - pdsHostname, - appViewAgent, - appviewDid, - appviewCdnUrlPattern, - ) - } - getImageUrl(pattern: CommonSignedUris, did: string, cid: string) { if (!this.appviewCdnUrlPattern) { return `https://${this.pdsHostname}/xrpc/${ids.ComAtprotoSyncGetBlob}?did=${did}&cid=${cid}` diff --git a/packages/pds/src/actor-store/preference.ts b/packages/pds/src/actor-store/preference/preference.ts similarity index 52% rename from packages/pds/src/actor-store/preference.ts rename to packages/pds/src/actor-store/preference/preference.ts index de2c5cb2a9c..48167c790e4 100644 --- a/packages/pds/src/actor-store/preference.ts +++ b/packages/pds/src/actor-store/preference/preference.ts @@ -1,34 +1,13 @@ import { InvalidRequestError } from '@atproto/xrpc-server' -import { ActorDb } from './actor-db' - -export class ActorPreference { - constructor(public db: ActorDb) {} - - static creator() { - return (db: ActorDb) => new ActorPreference(db) - } - - async getPreferences( - did: string, - namespace?: string, - ): Promise { - const prefsRes = await this.db.db - .selectFrom('user_pref') - .orderBy('id') - .selectAll() - .execute() - return prefsRes - .filter((pref) => !namespace || matchNamespace(namespace, pref.name)) - .map((pref) => JSON.parse(pref.valueJson)) - } +import { PreferenceReader, UserPreference, prefMatchNamespace } from './reader' +export class PreferenceTransactor extends PreferenceReader { async putPreferences( - did: string, values: UserPreference[], namespace: string, ): Promise { this.db.assertTransaction() - if (!values.every((value) => matchNamespace(namespace, value.$type))) { + if (!values.every((value) => prefMatchNamespace(namespace, value.$type))) { throw new InvalidRequestError( `Some preferences are not in the ${namespace} namespace`, ) @@ -40,13 +19,12 @@ export class ActorPreference { .execute() const putPrefs = values.map((value) => { return { - did, name: value.$type, valueJson: JSON.stringify(value), } }) const allPrefIdsInNamespace = allPrefs - .filter((pref) => matchNamespace(namespace, pref.name)) + .filter((pref) => prefMatchNamespace(namespace, pref.name)) .map((pref) => pref.id) // replace all prefs in given namespace if (allPrefIdsInNamespace.length) { @@ -60,9 +38,3 @@ export class ActorPreference { } } } - -export type UserPreference = Record & { $type: string } - -const matchNamespace = (namespace: string, fullname: string) => { - return fullname === namespace || fullname.startsWith(`${namespace}.`) -} diff --git a/packages/pds/src/actor-store/preference/reader.ts b/packages/pds/src/actor-store/preference/reader.ts new file mode 100644 index 00000000000..91d8d27d806 --- /dev/null +++ b/packages/pds/src/actor-store/preference/reader.ts @@ -0,0 +1,22 @@ +import { ActorDb } from '../actor-db' + +export class PreferenceReader { + constructor(public db: ActorDb) {} + + async getPreferences(namespace?: string): Promise { + const prefsRes = await this.db.db + .selectFrom('user_pref') + .orderBy('id') + .selectAll() + .execute() + return prefsRes + .filter((pref) => !namespace || prefMatchNamespace(namespace, pref.name)) + .map((pref) => JSON.parse(pref.valueJson)) + } +} + +export type UserPreference = Record & { $type: string } + +export const prefMatchNamespace = (namespace: string, fullname: string) => { + return fullname === namespace || fullname.startsWith(`${namespace}.`) +} diff --git a/packages/pds/src/actor-store/reader.ts b/packages/pds/src/actor-store/reader.ts deleted file mode 100644 index 64740930424..00000000000 --- a/packages/pds/src/actor-store/reader.ts +++ /dev/null @@ -1 +0,0 @@ -export class ActorStoreReader {} diff --git a/packages/pds/src/actor-store/record.ts b/packages/pds/src/actor-store/record/reader.ts similarity index 95% rename from packages/pds/src/actor-store/record.ts rename to packages/pds/src/actor-store/record/reader.ts index bd119412a40..710e7e44581 100644 --- a/packages/pds/src/actor-store/record.ts +++ b/packages/pds/src/actor-store/record/reader.ts @@ -1,18 +1,14 @@ import { AtUri, ensureValidAtUri } from '@atproto/syntax' import * as syntax from '@atproto/syntax' import { cborToLexRecord } from '@atproto/repo' -import { notSoftDeletedClause } from '../db/util' -import { ids } from '../lexicon/lexicons' -import { ActorDb, Backlink } from './actor-db' -import { prepareDelete } from '../repo' +import { notSoftDeletedClause } from '../../db/util' +import { ids } from '../../lexicon/lexicons' +import { ActorDb, Backlink } from '../actor-db' +import { prepareDelete } from '../../repo' -export class ActorRecord { +export class RecordReader { constructor(public db: ActorDb) {} - static creator() { - return (db: ActorDb) => new ActorRecord(db) - } - async listCollections(): Promise { const collections = await this.db.db .selectFrom('record') diff --git a/packages/pds/src/actor-store/repo/record-transactor.ts b/packages/pds/src/actor-store/record/transactor.ts similarity index 96% rename from packages/pds/src/actor-store/repo/record-transactor.ts rename to packages/pds/src/actor-store/record/transactor.ts index fcafb5b4807..f91a46963c6 100644 --- a/packages/pds/src/actor-store/repo/record-transactor.ts +++ b/packages/pds/src/actor-store/record/transactor.ts @@ -3,9 +3,9 @@ import { AtUri } from '@atproto/syntax' import { WriteOpAction } from '@atproto/repo' import { dbLogger as log } from '../../logger' import { Backlink } from '../actor-db' -import { ActorRecord, getBacklinks } from '../record' +import { RecordReader, getBacklinks } from './reader' -export class ActorRecordTransactor extends ActorRecord { +export class RecordTransactor extends RecordReader { async indexRecord( uri: AtUri, cid: CID, diff --git a/packages/pds/src/actor-store/repo/index.ts b/packages/pds/src/actor-store/repo/index.ts deleted file mode 100644 index 1e1d1c43d32..00000000000 --- a/packages/pds/src/actor-store/repo/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export * from './repo' -export * from './record-transactor' -export * from './sql-repo-storage' -export * from './blob' diff --git a/packages/pds/src/actor-store/repo/reader.ts b/packages/pds/src/actor-store/repo/reader.ts new file mode 100644 index 00000000000..897c22a1106 --- /dev/null +++ b/packages/pds/src/actor-store/repo/reader.ts @@ -0,0 +1,17 @@ +import { BlobStore } from '@atproto/repo' +import { SqlRepoReader } from './sql-repo-reader' +import { BlobReader } from '../blob/reader' +import { ActorDb } from '../actor-db' +import { RecordReader } from '../record/reader' + +export class RepoReader { + blob: BlobReader + record: RecordReader + storage: SqlRepoReader + + constructor(public db: ActorDb, public blobstore: BlobStore) { + this.blob = new BlobReader(db, blobstore) + this.record = new RecordReader(db) + this.storage = new SqlRepoReader(db) + } +} diff --git a/packages/pds/src/actor-store/repo/sql-repo-storage.ts b/packages/pds/src/actor-store/repo/sql-repo-reader.ts similarity index 63% rename from packages/pds/src/actor-store/repo/sql-repo-storage.ts rename to packages/pds/src/actor-store/repo/sql-repo-reader.ts index 42ec07c9acf..4ff0aa08228 100644 --- a/packages/pds/src/actor-store/repo/sql-repo-storage.ts +++ b/packages/pds/src/actor-store/repo/sql-repo-reader.ts @@ -1,6 +1,4 @@ import { - CommitData, - RepoStorage, BlockMap, CidSet, ReadableBlockstore, @@ -8,13 +6,14 @@ import { } from '@atproto/repo' import { chunkArray } from '@atproto/common' import { CID } from 'multiformats/cid' -import { ActorDb, IpldBlock } from '../actor-db' +import { ActorDb } from '../actor-db' import { sql } from 'kysely' -export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage { +export class SqlRepoReader extends ReadableBlockstore { cache: BlockMap = new BlockMap() + now: string - constructor(public db: ActorDb, public timestamp?: string) { + constructor(public db: ActorDb) { super() } @@ -37,19 +36,6 @@ export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage { } } - // proactively cache all blocks from a particular commit (to prevent multiple roundtrips) - async cacheRev(rev: string): Promise { - const res = await this.db.db - .selectFrom('ipld_block') - .where('repoRev', '=', rev) - .select(['ipld_block.cid', 'ipld_block.content']) - .limit(15) - .execute() - for (const row of res) { - this.cache.set(CID.parse(row.cid), row.content) - } - } - async getBytes(cid: CID): Promise { const cached = this.cache.get(cid) if (cached) return cached @@ -93,72 +79,6 @@ export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage { return { blocks, missing: missing.toList() } } - async putBlock(cid: CID, block: Uint8Array, rev: string): Promise { - this.db.assertTransaction() - await this.db.db - .insertInto('ipld_block') - .values({ - cid: cid.toString(), - repoRev: rev, - size: block.length, - content: block, - }) - .onConflict((oc) => oc.doNothing()) - .execute() - this.cache.set(cid, block) - } - - async putMany(toPut: BlockMap, rev: string): Promise { - this.db.assertTransaction() - const blocks: IpldBlock[] = [] - toPut.forEach((bytes, cid) => { - blocks.push({ - cid: cid.toString(), - repoRev: rev, - size: bytes.length, - content: bytes, - }) - this.cache.addMap(toPut) - }) - await Promise.all( - chunkArray(blocks, 500).map((batch) => - this.db.db - .insertInto('ipld_block') - .values(batch) - .onConflict((oc) => oc.doNothing()) - .execute(), - ), - ) - } - - async deleteMany(cids: CID[]) { - if (cids.length < 1) return - const cidStrs = cids.map((c) => c.toString()) - await this.db.db - .deleteFrom('ipld_block') - .where('cid', 'in', cidStrs) - .execute() - } - - async applyCommit(commit: CommitData) { - await Promise.all([ - this.updateRoot(commit.cid, commit.rev), - this.putMany(commit.newBlocks, commit.rev), - this.deleteMany(commit.removedCids.toList()), - ]) - } - - async updateRoot(cid: CID, rev: string): Promise { - await this.db.db - .insertInto('repo_root') - .values({ - cid: cid.toString(), - rev: rev, - indexedAt: this.getTimestamp(), - }) - .execute() - } - async getCarStream(since?: string) { const root = await this.getRoot() if (!root) { @@ -219,10 +139,6 @@ export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage { return builder.execute() } - getTimestamp(): string { - return this.timestamp || new Date().toISOString() - } - async destroy(): Promise { throw new Error('Destruction of SQL repo storage not allowed at runtime') } @@ -233,6 +149,4 @@ type RevCursor = { rev: string } -export default SqlRepoStorage - export class RepoRootNotFoundError extends Error {} diff --git a/packages/pds/src/actor-store/repo/sql-repo-transactor.ts b/packages/pds/src/actor-store/repo/sql-repo-transactor.ts new file mode 100644 index 00000000000..1cd26db5ed1 --- /dev/null +++ b/packages/pds/src/actor-store/repo/sql-repo-transactor.ts @@ -0,0 +1,98 @@ +import { CommitData, RepoStorage, BlockMap } from '@atproto/repo' +import { chunkArray } from '@atproto/common' +import { CID } from 'multiformats/cid' +import { ActorDb, IpldBlock } from '../actor-db' +import { SqlRepoReader } from './sql-repo-reader' + +export class SqlRepoTransactor extends SqlRepoReader implements RepoStorage { + cache: BlockMap = new BlockMap() + now: string + + constructor(public db: ActorDb, now?: string) { + super(db) + this.now = now ?? new Date().toISOString() + } + + // proactively cache all blocks from a particular commit (to prevent multiple roundtrips) + async cacheRev(rev: string): Promise { + const res = await this.db.db + .selectFrom('ipld_block') + .where('repoRev', '=', rev) + .select(['ipld_block.cid', 'ipld_block.content']) + .limit(15) + .execute() + for (const row of res) { + this.cache.set(CID.parse(row.cid), row.content) + } + } + + async putBlock(cid: CID, block: Uint8Array, rev: string): Promise { + this.db.assertTransaction() + await this.db.db + .insertInto('ipld_block') + .values({ + cid: cid.toString(), + repoRev: rev, + size: block.length, + content: block, + }) + .onConflict((oc) => oc.doNothing()) + .execute() + this.cache.set(cid, block) + } + + async putMany(toPut: BlockMap, rev: string): Promise { + this.db.assertTransaction() + const blocks: IpldBlock[] = [] + toPut.forEach((bytes, cid) => { + blocks.push({ + cid: cid.toString(), + repoRev: rev, + size: bytes.length, + content: bytes, + }) + this.cache.addMap(toPut) + }) + await Promise.all( + chunkArray(blocks, 500).map((batch) => + this.db.db + .insertInto('ipld_block') + .values(batch) + .onConflict((oc) => oc.doNothing()) + .execute(), + ), + ) + } + + async deleteMany(cids: CID[]) { + if (cids.length < 1) return + const cidStrs = cids.map((c) => c.toString()) + await this.db.db + .deleteFrom('ipld_block') + .where('cid', 'in', cidStrs) + .execute() + } + + async applyCommit(commit: CommitData) { + await Promise.all([ + this.updateRoot(commit.cid, commit.rev), + this.putMany(commit.newBlocks, commit.rev), + this.deleteMany(commit.removedCids.toList()), + ]) + } + + async updateRoot(cid: CID, rev: string): Promise { + await this.db.db + .insertInto('repo_root') + .values({ + cid: cid.toString(), + rev: rev, + indexedAt: this.now, + }) + .execute() + } + + async destroy(): Promise { + throw new Error('Destruction of SQL repo storage not allowed at runtime') + } +} diff --git a/packages/pds/src/actor-store/repo/repo.ts b/packages/pds/src/actor-store/repo/transactor.ts similarity index 79% rename from packages/pds/src/actor-store/repo/repo.ts rename to packages/pds/src/actor-store/repo/transactor.ts index cf26bc01804..227a3b7de9d 100644 --- a/packages/pds/src/actor-store/repo/repo.ts +++ b/packages/pds/src/actor-store/repo/transactor.ts @@ -3,85 +3,79 @@ import * as crypto from '@atproto/crypto' import { BlobStore, CommitData, Repo, WriteOpAction } from '@atproto/repo' import { InvalidRequestError } from '@atproto/xrpc-server' import { AtUri } from '@atproto/syntax' -import SqlRepoStorage from './sql-repo-storage' +import { SqlRepoTransactor } from './sql-repo-transactor' import { BadCommitSwapError, BadRecordSwapError, PreparedCreate, PreparedWrite, } from '../../repo/types' -import { ActorBlob } from './blob' +import { BlobTransactor } from '../blob/transactor' import { createWriteToOp, writeToOp } from '../../repo' import { BackgroundQueue } from '../../background' import { ActorDb } from '../actor-db' -import { ActorRecordTransactor } from './record-transactor' +import { RecordTransactor } from '../record/transactor' +import { RepoReader } from './reader' -export class ActorRepo { - blobs: ActorBlob - record: ActorRecordTransactor +export class RepoTransactor extends RepoReader { + blob: BlobTransactor + record: RecordTransactor + storage: SqlRepoTransactor + now: string constructor( public db: ActorDb, public repoSigningKey: crypto.Keypair, public blobstore: BlobStore, public backgroundQueue: BackgroundQueue, + now?: string, ) { - this.blobs = new ActorBlob(db, blobstore, backgroundQueue) - this.record = new ActorRecordTransactor(db) + super(db, blobstore) + this.blob = new BlobTransactor(db, blobstore, backgroundQueue) + this.record = new RecordTransactor(db) + this.now = now ?? new Date().toISOString() + this.storage = new SqlRepoTransactor(db, this.now) } - static creator( - keypair: crypto.Keypair, - blobstore: BlobStore, - backgroundQueue: BackgroundQueue, - ) { - return (db: ActorDb) => - new ActorRepo(db, keypair, blobstore, backgroundQueue) - } - - async createRepo(writes: PreparedCreate[], now: string) { + async createRepo(writes: PreparedCreate[]) { this.db.assertTransaction() - const storage = new SqlRepoStorage(this.db, now) const writeOps = writes.map(createWriteToOp) const commit = await Repo.formatInitCommit( - storage, + this.storage, this.db.did, this.repoSigningKey, writeOps, ) await Promise.all([ - storage.applyCommit(commit), - this.indexWrites(writes, now), - this.blobs.processWriteBlobs(commit.rev, writes), + this.storage.applyCommit(commit), + this.indexWrites(writes), + this.blob.processWriteBlobs(commit.rev, writes), ]) // await this.afterWriteProcessing(did, commit, writes) } async processWrites(writes: PreparedWrite[], swapCommitCid?: CID) { this.db.assertTransaction() - const now = new Date().toISOString() - const storage = new SqlRepoStorage(this.db, now) - const commit = await this.formatCommit(storage, writes, swapCommitCid) + const commit = await this.formatCommit(writes, swapCommitCid) await Promise.all([ // persist the commit to repo storage - storage.applyCommit(commit), + this.storage.applyCommit(commit), // & send to indexing - this.indexWrites(writes, now, commit.rev), + this.indexWrites(writes, commit.rev), // process blobs - this.blobs.processWriteBlobs(commit.rev, writes), + this.blob.processWriteBlobs(commit.rev, writes), // do any other processing needed after write ]) // await this.afterWriteProcessing(did, commitData, writes) } async formatCommit( - storage: SqlRepoStorage, writes: PreparedWrite[], swapCommit?: CID, ): Promise { // this is not in a txn, so this won't actually hold the lock, // we just check if it is currently held by another txn - const currRoot = await storage.getRootDetailed() + const currRoot = await this.storage.getRootDetailed() if (!currRoot) { throw new InvalidRequestError(`No repo root found for ${this.db.did}`) } @@ -89,7 +83,7 @@ export class ActorRepo { throw new BadCommitSwapError(currRoot.cid) } // cache last commit since there's likely overlap - await storage.cacheRev(currRoot.rev) + await this.storage.cacheRev(currRoot.rev) const newRecordCids: CID[] = [] const delAndUpdateUris: AtUri[] = [] for (const write of writes) { @@ -119,7 +113,7 @@ export class ActorRepo { } } - const repo = await Repo.load(storage, currRoot.cid) + const repo = await Repo.load(this.storage, currRoot.cid) const writeOps = writes.map(writeToOp) const commit = await repo.formatCommit(writeOps, this.repoSigningKey) @@ -136,13 +130,15 @@ export class ActorRepo { // (for instance a record that was moved but cid stayed the same) const newRecordBlocks = commit.newBlocks.getMany(newRecordCids) if (newRecordBlocks.missing.length > 0) { - const missingBlocks = await storage.getBlocks(newRecordBlocks.missing) + const missingBlocks = await this.storage.getBlocks( + newRecordBlocks.missing, + ) commit.newBlocks.addMap(missingBlocks.blocks) } return commit } - async indexWrites(writes: PreparedWrite[], now: string, rev?: string) { + async indexWrites(writes: PreparedWrite[], rev?: string) { this.db.assertTransaction() await Promise.all( writes.map(async (write) => { @@ -156,7 +152,7 @@ export class ActorRepo { write.record, write.action, rev, - now, + this.now, ) } else if (write.action === WriteOpAction.Delete) { await this.record.deleteRecord(write.uri) diff --git a/packages/pds/src/api/com/atproto/repo/uploadBlob.ts b/packages/pds/src/api/com/atproto/repo/uploadBlob.ts index f6e2addb6b2..916467e3972 100644 --- a/packages/pds/src/api/com/atproto/repo/uploadBlob.ts +++ b/packages/pds/src/api/com/atproto/repo/uploadBlob.ts @@ -8,7 +8,7 @@ export default function (server: Server, ctx: AppContext) { const requester = auth.credentials.did const blob = await ctx.actorStore.transact(requester, (actorTxn) => { - return actorTxn.repo.blobs.addUntetheredBlob(input.encoding, input.body) + return actorTxn.repo.blob.addUntetheredBlob(input.encoding, input.body) }) return { diff --git a/packages/pds/src/api/com/atproto/sync/deprecated/getCheckout.ts b/packages/pds/src/api/com/atproto/sync/deprecated/getCheckout.ts index cbde7131c66..2e84700172f 100644 --- a/packages/pds/src/api/com/atproto/sync/deprecated/getCheckout.ts +++ b/packages/pds/src/api/com/atproto/sync/deprecated/getCheckout.ts @@ -1,11 +1,9 @@ import { InvalidRequestError } from '@atproto/xrpc-server' import { byteIterableToStream } from '@atproto/common' import { Server } from '../../../../../lexicon' -import SqlRepoStorage, { - RepoRootNotFoundError, -} from '../../../../../sql-repo-storage' import AppContext from '../../../../../context' import { isUserOrAdmin } from '../../../../../auth' +import { RepoRootNotFoundError } from '../../../../../actor-store/repo/sql-repo-reader' export default function (server: Server, ctx: AppContext) { server.com.atproto.sync.getCheckout({ @@ -22,7 +20,7 @@ export default function (server: Server, ctx: AppContext) { } } - const storage = new SqlRepoStorage(ctx.db, did) + const storage = ctx.actorStore.reader(did).repo.storage let carStream: AsyncIterable try { carStream = await storage.getCarStream() diff --git a/packages/pds/src/api/com/atproto/sync/deprecated/getHead.ts b/packages/pds/src/api/com/atproto/sync/deprecated/getHead.ts index acde9cebc38..b2ef485a269 100644 --- a/packages/pds/src/api/com/atproto/sync/deprecated/getHead.ts +++ b/packages/pds/src/api/com/atproto/sync/deprecated/getHead.ts @@ -1,6 +1,5 @@ import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../../lexicon' -import SqlRepoStorage from '../../../../../sql-repo-storage' import AppContext from '../../../../../context' import { isUserOrAdmin } from '../../../../../auth' @@ -21,7 +20,7 @@ export default function (server: Server, ctx: AppContext) { ) } } - const storage = new SqlRepoStorage(ctx.db, did) + const storage = ctx.actorStore.reader(did).repo.storage const root = await storage.getRoot() if (root === null) { throw new InvalidRequestError( diff --git a/packages/pds/src/api/com/atproto/sync/getBlob.ts b/packages/pds/src/api/com/atproto/sync/getBlob.ts index b92154af80f..f7bfc813e16 100644 --- a/packages/pds/src/api/com/atproto/sync/getBlob.ts +++ b/packages/pds/src/api/com/atproto/sync/getBlob.ts @@ -2,44 +2,26 @@ import { CID } from 'multiformats/cid' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import { InvalidRequestError } from '@atproto/xrpc-server' -import { notSoftDeletedClause } from '../../../../db/util' -import { isUserOrAdmin } from '../../../../auth' import { BlobNotFoundError } from '@atproto/repo' export default function (server: Server, ctx: AppContext) { server.com.atproto.sync.getBlob({ auth: ctx.optionalAccessOrRoleVerifier, - handler: async ({ params, res, auth }) => { - const { ref } = ctx.db.db.dynamic - const found = await ctx.db.db - .selectFrom('blob') - .selectAll() - .innerJoin('repo_root', 'repo_root.did', 'blob.creator') - .innerJoin('repo_blob', (join) => - join - .onRef('repo_blob.cid', '=', 'blob.cid') - .onRef('repo_blob.did', '=', 'blob.creator'), - ) - .where('blob.cid', '=', params.cid) - .where('blob.creator', '=', params.did) - .where(notSoftDeletedClause(ref('repo_blob'))) - .if(!isUserOrAdmin(auth, params.did), (qb) => - // takedown check for anyone other than an admin or the user - qb.where(notSoftDeletedClause(ref('repo_root'))), - ) - .executeTakeFirst() - if (!found) { - throw new InvalidRequestError('Blob not found') - } + handler: async ({ params, res }) => { + // @TODO verify repo is not taken down const cid = CID.parse(params.cid) - let blobStream + let found try { - blobStream = await ctx.blobstore.getStream(cid) + found = await ctx.actorStore.reader(params.did).repo.blob.getBlob(cid) } catch (err) { if (err instanceof BlobNotFoundError) { throw new InvalidRequestError('Blob not found') + } else { + throw err } - throw err + } + if (!found) { + throw new InvalidRequestError('Blob not found') } res.setHeader('content-length', found.size) res.setHeader('x-content-type-options', 'nosniff') @@ -47,7 +29,7 @@ export default function (server: Server, ctx: AppContext) { return { // @TODO better codegen for */* mimetype encoding: (found.mimeType || 'application/octet-stream') as '*/*', - body: blobStream, + body: found.stream, } }, }) diff --git a/packages/pds/src/api/com/atproto/sync/getBlocks.ts b/packages/pds/src/api/com/atproto/sync/getBlocks.ts index a4a85355f13..24b06d29f08 100644 --- a/packages/pds/src/api/com/atproto/sync/getBlocks.ts +++ b/packages/pds/src/api/com/atproto/sync/getBlocks.ts @@ -3,7 +3,6 @@ import { InvalidRequestError } from '@atproto/xrpc-server' import { byteIterableToStream } from '@atproto/common' import { blocksToCarStream } from '@atproto/repo' import { Server } from '../../../../lexicon' -import SqlRepoStorage from '../../../../sql-repo-storage' import AppContext from '../../../../context' import { isUserOrAdmin } from '../../../../auth' @@ -23,7 +22,7 @@ export default function (server: Server, ctx: AppContext) { } const cids = params.cids.map((c) => CID.parse(c)) - const storage = new SqlRepoStorage(ctx.db, did) + const storage = ctx.actorStore.reader(did).repo.storage const got = await storage.getBlocks(cids) if (got.missing.length > 0) { const missingStr = got.missing.map((c) => c.toString()) diff --git a/packages/pds/src/api/com/atproto/sync/getLatestCommit.ts b/packages/pds/src/api/com/atproto/sync/getLatestCommit.ts index 877db7806f4..cf1b33d088a 100644 --- a/packages/pds/src/api/com/atproto/sync/getLatestCommit.ts +++ b/packages/pds/src/api/com/atproto/sync/getLatestCommit.ts @@ -1,6 +1,5 @@ import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' -import SqlRepoStorage from '../../../../sql-repo-storage' import AppContext from '../../../../context' import { isUserOrAdmin } from '../../../../auth' @@ -21,7 +20,7 @@ export default function (server: Server, ctx: AppContext) { ) } } - const storage = new SqlRepoStorage(ctx.db, did) + const storage = ctx.actorStore.reader(did).repo.storage const root = await storage.getRootDetailed() if (root === null) { throw new InvalidRequestError( diff --git a/packages/pds/src/api/com/atproto/sync/getRecord.ts b/packages/pds/src/api/com/atproto/sync/getRecord.ts index 817d7850cb6..b316e8edc20 100644 --- a/packages/pds/src/api/com/atproto/sync/getRecord.ts +++ b/packages/pds/src/api/com/atproto/sync/getRecord.ts @@ -2,7 +2,6 @@ import { CID } from 'multiformats/cid' import * as repo from '@atproto/repo' import { InvalidRequestError } from '@atproto/xrpc-server' import { Server } from '../../../../lexicon' -import SqlRepoStorage from '../../../../sql-repo-storage' import AppContext from '../../../../context' import { byteIterableToStream } from '@atproto/common' import { isUserOrAdmin } from '../../../../auth' @@ -21,7 +20,7 @@ export default function (server: Server, ctx: AppContext) { throw new InvalidRequestError(`Could not find repo for DID: ${did}`) } } - const storage = new SqlRepoStorage(ctx.db, did) + const storage = ctx.actorStore.reader(did).repo.storage const commit = params.commit ? CID.parse(params.commit) : await storage.getRoot() diff --git a/packages/pds/src/api/com/atproto/sync/getRepo.ts b/packages/pds/src/api/com/atproto/sync/getRepo.ts index 9037a2a3a9c..87bcaf4427f 100644 --- a/packages/pds/src/api/com/atproto/sync/getRepo.ts +++ b/packages/pds/src/api/com/atproto/sync/getRepo.ts @@ -1,11 +1,9 @@ import { InvalidRequestError } from '@atproto/xrpc-server' import { byteIterableToStream } from '@atproto/common' import { Server } from '../../../../lexicon' -import SqlRepoStorage, { - RepoRootNotFoundError, -} from '../../../../sql-repo-storage' import AppContext from '../../../../context' import { isUserOrAdmin } from '../../../../auth' +import { RepoRootNotFoundError } from '../../../../actor-store/repo/sql-repo-reader' export default function (server: Server, ctx: AppContext) { server.com.atproto.sync.getRepo({ @@ -22,7 +20,7 @@ export default function (server: Server, ctx: AppContext) { } } - const storage = new SqlRepoStorage(ctx.db, did) + const storage = ctx.actorStore.reader(did).repo.storage let carStream: AsyncIterable try { carStream = await storage.getCarStream(since) diff --git a/packages/pds/src/api/com/atproto/sync/listBlobs.ts b/packages/pds/src/api/com/atproto/sync/listBlobs.ts index 5beb4d5a0fd..b7869c23c7f 100644 --- a/packages/pds/src/api/com/atproto/sync/listBlobs.ts +++ b/packages/pds/src/api/com/atproto/sync/listBlobs.ts @@ -18,27 +18,15 @@ export default function (server: Server, ctx: AppContext) { } } - let builder = ctx.db.db - .selectFrom('repo_blob') - .where('did', '=', did) - .select('cid') - .orderBy('cid', 'asc') - .limit(limit) - if (since) { - builder = builder.where('repoRev', '>', since) - } - - if (cursor) { - builder = builder.where('cid', '>', cursor) - } - - const res = await builder.execute() + const blobCids = await ctx.actorStore + .reader(did) + .repo.blob.listBlobs({ since, limit, cursor }) return { encoding: 'application/json', body: { - cursor: res.at(-1)?.cid, - cids: res.map((row) => row.cid), + cursor: blobCids.at(-1), + cids: blobCids, }, } }, diff --git a/packages/pds/src/sql-repo-storage.ts b/packages/pds/src/sql-repo-storage.ts deleted file mode 100644 index 13301ae300f..00000000000 --- a/packages/pds/src/sql-repo-storage.ts +++ /dev/null @@ -1,286 +0,0 @@ -import { - CommitData, - RepoStorage, - BlockMap, - CidSet, - ReadableBlockstore, - writeCarStream, -} from '@atproto/repo' -import { chunkArray } from '@atproto/common' -import { CID } from 'multiformats/cid' -import Database from './db' -import { IpldBlock } from './db/tables/ipld-block' -import { ConcurrentWriteError } from './services/repo' -import { sql } from 'kysely' - -export class SqlRepoStorage extends ReadableBlockstore implements RepoStorage { - cache: BlockMap = new BlockMap() - - constructor( - public db: Database, - public did: string, - public timestamp?: string, - ) { - super() - } - - async lockRepo(): Promise { - if (this.db.dialect === 'sqlite') return true - return this.db.takeTxAdvisoryLock(this.did) - } - - async lockAvailable(): Promise { - if (this.db.dialect === 'sqlite') return true - return this.db.checkTxAdvisoryLock(this.did) - } - - async getRoot(): Promise { - const res = await this.db.db - .selectFrom('repo_root') - .selectAll() - .where('did', '=', this.did) - .executeTakeFirst() - if (!res) return null - return CID.parse(res.root) - } - - async getRootDetailed(): Promise<{ cid: CID; rev: string } | null> { - const res = await this.db.db - .selectFrom('repo_root') - .selectAll() - .where('did', '=', this.did) - .executeTakeFirst() - if (!res) return null - return { - cid: CID.parse(res.root), - rev: res.rev ?? '', // @TODO add not-null constraint to rev - } - } - - // proactively cache all blocks from a particular commit (to prevent multiple roundtrips) - async cacheRev(rev: string): Promise { - const res = await this.db.db - .selectFrom('ipld_block') - .where('creator', '=', this.did) - .where('repoRev', '=', rev) - .select(['ipld_block.cid', 'ipld_block.content']) - .limit(15) - .execute() - for (const row of res) { - this.cache.set(CID.parse(row.cid), row.content) - } - } - - async getBytes(cid: CID): Promise { - const cached = this.cache.get(cid) - if (cached) return cached - const found = await this.db.db - .selectFrom('ipld_block') - .where('ipld_block.creator', '=', this.did) - .where('ipld_block.cid', '=', cid.toString()) - .select('content') - .executeTakeFirst() - if (!found) return null - this.cache.set(cid, found.content) - return found.content - } - - async has(cid: CID): Promise { - const got = await this.getBytes(cid) - return !!got - } - - async getBlocks(cids: CID[]): Promise<{ blocks: BlockMap; missing: CID[] }> { - const cached = this.cache.getMany(cids) - if (cached.missing.length < 1) return cached - const missing = new CidSet(cached.missing) - const missingStr = cached.missing.map((c) => c.toString()) - const blocks = new BlockMap() - await Promise.all( - chunkArray(missingStr, 500).map(async (batch) => { - const res = await this.db.db - .selectFrom('ipld_block') - .where('ipld_block.creator', '=', this.did) - .where('ipld_block.cid', 'in', batch) - .select(['ipld_block.cid as cid', 'ipld_block.content as content']) - .execute() - for (const row of res) { - const cid = CID.parse(row.cid) - blocks.set(cid, row.content) - missing.delete(cid) - } - }), - ) - this.cache.addMap(blocks) - blocks.addMap(cached.blocks) - return { blocks, missing: missing.toList() } - } - - async putBlock(cid: CID, block: Uint8Array, rev: string): Promise { - this.db.assertTransaction() - await this.db.db - .insertInto('ipld_block') - .values({ - cid: cid.toString(), - creator: this.did, - repoRev: rev, - size: block.length, - content: block, - }) - .onConflict((oc) => oc.doNothing()) - .execute() - this.cache.set(cid, block) - } - - async putMany(toPut: BlockMap, rev: string): Promise { - this.db.assertTransaction() - const blocks: IpldBlock[] = [] - toPut.forEach((bytes, cid) => { - blocks.push({ - cid: cid.toString(), - creator: this.did, - repoRev: rev, - size: bytes.length, - content: bytes, - }) - this.cache.addMap(toPut) - }) - await Promise.all( - chunkArray(blocks, 500).map((batch) => - this.db.db - .insertInto('ipld_block') - .values(batch) - .onConflict((oc) => oc.doNothing()) - .execute(), - ), - ) - } - - async deleteMany(cids: CID[]) { - if (cids.length < 1) return - const cidStrs = cids.map((c) => c.toString()) - await this.db.db - .deleteFrom('ipld_block') - .where('creator', '=', this.did) - .where('cid', 'in', cidStrs) - .execute() - } - - async applyCommit(commit: CommitData) { - await Promise.all([ - this.updateRoot(commit.cid, commit.prev ?? undefined), - this.putMany(commit.newBlocks, commit.rev), - this.deleteMany(commit.removedCids.toList()), - ]) - } - - async updateRoot(cid: CID, ensureSwap?: CID): Promise { - if (ensureSwap) { - const res = await this.db.db - .updateTable('repo_root') - .set({ - root: cid.toString(), - indexedAt: this.getTimestamp(), - }) - .where('did', '=', this.did) - .where('root', '=', ensureSwap.toString()) - .executeTakeFirst() - if (res.numUpdatedRows < 1) { - throw new ConcurrentWriteError() - } - } else { - await this.db.db - .insertInto('repo_root') - .values({ - did: this.did, - root: cid.toString(), - indexedAt: this.getTimestamp(), - }) - .onConflict((oc) => - oc.column('did').doUpdateSet({ - root: cid.toString(), - indexedAt: this.getTimestamp(), - }), - ) - .execute() - } - } - - async getCarStream(since?: string) { - const root = await this.getRoot() - if (!root) { - throw new RepoRootNotFoundError() - } - return writeCarStream(root, async (car) => { - let cursor: RevCursor | undefined = undefined - const writeRows = async ( - rows: { cid: string; content: Uint8Array }[], - ) => { - for (const row of rows) { - await car.put({ - cid: CID.parse(row.cid), - bytes: row.content, - }) - } - } - // allow us to write to car while fetching the next page - let writePromise: Promise = Promise.resolve() - do { - const res = await this.getBlockRange(since, cursor) - await writePromise - writePromise = writeRows(res) - const lastRow = res.at(-1) - if (lastRow && lastRow.repoRev) { - cursor = { - cid: CID.parse(lastRow.cid), - rev: lastRow.repoRev, - } - } else { - cursor = undefined - } - } while (cursor) - // ensure we flush the last page of blocks - await writePromise - }) - } - - async getBlockRange(since?: string, cursor?: RevCursor) { - const { ref } = this.db.db.dynamic - let builder = this.db.db - .selectFrom('ipld_block') - .where('creator', '=', this.did) - .select(['cid', 'repoRev', 'content']) - .orderBy('repoRev', 'desc') - .orderBy('cid', 'desc') - .limit(500) - if (cursor) { - // use this syntax to ensure we hit the index - builder = builder.where( - sql`((${ref('repoRev')}, ${ref('cid')}) < (${ - cursor.rev - }, ${cursor.cid.toString()}))`, - ) - } - if (since) { - builder = builder.where('repoRev', '>', since) - } - return builder.execute() - } - - getTimestamp(): string { - return this.timestamp || new Date().toISOString() - } - - async destroy(): Promise { - throw new Error('Destruction of SQL repo storage not allowed at runtime') - } -} - -type RevCursor = { - cid: CID - rev: string -} - -export default SqlRepoStorage - -export class RepoRootNotFoundError extends Error {} diff --git a/packages/repo/src/sync/provider.ts b/packages/repo/src/sync/provider.ts index ef6a586b15a..215481c6aa0 100644 --- a/packages/repo/src/sync/provider.ts +++ b/packages/repo/src/sync/provider.ts @@ -3,7 +3,7 @@ import { BlockWriter } from '@ipld/car/writer' import { CID } from 'multiformats/cid' import CidSet from '../cid-set' import { MissingBlocksError } from '../error' -import { RepoStorage } from '../storage' +import { ReadableBlockstore, RepoStorage } from '../storage' import * as util from '../util' import { MST } from '../mst' @@ -26,7 +26,7 @@ export const getFullRepo = ( // ------------- export const getRecords = ( - storage: RepoStorage, + storage: ReadableBlockstore, commitCid: CID, paths: RecordPath[], ): AsyncIterable => {