diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index 18eba6d41f0..7fc9be6f3ac 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -15,24 +15,29 @@ export default function (server: Server, ctx: AppContext) { const backfillTime = new Date( Date.now() - ctx.cfg.subscription.repoBackfillLimitMs, ).toISOString() + let outboxCursor: number | undefined = undefined if (cursor !== undefined) { const [next, curr] = await Promise.all([ ctx.sequencer.next(cursor), ctx.sequencer.curr(), ]) - if (next && next.sequencedAt < backfillTime) { + if (cursor > (curr?.seq ?? 0)) { + throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + } else if (next && next.sequencedAt < backfillTime) { + // if cursor is before backfill time, find earliest cursor from backfill window yield { $type: '#info', name: 'OutdatedCursor', message: 'Requested cursor exceeded limit. Possibly missing events', } - } - if (cursor > (curr?.seq ?? 0)) { - throw new InvalidRequestError('Cursor in the future.', 'FutureCursor') + const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime) + outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined + } else { + outboxCursor = cursor } } - for await (const evt of outbox.events(cursor, backfillTime, signal)) { + for await (const evt of outbox.events(outboxCursor, signal)) { if (evt.type === 'commit') { yield { $type: '#commit', diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index d248099138c..5ebdaf970a8 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -30,12 +30,11 @@ export class Outbox { // immediately yield them async *events( backfillCursor?: number, - backFillTime?: string, signal?: AbortSignal, ): AsyncGenerator { // catch up as much as we can if (backfillCursor !== undefined) { - for await (const evt of this.getBackfill(backfillCursor, backFillTime)) { + for await (const evt of this.getBackfill(backfillCursor)) { if (signal?.aborted) return this.lastSeen = evt.seq yield evt @@ -67,7 +66,6 @@ export class Outbox { if (backfillCursor !== undefined) { const cutoverEvts = await this.sequencer.requestSeqRange({ earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor, - earliestTime: backFillTime, }) this.outBuffer.pushMany(cutoverEvts) // dont worry about dupes, we ensure order on yield @@ -103,11 +101,10 @@ export class Outbox { } // yields only historical events - async *getBackfill(backfillCursor: number, backfillTime?: string) { + async *getBackfill(backfillCursor: number) { const PAGE_SIZE = 500 while (true) { const evts = await this.sequencer.requestSeqRange({ - earliestTime: backfillTime, earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor, limit: PAGE_SIZE, }) diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index 7c678bcc711..23624800a41 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -55,6 +55,18 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { return got || null } + async earliestAfterTime(time: string): Promise { + const got = await this.db.db + .selectFrom('repo_seq') + .selectAll() + .where('seq', 'is not', null) + .where('sequencedAt', '>=', time) + .orderBy('sequencedAt', 'asc') + .limit(1) + .executeTakeFirst() + return got || null + } + async requestSeqRange(opts: { earliestSeq?: number latestSeq?: number