Skip to content

Commit

Permalink
give sequencer its own db
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Oct 12, 2023
1 parent 8c966b3 commit 2688764
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 68 deletions.
7 changes: 2 additions & 5 deletions packages/dev-env/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,9 @@ export class TestNetwork extends TestNetworkNoAppView {

async processFullSubscription(timeout = 5000) {
const sub = this.bsky.indexer.sub
const { db } = this.pds.ctx.db
const start = Date.now()
const { lastSeq } = await db
.selectFrom('repo_seq')
.select(db.fn.max('repo_seq.seq').as('lastSeq'))
.executeTakeFirstOrThrow()
const lastSeq = await this.pds.ctx.sequencer.curr()
if (!lastSeq) return
while (Date.now() - start < timeout) {
const partitionState = sub.partitions.get(0)
if (partitionState?.cursor === lastSeq) {
Expand Down
1 change: 1 addition & 0 deletions packages/pds/bin/migration-create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export async function main() {
'Must pass a migration name consisting of lowercase digits, numbers, and dashes.',
)
}
console.log(name)
const filename = `${prefix}-${name}`
const dir = path.join(__dirname, '..', 'src', 'db', 'migrations')

Expand Down
2 changes: 1 addition & 1 deletion packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export default function (server: Server, ctx: AppContext) {
ctx.sequencer.next(cursor),
ctx.sequencer.curr(),
])
if (cursor > (curr?.seq ?? 0)) {
if (cursor > (curr ?? 0)) {
throw new InvalidRequestError('Cursor in the future.', 'FutureCursor')
} else if (next && next.sequencedAt < backfillTime) {
// if cursor is before backfill time, find earliest cursor from backfill window
Expand Down
5 changes: 4 additions & 1 deletion packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ export class AppContext {
cfg.crawlers,
backgroundQueue,
)
const sequencer = new Sequencer(db, crawlers)
const sequencer = new Sequencer(
path.join(cfg.db.directory, 'repo_seq.sqlite'),
crawlers,
)
const redisScratch = cfg.redis
? getRedisClient(cfg.redis.address, cfg.redis.password)
: undefined
Expand Down
11 changes: 11 additions & 0 deletions packages/pds/src/sequencer/db/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Database, Migrator } from '../../db'
import { SequencerDbSchema } from './schema'
import * as migrations from './migrations'

export * from './schema'

export type SequencerDb = Database<SequencerDbSchema>

export const getMigrator = (db: Database<SequencerDbSchema>) => {
return new Migrator(db.db, migrations)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable('repo_seq')
.addColumn('seq', 'integer', (col) => col.autoIncrement().primaryKey())
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('event', 'blob', (col) => col.notNull())
.addColumn('invalidated', 'int2', (col) => col.notNull().defaultTo(0))
.addColumn('sequencedAt', 'varchar', (col) => col.notNull())
.execute()
// for filtering seqs based on did
await db.schema
.createIndex('repo_seq_did_idx')
.on('repo_seq')
.column('did')
.execute()
// for filtering seqs based on event type
await db.schema
.createIndex('repo_seq_event_type_idx')
.on('repo_seq')
.column('eventType')
.execute()
// for entering into the seq stream at a particular time
await db.schema
.createIndex('repo_seq_sequenced_at_index')
.on('repo_seq')
.column('sequencedAt')
.execute()
}

export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('repo_seq').execute()
}
5 changes: 5 additions & 0 deletions packages/pds/src/sequencer/db/migrations/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// NOTE this file can be edited by hand, but it is also appended to by the migration:create command.
// It's important that every migration is exported from here with the proper name. We'd simplify
// this with kysely's FileMigrationProvider, but it doesn't play nicely with the build process.

export * as _20231012T200556520Z from './20231012T200556520Z-init'
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ export type RepoSeqEventType =
| 'migrate'
| 'tombstone'

export const REPO_SEQ_SEQUENCE = 'repo_seq_sequence'

export interface RepoSeq {
seq: GeneratedAlways<number>
did: string
Expand All @@ -21,8 +19,6 @@ export interface RepoSeq {
export type RepoSeqInsert = Insertable<RepoSeq>
export type RepoSeqEntry = Selectable<RepoSeq>

export const tableName = 'repo_seq'

export type PartialDB = {
[tableName]: RepoSeq
export type SequencerDbSchema = {
repo_seq: RepoSeq
}
2 changes: 1 addition & 1 deletion packages/pds/src/sequencer/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from '@atproto/repo'
import { PreparedWrite } from '../repo'
import { CID } from 'multiformats/cid'
import { RepoSeqInsert } from '../service-db'
import { RepoSeqInsert } from './db'

export const formatSeqCommit = async (
did: string,
Expand Down
19 changes: 11 additions & 8 deletions packages/pds/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import EventEmitter from 'events'
import TypedEmitter from 'typed-emitter'
import { seqLogger as log } from '../logger'
import { SECOND, cborDecode, wait } from '@atproto/common'
import { CommitData } from '@atproto/repo'
import {
CommitEvt,
HandleEvt,
Expand All @@ -11,33 +12,35 @@ import {
formatSeqHandleUpdate,
formatSeqTombstone,
} from './events'
import { ServiceDb, RepoSeqEntry, RepoSeqInsert } from '../service-db'
import { CommitData } from '@atproto/repo'
import { SequencerDb, getMigrator, RepoSeqEntry, RepoSeqInsert } from './db'
import { PreparedWrite } from '../repo'
import { Crawlers } from '../crawlers'
import { Database } from '../db'

export * from './events'

export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
db: SequencerDb
destroyed = false
pollPromise: Promise<void> | null = null
triesWithNoResults = 0

constructor(
public db: ServiceDb,
dbLocation: string,
public crawlers: Crawlers,
public lastSeen = 0,
) {
super()
// note: this does not err when surpassed, just prints a warning to stderr
this.setMaxListeners(100)
this.db = Database.sqlite(dbLocation)
}

async start() {
const migrator = getMigrator(this.db)
await migrator.migrateToLatestOrThrow()
const curr = await this.curr()
if (curr) {
this.lastSeen = curr.seq ?? 0
}
this.lastSeen = curr ?? 0
if (this.pollPromise === null) {
this.pollPromise = this.pollDb()
}
Expand All @@ -51,14 +54,14 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
this.emit('close')
}

async curr(): Promise<SeqRow | null> {
async curr(): Promise<number | null> {
const got = await this.db.db
.selectFrom('repo_seq')
.selectAll()
.orderBy('seq', 'desc')
.limit(1)
.executeTakeFirst()
return got || null
return got?.seq ?? null
}

async next(cursor: number): Promise<SeqRow | null> {
Expand Down
10 changes: 1 addition & 9 deletions packages/pds/src/service-db/schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import * as appPassword from './app-password'
import * as inviteCode from './invite-code'
import * as emailToken from './email-token'
import * as moderation from './moderation'
import * as repoSeq from './repo-seq'
import * as appMigration from './app-migration'

export type DatabaseSchema = appMigration.PartialDB &
Expand All @@ -19,8 +18,7 @@ export type DatabaseSchema = appMigration.PartialDB &
didCache.PartialDB &
inviteCode.PartialDB &
emailToken.PartialDB &
moderation.PartialDB &
repoSeq.PartialDB
moderation.PartialDB

export type { UserAccount, UserAccountEntry } from './user-account'
export type { DidHandle } from './did-handle'
Expand All @@ -36,10 +34,4 @@ export type {
ModerationReport,
ModerationReportResolution,
} from './moderation'
export type {
RepoSeq,
RepoSeqEntry,
RepoSeqInsert,
RepoSeqEventType,
} from './repo-seq'
export type { AppMigration } from './app-migration'
41 changes: 4 additions & 37 deletions packages/pds/tests/sync/subscribe-repos.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import { TestNetworkNoAppView, SeedClient } from '@atproto/dev-env'
import AtpAgent from '@atproto/api'
import {
cborDecode,
HOUR,
MINUTE,
readFromGenerator,
wait,
} from '@atproto/common'
import { cborDecode, HOUR, readFromGenerator, wait } from '@atproto/common'
import { randomStr } from '@atproto/crypto'
import * as repo from '@atproto/repo'
import { readCar } from '@atproto/repo'
Expand All @@ -20,13 +14,11 @@ import {
import { AppContext } from '../../src'
import basicSeed from '../seeds/basic'
import { CID } from 'multiformats/cid'
import { ServiceDb } from '../../src/service-db'

describe('repo subscribe repos', () => {
let serverHost: string

let network: TestNetworkNoAppView
let db: ServiceDb
let ctx: AppContext

let agent: AtpAgent
Expand All @@ -45,7 +37,6 @@ describe('repo subscribe repos', () => {
})
serverHost = network.pds.url.replace('http://', '')
ctx = network.pds.ctx
db = network.pds.ctx.db
agent = network.pds.getClient()
sc = network.getSeedClient()
await basicSeed(sc)
Expand Down Expand Up @@ -111,25 +102,6 @@ describe('repo subscribe repos', () => {
return evts
}

const getAllEvents = (userDid: string, frames: Frame[]) => {
const types: unknown[] = []
for (const frame of frames) {
if (frame instanceof MessageFrame) {
if (
(frame.header.t === '#commit' &&
(frame.body as CommitEvt).repo === userDid) ||
(frame.header.t === '#handle' &&
(frame.body as HandleEvt).did === userDid) ||
(frame.header.t === '#tombstone' &&
(frame.body as TombstoneEvt).did === userDid)
) {
types.push(frame.body)
}
}
}
return types
}

const verifyCommitEvents = async (frames: Frame[]) => {
await verifyRepo(alice, getCommitEvents(alice, frames))
await verifyRepo(bob, getCommitEvents(bob, frames))
Expand Down Expand Up @@ -200,13 +172,8 @@ describe('repo subscribe repos', () => {
const isDone = async (evt: any) => {
if (evt === undefined) return false
if (evt instanceof ErrorFrame) return true
const curr = await db.db
.selectFrom('repo_seq')
.select('seq')
.limit(1)
.orderBy('seq', 'desc')
.executeTakeFirst()
return curr !== undefined && evt.body.seq === curr.seq
const curr = await ctx.sequencer.curr()
return evt.body.seq === curr
}

return readFromGenerator(gen, isDone, waitFor)
Expand Down Expand Up @@ -269,7 +236,7 @@ describe('repo subscribe repos', () => {
})

it('backfills only from provided cursor', async () => {
const seqs = await db.db
const seqs = await ctx.sequencer.db.db
.selectFrom('repo_seq')
.selectAll()
.orderBy('seq', 'asc')
Expand Down

0 comments on commit 2688764

Please sign in to comment.