Skip to content

Commit

Permalink
Sequencer backfill cursor fix (#1708)
Browse files Browse the repository at this point in the history
* fix

* tidy

* make tests
  • Loading branch information
dholms authored Oct 4, 2023
1 parent 26538a7 commit 27a8dee
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
15 changes: 10 additions & 5 deletions packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,29 @@ export default function (server: Server, ctx: AppContext) {
const backfillTime = new Date(
Date.now() - ctx.cfg.subscription.repoBackfillLimitMs,
).toISOString()
let outboxCursor: number | undefined = undefined
if (cursor !== undefined) {
const [next, curr] = await Promise.all([
ctx.sequencer.next(cursor),
ctx.sequencer.curr(),
])
if (next && next.sequencedAt < backfillTime) {
if (cursor > (curr?.seq ?? 0)) {
throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
} else if (next && next.sequencedAt < backfillTime) {
// if cursor is before backfill time, find earliest cursor from backfill window
yield {
$type: '#info',
name: 'OutdatedCursor',
message: 'Requested cursor exceeded limit. Possibly missing events',
}
}
if (cursor > (curr?.seq ?? 0)) {
throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
const startEvt = await ctx.sequencer.earliestAfterTime(backfillTime)
outboxCursor = startEvt?.seq ? startEvt.seq - 1 : undefined
} else {
outboxCursor = cursor
}
}

for await (const evt of outbox.events(cursor, backfillTime, signal)) {
for await (const evt of outbox.events(outboxCursor, signal)) {
if (evt.type === 'commit') {
yield {
$type: '#commit',
Expand Down
7 changes: 2 additions & 5 deletions packages/pds/src/sequencer/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ export class Outbox {
// immediately yield them
async *events(
backfillCursor?: number,
backFillTime?: string,
signal?: AbortSignal,
): AsyncGenerator<SeqEvt> {
// catch up as much as we can
if (backfillCursor !== undefined) {
for await (const evt of this.getBackfill(backfillCursor, backFillTime)) {
for await (const evt of this.getBackfill(backfillCursor)) {
if (signal?.aborted) return
this.lastSeen = evt.seq
yield evt
Expand Down Expand Up @@ -67,7 +66,6 @@ export class Outbox {
if (backfillCursor !== undefined) {
const cutoverEvts = await this.sequencer.requestSeqRange({
earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor,
earliestTime: backFillTime,
})
this.outBuffer.pushMany(cutoverEvts)
// dont worry about dupes, we ensure order on yield
Expand Down Expand Up @@ -103,11 +101,10 @@ export class Outbox {
}

// yields only historical events
async *getBackfill(backfillCursor: number, backfillTime?: string) {
async *getBackfill(backfillCursor: number) {
const PAGE_SIZE = 500
while (true) {
const evts = await this.sequencer.requestSeqRange({
earliestTime: backfillTime,
earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor,
limit: PAGE_SIZE,
})
Expand Down
12 changes: 12 additions & 0 deletions packages/pds/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
return got || null
}

async earliestAfterTime(time: string): Promise<SeqRow | null> {
const got = await this.db.db
.selectFrom('repo_seq')
.selectAll()
.where('seq', 'is not', null)
.where('sequencedAt', '>=', time)
.orderBy('sequencedAt', 'asc')
.limit(1)
.executeTakeFirst()
return got || null
}

async requestSeqRange(opts: {
earliestSeq?: number
latestSeq?: number
Expand Down

0 comments on commit 27a8dee

Please sign in to comment.