From 6414311ed1dc0eeff15e7a2a666bd9119ea50d13 Mon Sep 17 00:00:00 2001 From: dholms Date: Wed, 4 Oct 2023 13:56:14 -0500 Subject: [PATCH 1/3] fix --- .../src/api/com/atproto/sync/subscribeRepos.ts | 15 ++++++++++----- packages/pds/src/sequencer/outbox.ts | 4 +--- packages/pds/src/sequencer/sequencer.ts | 12 ++++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index 18eba6d41f0..0f98db5061c 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -15,24 +15,29 @@ export default function (server: Server, ctx: AppContext) { const backfillTime = new Date( Date.now() - ctx.cfg.subscription.repoBackfillLimitMs, ).toISOString() + let outboxCursor: number | undefined = undefined if (cursor !== undefined) { const [next, curr] = await Promise.all([ ctx.sequencer.next(cursor), ctx.sequencer.curr(), ]) - if (next && next.sequencedAt < backfillTime) { + if (cursor > (curr?.seq ?? 0)) { + throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + } else if (next && next.sequencedAt < backfillTime) { + // if cursor is before backfill time, find earliest cursor from backfill window yield { $type: '#info', name: 'OutdatedCursor', message: 'Requested cursor exceeded limit. Possibly missing events', } - } - if (cursor > (curr?.seq ?? 0)) { - throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime) + outboxCursor = startEvt?.seq ?? undefined + } else { + outboxCursor = cursor } } - for await (const evt of outbox.events(cursor, backfillTime, signal)) { + for await (const evt of outbox.events(outboxCursor, signal)) { if (evt.type === 'commit') { yield { $type: '#commit', diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index d248099138c..0ee86075e81 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -30,12 +30,11 @@ export class Outbox { // immediately yield them async *events( backfillCursor?: number, - backFillTime?: string, signal?: AbortSignal, ): AsyncGenerator { // catch up as much as we can if (backfillCursor !== undefined) { - for await (const evt of this.getBackfill(backfillCursor, backFillTime)) { + for await (const evt of this.getBackfill(backfillCursor)) { if (signal?.aborted) return this.lastSeen = evt.seq yield evt @@ -67,7 +66,6 @@ export class Outbox { if (backfillCursor !== undefined) { const cutoverEvts = await this.sequencer.requestSeqRange({ earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor, - earliestTime: backFillTime, }) this.outBuffer.pushMany(cutoverEvts) // dont worry about dupes, we ensure order on yield diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index 7c678bcc711..2e0ee59b60f 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -55,6 +55,18 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { return got || null } + async earliestAfterTime(time: string): Promise { + const got = await this.db.db + .selectFrom('repo_seq') + .selectAll() + .where('seq', 'is not', null) + .where('sequencedAt', '>', time) + .limit(1) + .orderBy('sequencedAt', 'desc') + .executeTakeFirst() + return got || null + } + async requestSeqRange(opts: { earliestSeq?: number latestSeq?: number From 2ee83f96a910dbea019570415f3947d9290f9082 Mon Sep 17 00:00:00 2001 From: dholms Date: Wed, 4 Oct 2023 13:58:03 -0500 Subject: [PATCH 2/3] tidy --- packages/pds/src/sequencer/outbox.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index 0ee86075e81..5ebdaf970a8 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -101,11 +101,10 @@ export class Outbox { } // yields only historical events - async *getBackfill(backfillCursor: number, backfillTime?: string) { + async *getBackfill(backfillCursor: number) { const PAGE_SIZE = 500 while (true) { const evts = await this.sequencer.requestSeqRange({ - earliestTime: backfillTime, earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor, limit: PAGE_SIZE, }) From 9de92a8699aa9f86cae61cfb1e417611ae81f943 Mon Sep 17 00:00:00 2001 From: dholms Date: Wed, 4 Oct 2023 14:13:17 -0500 Subject: [PATCH 3/3] make tests --- packages/pds/src/api/com/atproto/sync/subscribeRepos.ts | 2 +- packages/pds/src/sequencer/sequencer.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index 0f98db5061c..7fc9be6f3ac 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -31,7 +31,7 @@ export default function (server: Server, ctx: AppContext) { message: 'Requested cursor exceeded limit. Possibly missing events', } const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime) - outboxCursor = startEvt?.seq ?? undefined + outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined } else { outboxCursor = cursor } diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index 2e0ee59b60f..23624800a41 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -60,9 +60,9 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { .selectFrom('repo_seq') .selectAll() .where('seq', 'is not', null) - .where('sequencedAt', '>', time) + .where('sequencedAt', '>=', time) + .orderBy('sequencedAt', 'asc') .limit(1) - .orderBy('sequencedAt', 'desc') .executeTakeFirst() return got || null }