Skip to content

Commit

Permalink
remove unused db primary/replica/coordinator from bsky dataplane
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Dec 29, 2023
1 parent b40e8c6 commit 9b2153f
Show file tree
Hide file tree
Showing 38 changed files with 192 additions and 398 deletions.
6 changes: 3 additions & 3 deletions packages/bsky/src/data-plane/server/background.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import PQueue from 'p-queue'
import { PrimaryDatabase } from './db'
import { Database } from './db'
import { dbLogger } from '../../logger'

// A simple queue for in-process, out-of-band/backgrounded work

export class BackgroundQueue {
queue = new PQueue()
destroyed = false
constructor(public db: PrimaryDatabase) {}
constructor(public db: Database) {}

add(task: Task) {
if (this.destroyed) {
Expand All @@ -32,4 +32,4 @@ export class BackgroundQueue {
}
}

type Task = (db: PrimaryDatabase) => Promise<void>
type Task = (db: Database) => Promise<void>
107 changes: 0 additions & 107 deletions packages/bsky/src/data-plane/server/db/coordinator.ts

This file was deleted.

117 changes: 112 additions & 5 deletions packages/bsky/src/data-plane/server/db/db.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
import assert from 'assert'
import { Kysely, PostgresDialect } from 'kysely'
import EventEmitter from 'events'
import {
Kysely,
KyselyPlugin,
Migrator,
PluginTransformQueryArgs,
PluginTransformResultArgs,
PostgresDialect,
QueryResult,
RootOperationNode,
UnknownRow,
} from 'kysely'
import TypedEmitter from 'typed-emitter'
import { Pool as PgPool, types as pgTypes } from 'pg'
import * as migrations from './migrations'
import DatabaseSchema, { DatabaseSchemaType } from './database-schema'
import { PgOptions } from './types'
import { dbLogger } from '../../../logger'
import { CtxMigrationProvider } from './migrations/provider'

export class Database {
pool: PgPool
db: DatabaseSchema
migrator: Migrator
txEvt = new EventEmitter() as TxnEmitter
destroyed = false
isPrimary = false

constructor(
public opts: PgOptions,
instances?: { db: DatabaseSchema; pool: PgPool },
instances?: { db: DatabaseSchema; pool: PgPool; migrator: Migrator },
) {
// if instances are provided, use those
if (instances) {
this.db = instances.db
this.pool = instances.pool
this.migrator = instances.migrator
return
}

Expand Down Expand Up @@ -56,12 +72,42 @@ export class Database {
this.db = new Kysely<DatabaseSchemaType>({
dialect: new PostgresDialect({ pool }),
})
this.migrator = new Migrator({
db: this.db,
migrationTableSchema: opts.schema,
provider: new CtxMigrationProvider(migrations, 'pg'),
})
}

get schema(): string | undefined {
return this.opts.schema
}

async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
const leakyTxPlugin = new LeakyTxPlugin()
const { dbTxn, txRes } = await this.db
.withPlugin(leakyTxPlugin)
.transaction()
.execute(async (txn) => {
const dbTxn = new Database(this.opts, {
db: txn,
pool: this.pool,
migrator: this.migrator,
})
const txRes = await fn(dbTxn)
.catch(async (err) => {
leakyTxPlugin.endTx()
// ensure that all in-flight queries are flushed & the connection is open
await dbTxn.db.getExecutor().provideConnection(noopAsync)
throw err
})
.finally(() => leakyTxPlugin.endTx())
return { dbTxn, txRes }
})
dbTxn?.txEvt.emit('commit')
return txRes
}

get isTransaction() {
return this.db.isTransaction
}
Expand All @@ -74,8 +120,37 @@ export class Database {
assert(!this.isTransaction, 'Cannot be in a transaction')
}

asPrimary(): Database {
throw new Error('Primary db required')
onCommit(fn: () => void) {
this.assertTransaction()
this.txEvt.once('commit', fn)
}

async migrateToOrThrow(migration: string) {
if (this.schema) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
}
const { error, results } = await this.migrator.migrateTo(migration)
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}

async migrateToLatestOrThrow() {
if (this.schema) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
}
const { error, results } = await this.migrator.migrateToLatest()
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}

async close(): Promise<void> {
Expand All @@ -89,3 +164,35 @@ export default Database

const onPoolError = (err: Error) => dbLogger.error({ err }, 'db pool error')
const onClientError = (err: Error) => dbLogger.error({ err }, 'db client error')

// utils
// -------

class LeakyTxPlugin implements KyselyPlugin {
private txOver: boolean

endTx() {
this.txOver = true
}

transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
if (this.txOver) {
throw new Error('tx already failed')
}
return args.node
}

async transformResult(
args: PluginTransformResultArgs,
): Promise<QueryResult<UnknownRow>> {
return args.result
}
}

type TxnEmitter = TypedEmitter<TxnEvents>

type TxnEvents = {
commit: () => void
}

const noopAsync = async () => {}
2 changes: 0 additions & 2 deletions packages/bsky/src/data-plane/server/db/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
export * from './primary'
export * from './db'
export * from './coordinator'
Loading

0 comments on commit 9b2153f

Please sign in to comment.