diff --git a/packages/pds/src/api/com/atproto/repo/applyWrites.ts b/packages/pds/src/api/com/atproto/repo/applyWrites.ts index 16f620a30fc..d93c29fa32f 100644 --- a/packages/pds/src/api/com/atproto/repo/applyWrites.ts +++ b/packages/pds/src/api/com/atproto/repo/applyWrites.ts @@ -62,11 +62,6 @@ export default function (server: Server, ctx: AppContext) { if (did !== auth.credentials.did) { throw new AuthRequiredError() } - if (validate === false) { - throw new InvalidRequestError( - 'Unvalidated writes are not yet supported.', - ) - } if (tx.writes.length > 200) { throw new InvalidRequestError('Too many writes. Max: 200') } diff --git a/packages/pds/src/api/com/atproto/repo/createRecord.ts b/packages/pds/src/api/com/atproto/repo/createRecord.ts index 7e3aead0c56..88d3b910d41 100644 --- a/packages/pds/src/api/com/atproto/repo/createRecord.ts +++ b/packages/pds/src/api/com/atproto/repo/createRecord.ts @@ -41,11 +41,6 @@ export default function (server: Server, ctx: AppContext) { if (did !== auth.credentials.did) { throw new AuthRequiredError() } - if (validate === false) { - throw new InvalidRequestError( - 'Unvalidated writes are not yet supported.', - ) - } const swapCommitCid = swapCommit ? CID.parse(swapCommit) : undefined let write: PreparedCreate diff --git a/packages/pds/src/api/com/atproto/repo/putRecord.ts b/packages/pds/src/api/com/atproto/repo/putRecord.ts index 97ee3e4819f..4e930365ff4 100644 --- a/packages/pds/src/api/com/atproto/repo/putRecord.ts +++ b/packages/pds/src/api/com/atproto/repo/putRecord.ts @@ -51,11 +51,6 @@ export default function (server: Server, ctx: AppContext) { if (did !== auth.credentials.did) { throw new AuthRequiredError() } - if (validate === false) { - throw new InvalidRequestError( - 'Unvalidated writes are not yet supported.', - ) - } const uri = AtUri.make(did, collection, rkey) const swapCommitCid = swapCommit ? CID.parse(swapCommit) : undefined diff --git a/packages/pds/src/repo/prepare.ts b/packages/pds/src/repo/prepare.ts index 0c311462d23..ffc8cd80843 100644 --- a/packages/pds/src/repo/prepare.ts +++ b/packages/pds/src/repo/prepare.ts @@ -4,12 +4,15 @@ import { ensureValidRecordKey, ensureValidDatetime, } from '@atproto/syntax' -import { MINUTE, TID, dataToCborBlock } from '@atproto/common' +import { TID, check, dataToCborBlock } from '@atproto/common' import { + BlobRef, + LexValue, LexiconDefNotFoundError, RepoRecord, ValidationError, lexToIpld, + untypedJsonBlobRef, } from '@atproto/lexicon' import { cborToLex, @@ -28,91 +31,12 @@ import { PreparedBlobRef, } from './types' import * as lex from '../lexicon/lexicons' -import { isMain as isExternalEmbed } from '../lexicon/types/app/bsky/embed/external' -import { isMain as isImagesEmbed } from '../lexicon/types/app/bsky/embed/images' -import { isMain as isRecordWithMediaEmbed } from '../lexicon/types/app/bsky/embed/recordWithMedia' import { isRecord as isFeedGenerator } from '../lexicon/types/app/bsky/feed/generator' -import { - Record as PostRecord, - isRecord as isPost, -} from '../lexicon/types/app/bsky/feed/post' +import { isRecord as isPost } from '../lexicon/types/app/bsky/feed/post' import { isTag } from '../lexicon/types/app/bsky/richtext/facet' import { isRecord as isList } from '../lexicon/types/app/bsky/graph/list' import { isRecord as isProfile } from '../lexicon/types/app/bsky/actor/profile' import { hasExplicitSlur } from '../handle/explicit-slurs' -import { InvalidRequestError } from '@atproto/xrpc-server' - -// @TODO do this dynamically off of schemas -export const blobsForWrite = (record: unknown): PreparedBlobRef[] => { - if (isProfile(record)) { - const doc = lex.schemaDict.AppBskyActorProfile - const refs: PreparedBlobRef[] = [] - if (record.avatar) { - refs.push({ - cid: record.avatar.ref, - mimeType: record.avatar.mimeType, - constraints: doc.defs.main.record.properties.avatar, - }) - } - if (record.banner) { - refs.push({ - cid: record.banner.ref, - mimeType: record.banner.mimeType, - constraints: doc.defs.main.record.properties.banner, - }) - } - return refs - } else if (isFeedGenerator(record)) { - const doc = lex.schemaDict.AppBskyFeedGenerator - if (!record.avatar) { - return [] - } - return [ - { - cid: record.avatar.ref, - mimeType: record.avatar.mimeType, - constraints: doc.defs.main.record.properties.avatar, - }, - ] - } else if (isList(record)) { - const doc = lex.schemaDict.AppBskyGraphList - if (!record.avatar) { - return [] - } - return [ - { - cid: record.avatar.ref, - mimeType: record.avatar.mimeType, - constraints: doc.defs.main.record.properties.avatar, - }, - ] - } else if (isPost(record)) { - const refs: PreparedBlobRef[] = [] - const embeds = separateEmbeds(record.embed) - for (const embed of embeds) { - if (isImagesEmbed(embed)) { - const doc = lex.schemaDict.AppBskyEmbedImages - for (let i = 0; i < embed.images.length || 0; i++) { - const img = embed.images[i] - refs.push({ - cid: img.image.ref, - mimeType: img.image.mimeType, - constraints: doc.defs.image.properties.image, - }) - } - } else if (isExternalEmbed(embed) && embed.external.thumb) { - const doc = lex.schemaDict.AppBskyEmbedExternal - refs.push({ - cid: embed.external.thumb.ref, - mimeType: embed.external.thumb.mimeType, - constraints: doc.defs.external.properties.thumb, - }) - } - } - return refs - } - return [] -} export const assertValidRecord = (record: Record) => { if (typeof record.$type !== 'string') { @@ -180,17 +104,6 @@ export const prepareCreate = async (opts: { } const nextRkey = TID.next() - if ( - collection === lex.ids.AppBskyFeedPost && - opts.rkey && - !rkeyIsInWindow(nextRkey, new TID(opts.rkey)) - ) { - // @TODO temporary. allowing a window supports creation of post and gate records at the same time. - throw new InvalidRequestError( - 'Custom rkeys for post records should be near the present.', - ) - } - const rkey = opts.rkey || nextRkey.toString() // @TODO: validate against Lexicon record 'key' type, not just overall recordkey syntax ensureValidRecordKey(rkey) @@ -201,17 +114,10 @@ export const prepareCreate = async (opts: { cid: await cidForSafeRecord(record), swapCid, record, - blobs: blobsForWrite(record), + blobs: blobsForWrite(record, validate), } } -// only allow PUTs to certain collections -const ALLOWED_PUTS = [ - lex.ids.AppBskyActorProfile, - lex.ids.AppBskyGraphList, - lex.ids.AppBskyFeedGenerator, -] - export const prepareUpdate = async (opts: { did: string collection: string @@ -221,15 +127,6 @@ export const prepareUpdate = async (opts: { validate?: boolean }): Promise => { const { did, collection, rkey, swapCid, validate = true } = opts - if (!ALLOWED_PUTS.includes(collection)) { - // @TODO temporary - throw new InvalidRequestError( - `Temporarily only accepting updates for collections: ${ALLOWED_PUTS.join( - ', ', - )}`, - ) - } - const record = setCollectionName(collection, opts.record, validate) if (validate) { assertValidRecord(record) @@ -241,7 +138,7 @@ export const prepareUpdate = async (opts: { cid: await cidForSafeRecord(record), swapCid, record, - blobs: blobsForWrite(record), + blobs: blobsForWrite(record, validate), } } @@ -292,16 +189,6 @@ export const writeToOp = (write: PreparedWrite): RecordWriteOp => { } } -function separateEmbeds(embed: PostRecord['embed']) { - if (!embed) { - return [] - } - if (isRecordWithMediaEmbed(embed)) { - return [{ $type: lex.ids.AppBskyEmbedRecord, ...embed.record }, embed.media] - } - return [embed] -} - async function cidForSafeRecord(record: RepoRecord) { try { const block = await dataToCborBlock(lexToIpld(record)) @@ -342,8 +229,92 @@ function assertNoExplicitSlurs(rkey: string, record: RepoRecord) { } } -// ensures two rkeys are not far apart -function rkeyIsInWindow(rkey1: TID, rkey2: TID) { - const ms = Math.abs(rkey1.timestamp() - rkey2.timestamp()) / 1000 - return ms < 10 * MINUTE +type FoundBlobRef = { + ref: BlobRef + path: string[] +} + +export const blobsForWrite = ( + record: RepoRecord, + validate: boolean, +): PreparedBlobRef[] => { + const refs = findBlobRefs(record) + const recordType = + typeof record['$type'] === 'string' ? record['$type'] : undefined + + for (const ref of refs) { + if (check.is(ref.ref.original, untypedJsonBlobRef)) { + throw new InvalidRecordError(`Legacy blob ref at '${ref.path.join('/')}'`) + } + } + + return refs.map(({ ref, path }) => ({ + cid: ref.ref, + mimeType: ref.mimeType, + constraints: + validate && recordType + ? CONSTRAINTS[recordType]?.[path.join('/')] ?? {} + : {}, + })) +} + +export const findBlobRefs = ( + val: LexValue, + path: string[] = [], + layer = 0, +): FoundBlobRef[] => { + if (layer > 32) { + return [] + } + // walk arrays + if (Array.isArray(val)) { + return val.flatMap((item) => findBlobRefs(item, path, layer + 1)) + } + // objects + if (val && typeof val === 'object') { + // convert blobs, leaving the original encoding so that we don't change CIDs on re-encode + if (val instanceof BlobRef) { + return [ + { + ref: val, + path, + }, + ] + } + // retain cids & bytes + if (CID.asCID(val) || val instanceof Uint8Array) { + return [] + } + return Object.entries(val).flatMap(([key, item]) => + findBlobRefs(item, [...path, key], layer + 1), + ) + } + // pass through + return [] +} + +const CONSTRAINTS = { + [lex.ids.AppBskyActorProfile]: { + avatar: + lex.schemaDict.AppBskyActorProfile.defs.main.record.properties.avatar, + banner: + lex.schemaDict.AppBskyActorProfile.defs.main.record.properties.banner, + }, + [lex.ids.AppBskyFeedGenerator]: { + avatar: + lex.schemaDict.AppBskyFeedGenerator.defs.main.record.properties.avatar, + }, + [lex.ids.AppBskyGraphList]: { + avatar: lex.schemaDict.AppBskyGraphList.defs.main.record.properties.avatar, + }, + [lex.ids.AppBskyFeedPost]: { + 'embed/images/image': + lex.schemaDict.AppBskyEmbedImages.defs.image.properties.image, + 'embed/external/thumb': + lex.schemaDict.AppBskyEmbedExternal.defs.external.properties.thumb, + 'embed/media/images/image': + lex.schemaDict.AppBskyEmbedImages.defs.image.properties.image, + 'embed/media/external/thumb': + lex.schemaDict.AppBskyEmbedExternal.defs.external.properties.thumb, + }, } diff --git a/packages/pds/tests/crud.test.ts b/packages/pds/tests/crud.test.ts index d70da099ce3..dd3f3f3b11f 100644 --- a/packages/pds/tests/crud.test.ts +++ b/packages/pds/tests/crud.test.ts @@ -483,32 +483,6 @@ describe('crud operations', () => { expect(rootRes2.data.rev).toEqual(rootRes1.data.rev) }) - it('temporarily only allows updates to profile', async () => { - const { repo } = bobAgent.api.com.atproto - const put = await repo.putRecord({ - repo: bob.did, - collection: ids.AppBskyGraphFollow, - rkey: TID.nextStr(), - record: { - subject: alice.did, - createdAt: new Date().toISOString(), - }, - }) - const edit = repo.putRecord({ - repo: bob.did, - collection: ids.AppBskyGraphFollow, - rkey: new AtUri(put.data.uri).rkey, - record: { - subject: bob.did, - createdAt: new Date().toISOString(), - }, - }) - - await expect(edit).rejects.toThrow( - 'Temporarily only accepting updates for collections: app.bsky.actor.profile, app.bsky.graph.list, app.bsky.feed.generator', - ) - }) - it('fails on user mismatch', async () => { const { repo } = aliceAgent.api.com.atproto const put = repo.putRecord({ @@ -638,6 +612,146 @@ describe('crud operations', () => { ) }) + describe('unvalidated writes', () => { + it('disallows creation of unknown lexicons when validate is set to true', async () => { + const attempt = aliceAgent.api.com.atproto.repo.createRecord({ + repo: alice.did, + collection: 'com.example.record', + record: { + blah: 'thing', + }, + }) + await expect(attempt).rejects.toThrow( + 'Lexicon not found: lex:com.example.record', + ) + }) + + it('allows creation of unknown lexicons when validate is set to false', async () => { + const res = await aliceAgent.api.com.atproto.repo.createRecord({ + repo: alice.did, + collection: 'com.example.record', + record: { + blah: 'thing', + }, + validate: false, + }) + const record = await ctx.actorStore.read(alice.did, (store) => + store.record.getRecord(new AtUri(res.data.uri), res.data.cid), + ) + expect(record?.value).toEqual({ + $type: 'com.example.record', + blah: 'thing', + }) + }) + + it('allows update of unknown lexicons when validate is set to false', async () => { + const createRes = await aliceAgent.api.com.atproto.repo.createRecord({ + repo: alice.did, + collection: 'com.example.record', + record: { + blah: 'thing', + }, + validate: false, + }) + const uri = new AtUri(createRes.data.uri) + const updateRes = await aliceAgent.api.com.atproto.repo.putRecord({ + repo: alice.did, + collection: 'com.example.record', + rkey: uri.rkey, + record: { + blah: 'something else', + }, + validate: false, + }) + const record = await ctx.actorStore.read(alice.did, (store) => + store.record.getRecord(uri, updateRes.data.cid), + ) + expect(record?.value).toEqual({ + $type: 'com.example.record', + blah: 'something else', + }) + }) + + it('correctly associates images with unknown record types', async () => { + const file = await fs.readFile( + '../dev-env/src/seed/img/key-portrait-small.jpg', + ) + const uploadedRes = await aliceAgent.api.com.atproto.repo.uploadBlob( + file, + { + encoding: 'image/jpeg', + }, + ) + + const res = await aliceAgent.api.com.atproto.repo.createRecord({ + repo: alice.did, + collection: 'com.example.record', + record: { + blah: 'thing', + image: uploadedRes.data.blob, + }, + validate: false, + }) + const record = await ctx.actorStore.read(alice.did, (store) => + store.record.getRecord(new AtUri(res.data.uri), res.data.cid), + ) + expect(record?.value).toMatchObject({ + $type: 'com.example.record', + blah: 'thing', + }) + const recordBlobs = await ctx.actorStore.read(alice.did, (store) => + store.db.db + .selectFrom('blob') + .innerJoin('record_blob', 'record_blob.blobCid', 'blob.cid') + .where('recordUri', '=', res.data.uri) + .selectAll() + .execute(), + ) + expect(recordBlobs.length).toBe(1) + expect(recordBlobs.at(0)?.cid).toBe(uploadedRes.data.blob.ref.toString()) + }) + + it('enforces record type constraint even when unvalidated', async () => { + const attempt = aliceAgent.api.com.atproto.repo.createRecord({ + repo: alice.did, + collection: 'com.example.record', + record: { + $type: 'com.example.other', + blah: 'thing', + }, + }) + await expect(attempt).rejects.toThrow( + 'Invalid $type: expected com.example.record, got com.example.other', + ) + }) + + it('enforces blob ref format even when unvalidated', async () => { + const file = await fs.readFile( + '../dev-env/src/seed/img/key-portrait-small.jpg', + ) + const uploadedRes = await aliceAgent.api.com.atproto.repo.uploadBlob( + file, + { + encoding: 'image/jpeg', + }, + ) + + const attempt = aliceAgent.api.com.atproto.repo.createRecord({ + repo: alice.did, + collection: 'com.example.record', + record: { + blah: 'thing', + image: { + cid: uploadedRes.data.blob.ref.toString(), + mimeType: uploadedRes.data.blob.mimeType, + }, + }, + validate: false, + }) + await expect(attempt).rejects.toThrow(`Legacy blob ref at 'image'`) + }) + }) + describe('compare-and-swap', () => { let recordCount = 0 // Ensures unique cids const postRecord = () => ({