diff --git a/packages/bsky/src/services/indexing/index.ts b/packages/bsky/src/services/indexing/index.ts index 60c465dc0fc..44dd9c3c986 100644 --- a/packages/bsky/src/services/indexing/index.ts +++ b/packages/bsky/src/services/indexing/index.ts @@ -7,6 +7,7 @@ import { verifyRepo, Commit, VerifiedRepo, + getAndParseRecord, } from '@atproto/repo' import { AtUri } from '@atproto/syntax' import { IdResolver, getPds } from '@atproto/identity' @@ -201,10 +202,11 @@ export class IndexingService { if (op.op === 'delete') { await this.deleteRecord(uri) } else { + const parsed = await getAndParseRecord(blocks, cid) await this.indexRecord( uri, cid, - op.value, + parsed.record, op.op === 'create' ? WriteOpAction.Create : WriteOpAction.Update, now, ) @@ -389,19 +391,15 @@ type UriAndCid = { cid: CID } -type RecordDescript = UriAndCid & { - value: unknown -} - type IndexOp = | ({ op: 'create' | 'update' - } & RecordDescript) + } & UriAndCid) | ({ op: 'delete' } & UriAndCid) const findDiffFromCheckout = ( curr: Record, - checkout: Record, + checkout: Record, ): IndexOp[] => { const ops: IndexOp[] = [] for (const uri of Object.keys(checkout)) { @@ -428,14 +426,13 @@ const findDiffFromCheckout = ( const formatCheckout = ( did: string, verifiedRepo: VerifiedRepo, -): Record => { - const records: Record = {} +): Record => { + const records: Record = {} for (const create of verifiedRepo.creates) { const uri = AtUri.make(did, create.collection, create.rkey) records[uri.toString()] = { uri, cid: create.cid, - value: create.record, } } return records diff --git a/packages/pds/src/actor-store/record/reader.ts b/packages/pds/src/actor-store/record/reader.ts index 57135fbd160..ed8d231f3cf 100644 --- a/packages/pds/src/actor-store/record/reader.ts +++ b/packages/pds/src/actor-store/record/reader.ts @@ -6,6 +6,7 @@ import { notSoftDeletedClause } from '../../db/util' import { ids } from '../../lexicon/lexicons' import { ActorDb, Backlink } from '../db' import { StatusAttr } from '../../lexicon/types/com/atproto/admin/defs' +import { RepoRecord } from '@atproto/lexicon' export class RecordReader { constructor(public db: ActorDb) {} @@ -170,7 +171,7 @@ export class RecordReader { // @NOTE this logic is a placeholder until we allow users to specify these constraints themselves. // Ensures that we don't end-up with duplicate likes, reposts, and follows from race conditions. - async getBacklinkConflicts(uri: AtUri, record: unknown): Promise { + async getBacklinkConflicts(uri: AtUri, record: RepoRecord): Promise { const recordBacklinks = getBacklinks(uri, record) const conflicts = await Promise.all( recordBacklinks.map((backlink) => @@ -190,7 +191,7 @@ export class RecordReader { // @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs. // For now we just want to ensure we're tracking links from follows, blocks, likes, and reposts. -export const getBacklinks = (uri: AtUri, record: unknown): Backlink[] => { +export const getBacklinks = (uri: AtUri, record: RepoRecord): Backlink[] => { if ( record?.['$type'] === ids.AppBskyGraphFollow || record?.['$type'] === ids.AppBskyGraphBlock @@ -217,7 +218,7 @@ export const getBacklinks = (uri: AtUri, record: unknown): Backlink[] => { record?.['$type'] === ids.AppBskyFeedRepost ) { const subject = record['subject'] - if (typeof subject['uri'] !== 'string') { + if (typeof subject?.['uri'] !== 'string') { return [] } try { @@ -229,7 +230,7 @@ export const getBacklinks = (uri: AtUri, record: unknown): Backlink[] => { { uri: uri.toString(), path: 'subject.uri', - linkTo: subject.uri, + linkTo: subject['uri'], }, ] } diff --git a/packages/pds/src/actor-store/record/transactor.ts b/packages/pds/src/actor-store/record/transactor.ts index 4ac2d48a6bd..562546d9738 100644 --- a/packages/pds/src/actor-store/record/transactor.ts +++ b/packages/pds/src/actor-store/record/transactor.ts @@ -5,6 +5,7 @@ import { dbLogger as log } from '../../logger' import { ActorDb, Backlink } from '../db' import { RecordReader, getBacklinks } from './reader' import { StatusAttr } from '../../lexicon/types/com/atproto/admin/defs' +import { RepoRecord } from '@atproto/lexicon' export class RecordTransactor extends RecordReader { constructor(public db: ActorDb, public blobstore: BlobStore) { @@ -14,13 +15,13 @@ export class RecordTransactor extends RecordReader { async indexRecord( uri: AtUri, cid: CID, - obj: unknown, + record: RepoRecord | null, action: WriteOpAction.Create | WriteOpAction.Update = WriteOpAction.Create, repoRev: string, timestamp?: string, ) { log.debug({ uri }, 'indexing record') - const record = { + const row = { uri: uri.toString(), cid: cid.toString(), collection: uri.collection, @@ -30,33 +31,35 @@ export class RecordTransactor extends RecordReader { } if (!uri.hostname.startsWith('did:')) { throw new Error('Expected indexed URI to contain DID') - } else if (record.collection.length < 1) { + } else if (row.collection.length < 1) { throw new Error('Expected indexed URI to contain a collection') - } else if (record.rkey.length < 1) { + } else if (row.rkey.length < 1) { throw new Error('Expected indexed URI to contain a record key') } // Track current version of record await this.db.db .insertInto('record') - .values(record) + .values(row) .onConflict((oc) => oc.column('uri').doUpdateSet({ - cid: record.cid, + cid: row.cid, repoRev: repoRev, - indexedAt: record.indexedAt, + indexedAt: row.indexedAt, }), ) .execute() - // Maintain backlinks - const backlinks = getBacklinks(uri, obj) - if (action === WriteOpAction.Update) { - // On update just recreate backlinks from scratch for the record, so we can clear out - // the old ones. E.g. for weird cases like updating a follow to be for a different did. - await this.removeBacklinksByUri(uri) + if (record !== null) { + // Maintain backlinks + const backlinks = getBacklinks(uri, record) + if (action === WriteOpAction.Update) { + // On update just recreate backlinks from scratch for the record, so we can clear out + // the old ones. E.g. for weird cases like updating a follow to be for a different did. + await this.removeBacklinksByUri(uri) + } + await this.addBacklinks(backlinks) } - await this.addBacklinks(backlinks) log.info({ uri }, 'indexed record') } diff --git a/packages/pds/src/api/com/atproto/temp/importRepo.ts b/packages/pds/src/api/com/atproto/temp/importRepo.ts index 896e822ba97..9fd6c0bed69 100644 --- a/packages/pds/src/api/com/atproto/temp/importRepo.ts +++ b/packages/pds/src/api/com/atproto/temp/importRepo.ts @@ -6,8 +6,14 @@ import { CID } from 'multiformats/cid' import { InvalidRequestError } from '@atproto/xrpc-server' import { AsyncBuffer, TID, wait } from '@atproto/common' import { AtUri } from '@atproto/syntax' -import { Repo, WriteOpAction, readCarStream, verifyDiff } from '@atproto/repo' -import { BlobRef, LexValue } from '@atproto/lexicon' +import { + Repo, + WriteOpAction, + getAndParseRecord, + readCarStream, + verifyDiff, +} from '@atproto/repo' +import { BlobRef, LexValue, RepoRecord } from '@atproto/lexicon' import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import { ActorStoreTransactor } from '../../../../actor-store' @@ -106,15 +112,22 @@ const importRepo = async ( if (write.action === WriteOpAction.Delete) { await actorStore.record.deleteRecord(uri) } else { + let parsedRecord: RepoRecord | null + try { + const parsed = await getAndParseRecord(blocks, write.cid) + parsedRecord = parsed.record + } catch { + parsedRecord = null + } const indexRecord = actorStore.record.indexRecord( uri, write.cid, - write.record, + parsedRecord, write.action, rev, now, ) - const recordBlobs = findBlobRefs(write.record) + const recordBlobs = findBlobRefs(parsedRecord) blobRefs = blobRefs.concat(recordBlobs) const blobValues = recordBlobs.map((cid) => ({ recordUri: uri.toString(), diff --git a/packages/repo/src/index.ts b/packages/repo/src/index.ts index 82ed2115ad9..111e3546264 100644 --- a/packages/repo/src/index.ts +++ b/packages/repo/src/index.ts @@ -2,6 +2,7 @@ export * from './block-map' export * from './cid-set' export * from './repo' export * from './mst' +export * from './parse' export * from './storage' export * from './sync' export * from './types' diff --git a/packages/repo/src/sync/consumer.ts b/packages/repo/src/sync/consumer.ts index 08ca98195f2..d87a712e1cf 100644 --- a/packages/repo/src/sync/consumer.ts +++ b/packages/repo/src/sync/consumer.ts @@ -60,7 +60,7 @@ export const verifyDiff = async ( signingKey, ) const diff = await DataDiff.of(updated.data, repo?.data ?? null) - const writes = await util.diffToWriteDescripts(diff, updateBlocks) + const writes = await util.diffToWriteDescripts(diff) const newBlocks = diff.newMstBlocks const leaves = updateBlocks.getMany(diff.newLeafCids.toList()) if (leaves.missing.length > 0) { diff --git a/packages/repo/src/types.ts b/packages/repo/src/types.ts index b867af77258..7aeaba03fca 100644 --- a/packages/repo/src/types.ts +++ b/packages/repo/src/types.ts @@ -96,16 +96,25 @@ export type RecordDeleteOp = { export type RecordWriteOp = RecordCreateOp | RecordUpdateOp | RecordDeleteOp -export type RecordCreateDescript = RecordCreateOp & { +export type RecordCreateDescript = { + action: WriteOpAction.Create + collection: string + rkey: string cid: CID } -export type RecordUpdateDescript = RecordUpdateOp & { +export type RecordUpdateDescript = { + action: WriteOpAction.Update + collection: string + rkey: string prev: CID cid: CID } -export type RecordDeleteDescript = RecordDeleteOp & { +export type RecordDeleteDescript = { + action: WriteOpAction.Delete + collection: string + rkey: string cid: CID } diff --git a/packages/repo/src/util.ts b/packages/repo/src/util.ts index 3c7a151a844..5d52eb565a5 100644 --- a/packages/repo/src/util.ts +++ b/packages/repo/src/util.ts @@ -30,7 +30,6 @@ import { WriteOpAction, } from './types' import BlockMap from './block-map' -import * as parse from './parse' import { Keypair } from '@atproto/crypto' import { Readable } from 'stream' @@ -133,30 +132,25 @@ export const readCarWithRoot = async ( export const diffToWriteDescripts = ( diff: DataDiff, - blocks: BlockMap, ): Promise => { return Promise.all([ ...diff.addList().map(async (add) => { const { collection, rkey } = parseDataKey(add.key) - const value = await parse.getAndParseRecord(blocks, add.cid) return { action: WriteOpAction.Create, collection, rkey, cid: add.cid, - record: value.record, } as RecordCreateDescript }), ...diff.updateList().map(async (upd) => { const { collection, rkey } = parseDataKey(upd.key) - const value = await parse.getAndParseRecord(blocks, upd.cid) return { action: WriteOpAction.Update, collection, rkey, cid: upd.cid, prev: upd.prev, - record: value.record, } as RecordUpdateDescript }), ...diff.deleteList().map((del) => { diff --git a/packages/repo/tests/sync.test.ts b/packages/repo/tests/sync.test.ts index 9c8597a0228..5018d8f58ef 100644 --- a/packages/repo/tests/sync.test.ts +++ b/packages/repo/tests/sync.test.ts @@ -4,6 +4,7 @@ import { Repo, RepoContents, RepoVerificationError, + getAndParseRecord, readCarWithRoot, } from '../src' import { MemoryBlockstore } from '../src/storage' @@ -47,7 +48,8 @@ describe('Repo Sync', () => { const contentsFromOps: RepoContents = {} for (const write of verified.creates) { contentsFromOps[write.collection] ??= {} - contentsFromOps[write.collection][write.rkey] = write.record + const parsed = await getAndParseRecord(car.blocks, write.cid) + contentsFromOps[write.collection][write.rkey] = parsed.record } expect(contentsFromOps).toEqual(repoData) })