Skip to content

Commit

Permalink
Fix misc issues with subscribeLabels stream output (#2244)
Browse files Browse the repository at this point in the history
fix misc issues with subscribeLabels stream output
  • Loading branch information
devinivy authored Feb 29, 2024
1 parent b3434d4 commit 4d062cb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
2 changes: 1 addition & 1 deletion packages/ozone/src/api/label/subscribeLabels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export default function (server: Server, ctx: AppContext) {
}

for await (const evt of outbox.events(cursor, signal)) {
yield evt
yield { $type: 'com.atproto.label.subscribeLabels#labels', ...evt }
}
})
}
8 changes: 6 additions & 2 deletions packages/ozone/src/mod-service/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ import { LabelRow } from '../db/schema/label'
import { Label } from '../lexicon/types/com/atproto/label/defs'

export const formatLabel = (row: LabelRow): Label => {
return {
const label: Label = {
src: row.src,
uri: row.uri,
cid: row.cid === '' ? undefined : row.cid,
val: row.val,
neg: row.neg,
cts: row.cts,
}
if (row.cid !== '') {
// @NOTE avoiding undefined values on label, which dag-cbor chokes on when serializing.
label.cid = row.cid
}
return label
}
39 changes: 39 additions & 0 deletions packages/ozone/tests/query-labels.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import AtpAgent from '@atproto/api'
import { TestNetwork } from '@atproto/dev-env'
import { DisconnectError, Subscription } from '@atproto/xrpc-server'
import { ids, lexicons } from '../src/lexicon/lexicons'
import { Label } from '../src/lexicon/types/com/atproto/label/defs'
import {
OutputSchema as LabelMessage,
isLabels,
} from '../src/lexicon/types/com/atproto/label/subscribeLabels'

describe('ozone query labels', () => {
let network: TestNetwork
Expand Down Expand Up @@ -121,4 +127,37 @@ describe('ozone query labels', () => {
labels.slice(0, 5),
)
})

describe('subscribeLabels', () => {
it('streams all labels from initial cursor.', async () => {
const ac = new AbortController()
let doneTimer: NodeJS.Timeout
const resetDoneTimer = () => {
clearTimeout(doneTimer)
doneTimer = setTimeout(() => ac.abort(new DisconnectError()), 100)
}
const sub = new Subscription({
signal: ac.signal,
service: agent.service.origin.replace('http://', 'ws://'),
method: ids.ComAtprotoLabelSubscribeLabels,
getParams() {
return { cursor: 0 }
},
validate(obj) {
return lexicons.assertValidXrpcMessage<LabelMessage>(
ids.ComAtprotoLabelSubscribeLabels,
obj,
)
},
})
const streamedLabels: Label[] = []
for await (const message of sub) {
resetDoneTimer()
if (isLabels(message)) {
streamedLabels.push(...message.labels)
}
}
expect(streamedLabels).toEqual(labels)
})
})
})

0 comments on commit 4d062cb

Please sign in to comment.