diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index 18eba6d41f0..0f98db5061c 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -15,24 +15,29 @@ export default function (server: Server, ctx: AppContext) { const backfillTime = new Date( Date.now() - ctx.cfg.subscription.repoBackfillLimitMs, ).toISOString() + let outboxCursor: number | undefined = undefined if (cursor !== undefined) { const [next, curr] = await Promise.all([ ctx.sequencer.next(cursor), ctx.sequencer.curr(), ]) - if (next && next.sequencedAt < backfillTime) { + if (cursor > (curr?.seq ?? 0)) { + throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + } else if (next && next.sequencedAt < backfillTime) { + // if cursor is before backfill time, find earliest cursor from backfill window yield { $type: '#info', name: 'OutdatedCursor', message: 'Requested cursor exceeded limit. Possibly missing events', } - } - if (cursor > (curr?.seq ?? 0)) { - throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime) + outboxCursor = startEvt?.seq ?? undefined + } else { + outboxCursor = cursor } } - for await (const evt of outbox.events(cursor, backfillTime, signal)) { + for await (const evt of outbox.events(outboxCursor, signal)) { if (evt.type === 'commit') { yield { $type: '#commit', diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index d248099138c..0ee86075e81 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -30,12 +30,11 @@ export class Outbox { // immediately yield them async *events( backfillCursor?: number, - backFillTime?: string, signal?: AbortSignal, ): AsyncGenerator { // catch up as much as we can if (backfillCursor !== undefined) { - for await (const evt of this.getBackfill(backfillCursor, backFillTime)) { + for await (const evt of this.getBackfill(backfillCursor)) { if (signal?.aborted) return this.lastSeen = evt.seq yield evt @@ -67,7 +66,6 @@ export class Outbox { if (backfillCursor !== undefined) { const cutoverEvts = await this.sequencer.requestSeqRange({ earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor, - earliestTime: backFillTime, }) this.outBuffer.pushMany(cutoverEvts) // dont worry about dupes, we ensure order on yield diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index 7c678bcc711..2e0ee59b60f 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -55,6 +55,18 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { return got || null } + async earliestAfterTime(time: string): Promise { + const got = await this.db.db + .selectFrom('repo_seq') + .selectAll() + .where('seq', 'is not', null) + .where('sequencedAt', '>', time) + .limit(1) + .orderBy('sequencedAt', 'desc') + .executeTakeFirst() + return got || null + } + async requestSeqRange(opts: { earliestSeq?: number latestSeq?: number