diff --git a/packages/ozone/src/api/label/subscribeLabels.ts b/packages/ozone/src/api/label/subscribeLabels.ts index 7efb339d488..701405f9dad 100644 --- a/packages/ozone/src/api/label/subscribeLabels.ts +++ b/packages/ozone/src/api/label/subscribeLabels.ts @@ -19,7 +19,7 @@ export default function (server: Server, ctx: AppContext) { } for await (const evt of outbox.events(cursor, signal)) { - yield evt + yield { $type: 'com.atproto.label.subscribeLabels#labels', ...evt } } }) } diff --git a/packages/ozone/src/mod-service/util.ts b/packages/ozone/src/mod-service/util.ts index dbb35d080eb..ec4de26c3ad 100644 --- a/packages/ozone/src/mod-service/util.ts +++ b/packages/ozone/src/mod-service/util.ts @@ -2,12 +2,16 @@ import { LabelRow } from '../db/schema/label' import { Label } from '../lexicon/types/com/atproto/label/defs' export const formatLabel = (row: LabelRow): Label => { - return { + const label: Label = { src: row.src, uri: row.uri, - cid: row.cid === '' ? undefined : row.cid, val: row.val, neg: row.neg, cts: row.cts, } + if (row.cid !== '') { + // @NOTE avoiding undefined values on label, which dag-cbor chokes on when serializing. + label.cid = row.cid + } + return label } diff --git a/packages/ozone/tests/query-labels.test.ts b/packages/ozone/tests/query-labels.test.ts index a3b51d30caa..2b4bf540450 100644 --- a/packages/ozone/tests/query-labels.test.ts +++ b/packages/ozone/tests/query-labels.test.ts @@ -1,6 +1,12 @@ import AtpAgent from '@atproto/api' import { TestNetwork } from '@atproto/dev-env' +import { DisconnectError, Subscription } from '@atproto/xrpc-server' +import { ids, lexicons } from '../src/lexicon/lexicons' import { Label } from '../src/lexicon/types/com/atproto/label/defs' +import { + OutputSchema as LabelMessage, + isLabels, +} from '../src/lexicon/types/com/atproto/label/subscribeLabels' describe('ozone query labels', () => { let network: TestNetwork @@ -121,4 +127,37 @@ describe('ozone query labels', () => { labels.slice(0, 5), ) }) + + describe('subscribeLabels', () => { + it('streams all labels from initial cursor.', async () => { + const ac = new AbortController() + let doneTimer: NodeJS.Timeout + const resetDoneTimer = () => { + clearTimeout(doneTimer) + doneTimer = setTimeout(() => ac.abort(new DisconnectError()), 100) + } + const sub = new Subscription({ + signal: ac.signal, + service: agent.service.origin.replace('http://', 'ws://'), + method: ids.ComAtprotoLabelSubscribeLabels, + getParams() { + return { cursor: 0 } + }, + validate(obj) { + return lexicons.assertValidXrpcMessage( + ids.ComAtprotoLabelSubscribeLabels, + obj, + ) + }, + }) + const streamedLabels: Label[] = [] + for await (const message of sub) { + resetDoneTimer() + if (isLabels(message)) { + streamedLabels.push(...message.labels) + } + } + expect(streamedLabels).toEqual(labels) + }) + }) })