diff --git a/packages/bsky/src/data-plane/server/background.ts b/packages/bsky/src/data-plane/server/background.ts index 0ce785b21ce..59d8ccf0ddf 100644 --- a/packages/bsky/src/data-plane/server/background.ts +++ b/packages/bsky/src/data-plane/server/background.ts @@ -1,5 +1,5 @@ import PQueue from 'p-queue' -import { PrimaryDatabase } from './db' +import { Database } from './db' import { dbLogger } from '../../logger' // A simple queue for in-process, out-of-band/backgrounded work @@ -7,7 +7,7 @@ import { dbLogger } from '../../logger' export class BackgroundQueue { queue = new PQueue() destroyed = false - constructor(public db: PrimaryDatabase) {} + constructor(public db: Database) {} add(task: Task) { if (this.destroyed) { @@ -32,4 +32,4 @@ export class BackgroundQueue { } } -type Task = (db: PrimaryDatabase) => Promise +type Task = (db: Database) => Promise diff --git a/packages/bsky/src/data-plane/server/db/coordinator.ts b/packages/bsky/src/data-plane/server/db/coordinator.ts deleted file mode 100644 index 25ef305ed5a..00000000000 --- a/packages/bsky/src/data-plane/server/db/coordinator.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { Migrator } from 'kysely' -import PrimaryDatabase from './primary' -import Database from './db' -import { PgOptions } from './types' -import { dbLogger } from '../../../logger' - -type ReplicaTag = 'timeline' | 'feed' | 'search' | 'thread' | '*' -type ReplicaOptions = PgOptions & { tags?: ReplicaTag[] } - -type CoordinatorOptions = { - schema?: string - primary: PgOptions - replicas?: ReplicaOptions[] -} - -type ReplicaGroup = { - dbs: Database[] - roundRobinIdx: number -} - -export class DatabaseCoordinator { - migrator: Migrator - destroyed = false - - private primary: PrimaryDatabase - private allReplicas: Database[] - private tagged: Record - private untagged: ReplicaGroup - private tagWarns = new Set() - - constructor(public opts: CoordinatorOptions) { - this.primary = new PrimaryDatabase({ - schema: opts.schema, - ...opts.primary, - }) - this.allReplicas = [] - this.tagged = {} - this.untagged = { - dbs: [], - roundRobinIdx: 0, - } - for (const cfg of opts.replicas ?? []) { - const db = new Database({ - schema: opts.schema, - ...cfg, - }) - this.allReplicas.push(db) - // setup different groups of replicas based on tag, each round-robins separately. - if (cfg.tags?.length) { - for (const tag of cfg.tags) { - if (tag === '*') { - this.untagged.dbs.push(db) - } else { - this.tagged[tag] ??= { - dbs: [], - roundRobinIdx: 0, - } - this.tagged[tag].dbs.push(db) - } - } - } else { - this.untagged.dbs.push(db) - } - } - // guarantee there is always a replica around to service any query, falling back to primary. - if (!this.untagged.dbs.length) { - if (this.allReplicas.length) { - this.untagged.dbs = [...this.allReplicas] - } else { - this.untagged.dbs = [this.primary] - } - } - } - - getPrimary(): PrimaryDatabase { - return this.primary - } - - getReplicas(): Database[] { - return this.allReplicas - } - - getReplica(tag?: ReplicaTag): Database { - if (tag && this.tagged[tag]) { - return nextDb(this.tagged[tag]) - } - if (tag && !this.tagWarns.has(tag)) { - this.tagWarns.add(tag) - dbLogger.warn({ tag }, 'no replica for tag, falling back to any replica') - } - return nextDb(this.untagged) - } - - async close(): Promise { - await Promise.all([ - this.primary.close(), - ...this.allReplicas.map((db) => db.close()), - ]) - } -} - -// @NOTE mutates group incrementing roundRobinIdx -const nextDb = (group: ReplicaGroup) => { - const db = group.dbs[group.roundRobinIdx] - group.roundRobinIdx = (group.roundRobinIdx + 1) % group.dbs.length - return db -} diff --git a/packages/bsky/src/data-plane/server/db/db.ts b/packages/bsky/src/data-plane/server/db/db.ts index 35fa6d0857a..6411938d69d 100644 --- a/packages/bsky/src/data-plane/server/db/db.ts +++ b/packages/bsky/src/data-plane/server/db/db.ts @@ -1,24 +1,40 @@ import assert from 'assert' -import { Kysely, PostgresDialect } from 'kysely' +import EventEmitter from 'events' +import { + Kysely, + KyselyPlugin, + Migrator, + PluginTransformQueryArgs, + PluginTransformResultArgs, + PostgresDialect, + QueryResult, + RootOperationNode, + UnknownRow, +} from 'kysely' +import TypedEmitter from 'typed-emitter' import { Pool as PgPool, types as pgTypes } from 'pg' +import * as migrations from './migrations' import DatabaseSchema, { DatabaseSchemaType } from './database-schema' import { PgOptions } from './types' import { dbLogger } from '../../../logger' +import { CtxMigrationProvider } from './migrations/provider' export class Database { pool: PgPool db: DatabaseSchema + migrator: Migrator + txEvt = new EventEmitter() as TxnEmitter destroyed = false - isPrimary = false constructor( public opts: PgOptions, - instances?: { db: DatabaseSchema; pool: PgPool }, + instances?: { db: DatabaseSchema; pool: PgPool; migrator: Migrator }, ) { // if instances are provided, use those if (instances) { this.db = instances.db this.pool = instances.pool + this.migrator = instances.migrator return } @@ -56,12 +72,42 @@ export class Database { this.db = new Kysely({ dialect: new PostgresDialect({ pool }), }) + this.migrator = new Migrator({ + db: this.db, + migrationTableSchema: opts.schema, + provider: new CtxMigrationProvider(migrations, 'pg'), + }) } get schema(): string | undefined { return this.opts.schema } + async transaction(fn: (db: Database) => Promise): Promise { + const leakyTxPlugin = new LeakyTxPlugin() + const { dbTxn, txRes } = await this.db + .withPlugin(leakyTxPlugin) + .transaction() + .execute(async (txn) => { + const dbTxn = new Database(this.opts, { + db: txn, + pool: this.pool, + migrator: this.migrator, + }) + const txRes = await fn(dbTxn) + .catch(async (err) => { + leakyTxPlugin.endTx() + // ensure that all in-flight queries are flushed & the connection is open + await dbTxn.db.getExecutor().provideConnection(noopAsync) + throw err + }) + .finally(() => leakyTxPlugin.endTx()) + return { dbTxn, txRes } + }) + dbTxn?.txEvt.emit('commit') + return txRes + } + get isTransaction() { return this.db.isTransaction } @@ -74,8 +120,37 @@ export class Database { assert(!this.isTransaction, 'Cannot be in a transaction') } - asPrimary(): Database { - throw new Error('Primary db required') + onCommit(fn: () => void) { + this.assertTransaction() + this.txEvt.once('commit', fn) + } + + async migrateToOrThrow(migration: string) { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const { error, results } = await this.migrator.migrateTo(migration) + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } + + async migrateToLatestOrThrow() { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const { error, results } = await this.migrator.migrateToLatest() + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results } async close(): Promise { @@ -89,3 +164,35 @@ export default Database const onPoolError = (err: Error) => dbLogger.error({ err }, 'db pool error') const onClientError = (err: Error) => dbLogger.error({ err }, 'db client error') + +// utils +// ------- + +class LeakyTxPlugin implements KyselyPlugin { + private txOver: boolean + + endTx() { + this.txOver = true + } + + transformQuery(args: PluginTransformQueryArgs): RootOperationNode { + if (this.txOver) { + throw new Error('tx already failed') + } + return args.node + } + + async transformResult( + args: PluginTransformResultArgs, + ): Promise> { + return args.result + } +} + +type TxnEmitter = TypedEmitter + +type TxnEvents = { + commit: () => void +} + +const noopAsync = async () => {} diff --git a/packages/bsky/src/data-plane/server/db/index.ts b/packages/bsky/src/data-plane/server/db/index.ts index 1c5886fb10e..1beb455f5e3 100644 --- a/packages/bsky/src/data-plane/server/db/index.ts +++ b/packages/bsky/src/data-plane/server/db/index.ts @@ -1,3 +1 @@ -export * from './primary' export * from './db' -export * from './coordinator' diff --git a/packages/bsky/src/data-plane/server/db/primary.ts b/packages/bsky/src/data-plane/server/db/primary.ts deleted file mode 100644 index 61b5765967a..00000000000 --- a/packages/bsky/src/data-plane/server/db/primary.ts +++ /dev/null @@ -1,184 +0,0 @@ -import EventEmitter from 'events' -import { - Migrator, - KyselyPlugin, - PluginTransformQueryArgs, - PluginTransformResultArgs, - RootOperationNode, - QueryResult, - UnknownRow, - sql, -} from 'kysely' -import { Pool as PgPool } from 'pg' -import TypedEmitter from 'typed-emitter' -import { wait } from '@atproto/common' -import DatabaseSchema from './database-schema' -import * as migrations from './migrations' -import { CtxMigrationProvider } from './migrations/provider' -import { dbLogger as log } from '../../../logger' -import { PgOptions } from './types' -import { Database } from './db' - -export class PrimaryDatabase extends Database { - migrator: Migrator - txEvt = new EventEmitter() as TxnEmitter - destroyed = false - isPrimary = true - - constructor( - public opts: PgOptions, - instances?: { db: DatabaseSchema; pool: PgPool }, - ) { - super(opts, instances) - this.migrator = new Migrator({ - db: this.db, - migrationTableSchema: opts.schema, - provider: new CtxMigrationProvider(migrations, 'pg'), - }) - } - - static is(db: Database): db is PrimaryDatabase { - return db.isPrimary - } - - asPrimary(): PrimaryDatabase { - return this - } - - async transaction(fn: (db: PrimaryDatabase) => Promise): Promise { - const leakyTxPlugin = new LeakyTxPlugin() - const { dbTxn, txRes } = await this.db - .withPlugin(leakyTxPlugin) - .transaction() - .execute(async (txn) => { - const dbTxn = new PrimaryDatabase(this.opts, { - db: txn, - pool: this.pool, - }) - const txRes = await fn(dbTxn) - .catch(async (err) => { - leakyTxPlugin.endTx() - // ensure that all in-flight queries are flushed & the connection is open - await dbTxn.db.getExecutor().provideConnection(noopAsync) - throw err - }) - .finally(() => leakyTxPlugin.endTx()) - return { dbTxn, txRes } - }) - dbTxn?.txEvt.emit('commit') - return txRes - } - - onCommit(fn: () => void) { - this.assertTransaction() - this.txEvt.once('commit', fn) - } - - async close(): Promise { - if (this.destroyed) return - await this.db.destroy() - this.destroyed = true - } - - async migrateToOrThrow(migration: string) { - if (this.schema) { - await this.db.schema.createSchema(this.schema).ifNotExists().execute() - } - const { error, results } = await this.migrator.migrateTo(migration) - if (error) { - throw error - } - if (!results) { - throw new Error('An unknown failure occurred while migrating') - } - return results - } - - async migrateToLatestOrThrow() { - if (this.schema) { - await this.db.schema.createSchema(this.schema).ifNotExists().execute() - } - const { error, results } = await this.migrator.migrateToLatest() - if (error) { - throw error - } - if (!results) { - throw new Error('An unknown failure occurred while migrating') - } - return results - } - - async maintainMaterializedViews(opts: { - views: string[] - intervalSec: number - signal: AbortSignal - }) { - const { views, intervalSec, signal } = opts - while (!signal.aborted) { - // super basic synchronization by agreeing when the intervals land relative to unix timestamp - const now = Date.now() - const intervalMs = 1000 * intervalSec - const nextIteration = Math.ceil(now / intervalMs) - const nextInMs = nextIteration * intervalMs - now - await wait(nextInMs) - if (signal.aborted) break - await Promise.all( - views.map(async (view) => { - try { - await this.refreshMaterializedView(view) - log.info( - { view, time: new Date().toISOString() }, - 'materialized view refreshed', - ) - } catch (err) { - log.error( - { view, err, time: new Date().toISOString() }, - 'materialized view refresh failed', - ) - } - }), - ) - } - } - - async refreshMaterializedView(view: string) { - const { ref } = this.db.dynamic - await sql`refresh materialized view concurrently ${ref(view)}`.execute( - this.db, - ) - } -} - -export default PrimaryDatabase - -// utils -// ------- - -class LeakyTxPlugin implements KyselyPlugin { - private txOver: boolean - - endTx() { - this.txOver = true - } - - transformQuery(args: PluginTransformQueryArgs): RootOperationNode { - if (this.txOver) { - throw new Error('tx already failed') - } - return args.node - } - - async transformResult( - args: PluginTransformResultArgs, - ): Promise> { - return args.result - } -} - -type TxnEmitter = TypedEmitter - -type TxnEvents = { - commit: () => void -} - -const noopAsync = async () => {} diff --git a/packages/bsky/src/data-plane/server/did-cache.ts b/packages/bsky/src/data-plane/server/did-cache.ts index 3fed82df5ed..2ffe3b5aa69 100644 --- a/packages/bsky/src/data-plane/server/did-cache.ts +++ b/packages/bsky/src/data-plane/server/did-cache.ts @@ -1,6 +1,6 @@ import PQueue from 'p-queue' import { CacheResult, DidCache, DidDocument } from '@atproto/identity' -import { PrimaryDatabase } from './db' +import { Database } from './db' import { excluded } from './db/util' import { dbLogger } from '../../logger' @@ -10,7 +10,7 @@ export class DidSqlCache implements DidCache { constructor( // @TODO perhaps could use both primary and non-primary. not high enough // throughput to matter right now. also may just move this over to redis before long! - public db: PrimaryDatabase, + public db: Database, public staleTTL: number, public maxTTL: number, ) { diff --git a/packages/bsky/src/data-plane/server/indexing/index.ts b/packages/bsky/src/data-plane/server/indexing/index.ts index 69fc73dc0e6..68f5ff8b721 100644 --- a/packages/bsky/src/data-plane/server/indexing/index.ts +++ b/packages/bsky/src/data-plane/server/indexing/index.ts @@ -13,7 +13,7 @@ import { AtUri } from '@atproto/syntax' import { IdResolver, getPds } from '@atproto/identity' import { DAY, HOUR } from '@atproto/common' import { ValidationError } from '@atproto/lexicon' -import { PrimaryDatabase } from '../db' +import { Database } from '../db' import { Actor } from '../db/tables/actor' import * as Post from './plugins/post' import * as Threadgate from './plugins/thread-gate' @@ -47,7 +47,7 @@ export class IndexingService { } constructor( - public db: PrimaryDatabase, + public db: Database, public idResolver: IdResolver, public background: BackgroundQueue, ) { @@ -66,7 +66,7 @@ export class IndexingService { } } - transact(txn: PrimaryDatabase) { + transact(txn: Database) { txn.assertTransaction() return new IndexingService(txn, this.idResolver, this.background) } diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/block.ts b/packages/bsky/src/data-plane/server/indexing/plugins/block.ts index feb94b3256f..ec4956a04f5 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/block.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/block.ts @@ -3,7 +3,7 @@ import { AtUri, normalizeDatetimeAlways } from '@atproto/syntax' import { CID } from 'multiformats/cid' import * as Block from '../../../../lexicon/types/app/bsky/graph/block' import * as lex from '../../../../lexicon/lexicons' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import RecordProcessor from '../processor' import { BackgroundQueue } from '../../background' @@ -71,7 +71,7 @@ const notifsForDelete = () => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/feed-generator.ts b/packages/bsky/src/data-plane/server/indexing/plugins/feed-generator.ts index 7af296fb26f..f3b82c75567 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/feed-generator.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/feed-generator.ts @@ -3,7 +3,7 @@ import { AtUri, normalizeDatetimeAlways } from '@atproto/syntax' import { CID } from 'multiformats/cid' import * as FeedGenerator from '../../../../lexicon/types/app/bsky/feed/generator' import * as lex from '../../../../lexicon/lexicons' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import RecordProcessor from '../processor' import { BackgroundQueue } from '../../background' @@ -70,7 +70,7 @@ export type PluginType = RecordProcessor< > export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/follow.ts b/packages/bsky/src/data-plane/server/indexing/plugins/follow.ts index f8f10069191..6f238755761 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/follow.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/follow.ts @@ -4,7 +4,7 @@ import { CID } from 'multiformats/cid' import * as Follow from '../../../../lexicon/types/app/bsky/graph/follow' import * as lex from '../../../../lexicon/lexicons' import RecordProcessor from '../processor' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { countAll, excluded } from '../../db/util' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import { BackgroundQueue } from '../../background' @@ -118,7 +118,7 @@ const updateAggregates = async (db: DatabaseSchema, follow: IndexedFollow) => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/like.ts b/packages/bsky/src/data-plane/server/indexing/plugins/like.ts index 849e0ed5bbb..98e9fc722f8 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/like.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/like.ts @@ -5,7 +5,7 @@ import * as Like from '../../../../lexicon/types/app/bsky/feed/like' import * as lex from '../../../../lexicon/lexicons' import RecordProcessor from '../processor' import { countAll, excluded } from '../../db/util' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import { BackgroundQueue } from '../../background' @@ -108,7 +108,7 @@ const updateAggregates = async (db: DatabaseSchema, like: IndexedLike) => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/list-block.ts b/packages/bsky/src/data-plane/server/indexing/plugins/list-block.ts index da6d1481553..09eabcdb9f4 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/list-block.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/list-block.ts @@ -3,7 +3,7 @@ import { AtUri, normalizeDatetimeAlways } from '@atproto/syntax' import { CID } from 'multiformats/cid' import * as ListBlock from '../../../../lexicon/types/app/bsky/graph/listblock' import * as lex from '../../../../lexicon/lexicons' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import RecordProcessor from '../processor' import { BackgroundQueue } from '../../background' @@ -71,7 +71,7 @@ const notifsForDelete = () => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/list-item.ts b/packages/bsky/src/data-plane/server/indexing/plugins/list-item.ts index 37b987f8f18..f2a43cff485 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/list-item.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/list-item.ts @@ -5,7 +5,7 @@ import { CID } from 'multiformats/cid' import * as ListItem from '../../../../lexicon/types/app/bsky/graph/listitem' import * as lex from '../../../../lexicon/lexicons' import RecordProcessor from '../processor' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import { BackgroundQueue } from '../../background' @@ -79,7 +79,7 @@ const notifsForDelete = () => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/list.ts b/packages/bsky/src/data-plane/server/indexing/plugins/list.ts index 52dc67e4e7d..f6deaf0a68e 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/list.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/list.ts @@ -5,7 +5,7 @@ import * as List from '../../../../lexicon/types/app/bsky/graph/list' import * as lex from '../../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import RecordProcessor from '../processor' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { BackgroundQueue } from '../../background' const lexId = lex.ids.AppBskyGraphList @@ -67,7 +67,7 @@ const notifsForDelete = () => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/post.ts b/packages/bsky/src/data-plane/server/indexing/plugins/post.ts index 1ed538a55b8..cc4121ab667 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/post.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/post.ts @@ -19,7 +19,7 @@ import * as lex from '../../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import RecordProcessor from '../processor' import { Notification } from '../../db/tables/notification' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { countAll, excluded } from '../../db/util' import { getAncestorsAndSelfQb, @@ -395,7 +395,7 @@ const updateAggregates = async (db: DatabaseSchema, postIdx: IndexedPost) => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/profile.ts b/packages/bsky/src/data-plane/server/indexing/plugins/profile.ts index 1a4f3804f55..18c9b54bbb9 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/profile.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/profile.ts @@ -4,7 +4,7 @@ import * as Profile from '../../../../lexicon/types/app/bsky/actor/profile' import * as lex from '../../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import RecordProcessor from '../processor' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { BackgroundQueue } from '../../background' const lexId = lex.ids.AppBskyActorProfile @@ -63,7 +63,7 @@ const notifsForDelete = () => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/repost.ts b/packages/bsky/src/data-plane/server/indexing/plugins/repost.ts index 8e6c794fcec..ec2e7754fb0 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/repost.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/repost.ts @@ -4,7 +4,7 @@ import { AtUri, normalizeDatetimeAlways } from '@atproto/syntax' import * as Repost from '../../../../lexicon/types/app/bsky/feed/repost' import * as lex from '../../../../lexicon/lexicons' import RecordProcessor from '../processor' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import { countAll, excluded } from '../../db/util' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' import { BackgroundQueue } from '../../background' @@ -133,7 +133,7 @@ const updateAggregates = async (db: DatabaseSchema, repost: IndexedRepost) => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/plugins/thread-gate.ts b/packages/bsky/src/data-plane/server/indexing/plugins/thread-gate.ts index c306602f973..0402fe8289f 100644 --- a/packages/bsky/src/data-plane/server/indexing/plugins/thread-gate.ts +++ b/packages/bsky/src/data-plane/server/indexing/plugins/thread-gate.ts @@ -4,7 +4,7 @@ import { CID } from 'multiformats/cid' import * as Threadgate from '../../../../lexicon/types/app/bsky/feed/threadgate' import * as lex from '../../../../lexicon/lexicons' import { DatabaseSchema, DatabaseSchemaType } from '../../db/database-schema' -import { PrimaryDatabase } from '../../db' +import { Database } from '../../db' import RecordProcessor from '../processor' import { BackgroundQueue } from '../../background' @@ -76,7 +76,7 @@ const notifsForDelete = () => { export type PluginType = RecordProcessor export const makePlugin = ( - db: PrimaryDatabase, + db: Database, background: BackgroundQueue, ): PluginType => { return new RecordProcessor(db, background, { diff --git a/packages/bsky/src/data-plane/server/indexing/processor.ts b/packages/bsky/src/data-plane/server/indexing/processor.ts index 341dd1ccb0a..77a8fbdf09f 100644 --- a/packages/bsky/src/data-plane/server/indexing/processor.ts +++ b/packages/bsky/src/data-plane/server/indexing/processor.ts @@ -4,7 +4,7 @@ import { AtUri } from '@atproto/syntax' import { chunkArray } from '@atproto/common' import { jsonStringToLex, stringifyLex } from '@atproto/lexicon' import { lexicons } from '../../../lexicon/lexicons' -import { PrimaryDatabase } from '../db' +import { Database } from '../db' import DatabaseSchema from '../db/database-schema' import { Notification } from '../db/tables/notification' import { BackgroundQueue } from '../background' @@ -40,7 +40,7 @@ export class RecordProcessor { collection: string db: DatabaseSchema constructor( - private appDb: PrimaryDatabase, + private appDb: Database, private background: BackgroundQueue, private params: RecordProcessorParams, ) { @@ -225,7 +225,7 @@ export class RecordProcessor { async handleNotifs(op: { deleted?: S; inserted?: S }) { let notifs: Notif[] = [] - const runOnCommit: ((db: PrimaryDatabase) => Promise)[] = [] + const runOnCommit: ((db: Database) => Promise)[] = [] if (op.deleted) { const forDelete = this.params.notifsForDelete( op.deleted, diff --git a/packages/bsky/src/data-plane/server/subscription/index.ts b/packages/bsky/src/data-plane/server/subscription/index.ts index 5054441b6b9..dddc07aa39c 100644 --- a/packages/bsky/src/data-plane/server/subscription/index.ts +++ b/packages/bsky/src/data-plane/server/subscription/index.ts @@ -17,7 +17,7 @@ import { OutputSchema as Message } from '../../../lexicon/types/com/atproto/sync import * as message from '../../../lexicon/types/com/atproto/sync/subscribeRepos' import { subLogger as log } from '../../../logger' import { IndexingService } from '../indexing' -import { PrimaryDatabase } from '../db' +import { Database } from '../db' import { ConsecutiveItem, ConsecutiveList, @@ -40,7 +40,7 @@ export class RepoSubscription { constructor( private opts: { service: string - db: PrimaryDatabase + db: Database idResolver: IdResolver background: BackgroundQueue }, diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index df8476f7f36..930e327d150 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -24,11 +24,7 @@ export * from './data-plane' export type { ServerConfigValues } from './config' export type { MountedAlgos } from './api/feed-gen/types' export { ServerConfig } from './config' -export { - Database, - PrimaryDatabase, - DatabaseCoordinator, -} from './data-plane/server/db' +export { Database } from './data-plane/server/db' export { Redis } from './redis' export { AppContext } from './context' export { makeAlgos } from './api/feed-gen' diff --git a/packages/bsky/tests/data-plane/db.test.ts b/packages/bsky/tests/data-plane/db.test.ts index 1b0787828ae..379906ab847 100644 --- a/packages/bsky/tests/data-plane/db.test.ts +++ b/packages/bsky/tests/data-plane/db.test.ts @@ -1,17 +1,17 @@ import { sql } from 'kysely' import { wait } from '@atproto/common' import { TestNetwork } from '@atproto/dev-env' -import { Database, PrimaryDatabase } from '../../src' +import { Database } from '../../src' describe('db', () => { let network: TestNetwork - let db: PrimaryDatabase + let db: Database beforeAll(async () => { network = await TestNetwork.create({ dbPostgresSchema: 'bsky_db', }) - db = network.bsky.db.getPrimary() + db = network.bsky.db }) afterAll(async () => { diff --git a/packages/bsky/tests/data-plane/did-cache.test.ts b/packages/bsky/tests/data-plane/did-cache.test.ts index e5192ed3d97..d90de832cd6 100644 --- a/packages/bsky/tests/data-plane/did-cache.test.ts +++ b/packages/bsky/tests/data-plane/did-cache.test.ts @@ -81,7 +81,7 @@ describe('did cache', () => { }) it('accurately reports expired dids & refreshes the cache', async () => { - const didCache = new DidSqlCache(network.bsky.db.getPrimary(), 1, 60000) + const didCache = new DidSqlCache(network.bsky.db, 1, 60000) const shortCacheResolver = new IdResolver({ plcUrl: network.bsky.ctx.cfg.didPlcUrl, didCache, @@ -110,7 +110,7 @@ describe('did cache', () => { }) it('does not return expired dids & refreshes the cache', async () => { - const didCache = new DidSqlCache(network.bsky.db.getPrimary(), 0, 1) + const didCache = new DidSqlCache(network.bsky.db, 0, 1) const shortExpireResolver = new IdResolver({ plcUrl: network.bsky.ctx.cfg.didPlcUrl, didCache, diff --git a/packages/bsky/tests/data-plane/duplicate-records.test.ts b/packages/bsky/tests/data-plane/duplicate-records.test.ts index d9570d03b37..da7287893ba 100644 --- a/packages/bsky/tests/data-plane/duplicate-records.test.ts +++ b/packages/bsky/tests/data-plane/duplicate-records.test.ts @@ -3,18 +3,18 @@ import { cidForCbor, TID } from '@atproto/common' import { WriteOpAction } from '@atproto/repo' import { TestNetwork } from '@atproto/dev-env' import * as lex from '../../src/lexicon/lexicons' -import { Database, PrimaryDatabase } from '../../src' +import { Database } from '../../src' describe('duplicate record', () => { let network: TestNetwork let did: string - let db: PrimaryDatabase + let db: Database beforeAll(async () => { network = await TestNetwork.create({ dbPostgresSchema: 'bsky_duplicates', }) - db = network.bsky.db.getPrimary() + db = network.bsky.db did = 'did:example:alice' }) diff --git a/packages/bsky/tests/data-plane/handle-invalidation.test.ts b/packages/bsky/tests/data-plane/handle-invalidation.test.ts index fa20a5f02bd..2f0df04e4ff 100644 --- a/packages/bsky/tests/data-plane/handle-invalidation.test.ts +++ b/packages/bsky/tests/data-plane/handle-invalidation.test.ts @@ -43,9 +43,8 @@ describe('handle invalidation', () => { const backdateIndexedAt = async (did: string) => { const TWO_DAYS_AGO = new Date(Date.now() - 2 * DAY).toISOString() - await network.bsky.db - .getPrimary() - .db.updateTable('actor') + await network.bsky.db.db + .updateTable('actor') .set({ indexedAt: TWO_DAYS_AGO }) .where('did', '=', did) .execute() diff --git a/packages/bsky/tests/data-plane/indexing.test.ts b/packages/bsky/tests/data-plane/indexing.test.ts index e141491dd34..1a84d7dc320 100644 --- a/packages/bsky/tests/data-plane/indexing.test.ts +++ b/packages/bsky/tests/data-plane/indexing.test.ts @@ -32,7 +32,7 @@ describe('indexing', () => { agent = network.bsky.getClient() pdsAgent = network.pds.getClient() sc = network.getSeedClient() - db = network.bsky.db.getPrimary() + db = network.bsky.db await usersSeed(sc) // Data in tests is not processed from subscription await network.processAll() diff --git a/packages/bsky/tests/data-plane/subscription/repo.test.ts b/packages/bsky/tests/data-plane/subscription/repo.test.ts index a80ab92aa33..42ee6da7e3d 100644 --- a/packages/bsky/tests/data-plane/subscription/repo.test.ts +++ b/packages/bsky/tests/data-plane/subscription/repo.test.ts @@ -29,7 +29,7 @@ describe('sync', () => { }) it('indexes permit history being replayed.', async () => { - const db = network.bsky.db.getPrimary() + const { db } = network.bsky // Generate some modifications and dupes const { alice, bob, carol, dan } = sc.dids @@ -97,9 +97,7 @@ describe('sync', () => { }) await network.processAll() // confirm jack was indexed as an actor despite the bad event - const actors = await dumpTable(network.bsky.db.getPrimary(), 'actor', [ - 'did', - ]) + const actors = await dumpTable(network.bsky.db, 'actor', ['did']) expect(actors.map((a) => a.handle)).toContain('jack.test') network.pds.ctx.sequencer.sequenceCommit = sequenceCommitOrig }) diff --git a/packages/bsky/tests/feed-generation.test.ts b/packages/bsky/tests/feed-generation.test.ts index 500b6e68379..23b0f2bc170 100644 --- a/packages/bsky/tests/feed-generation.test.ts +++ b/packages/bsky/tests/feed-generation.test.ts @@ -74,9 +74,8 @@ describe('feed generation', () => { { uri: feedUriBadPagination.toString(), order: 3 }, { uri: primeUri.toString(), order: 4 }, ] - await network.bsky.db - .getPrimary() - .db.insertInto('suggested_feed') + await network.bsky.db.db + .insertInto('suggested_feed') .values(feedSuggestions) .execute() }) diff --git a/packages/bsky/tests/seeds/basic.ts b/packages/bsky/tests/seeds/basic.ts index 87311b78ccc..e674d2243dc 100644 --- a/packages/bsky/tests/seeds/basic.ts +++ b/packages/bsky/tests/seeds/basic.ts @@ -1,7 +1,8 @@ import { SeedClient, TestNetwork, TestNetworkNoAppView } from '@atproto/dev-env' import { ids } from '../../src/lexicon/lexicons' import usersSeed from './users' -import { PrimaryDatabase } from '../../src' +import { Database } from '../../src' +import DatabaseSchema from '@atproto/pds/dist/db/database-schema' export default async ( sc: SeedClient, @@ -135,7 +136,7 @@ export default async ( await sc.repost(dan, alicesReplyToBob.ref) if (sc.network instanceof TestNetwork) { - const db = sc.network.bsky.db.getPrimary() + const { db } = sc.network.bsky await createLabel(db, { val: 'test-label', uri: sc.posts[alice][2].ref.uriStr, @@ -170,7 +171,7 @@ export const replies = { } const createLabel = async ( - db: PrimaryDatabase, + db: Database, opts: { uri: string; cid: string; val: string }, ) => { await db.db diff --git a/packages/bsky/tests/server.test.ts b/packages/bsky/tests/server.test.ts index 3084d8f2727..a6f65513c40 100644 --- a/packages/bsky/tests/server.test.ts +++ b/packages/bsky/tests/server.test.ts @@ -3,13 +3,11 @@ import express from 'express' import axios, { AxiosError } from 'axios' import { TestNetwork } from '@atproto/dev-env' import { handler as errorHandler } from '../src/error' -import { Database } from '../src' import basicSeed from './seeds/basic' import { once } from 'events' describe('server', () => { let network: TestNetwork - let db: Database let alice: string beforeAll(async () => { @@ -20,7 +18,6 @@ describe('server', () => { await basicSeed(sc) await network.processAll() alice = sc.dids.alice - db = network.bsky.db.getPrimary() }) afterAll(async () => { diff --git a/packages/bsky/tests/views/actor-search.test.ts b/packages/bsky/tests/views/actor-search.test.ts index 0a51556c591..d1dea5ab285 100644 --- a/packages/bsky/tests/views/actor-search.test.ts +++ b/packages/bsky/tests/views/actor-search.test.ts @@ -24,7 +24,7 @@ describe.skip('pds actor search views', () => { await usersBulkSeed(sc) // Skip did/handle resolution for expediency - const db = network.bsky.db.getPrimary() + const { db } = network.bsky const now = new Date().toISOString() await db.db .insertInto('actor') diff --git a/packages/bsky/tests/views/suggested-follows.test.ts b/packages/bsky/tests/views/suggested-follows.test.ts index 2be077e7dc6..5f2df54b4c5 100644 --- a/packages/bsky/tests/views/suggested-follows.test.ts +++ b/packages/bsky/tests/views/suggested-follows.test.ts @@ -26,9 +26,8 @@ describe('suggested follows', () => { { did: sc.dids.fred, order: 5 }, { did: sc.dids.gina, order: 6 }, ] - await network.bsky.db - .getPrimary() - .db.insertInto('suggested_follow') + await network.bsky.db.db + .insertInto('suggested_follow') .values(suggestions) .execute() }) diff --git a/packages/bsky/tests/views/suggestions.test.ts b/packages/bsky/tests/views/suggestions.test.ts index d524a420295..401bf79030c 100644 --- a/packages/bsky/tests/views/suggestions.test.ts +++ b/packages/bsky/tests/views/suggestions.test.ts @@ -24,9 +24,8 @@ describe('pds user search views', () => { { did: sc.dids.dan, order: 4 }, ] - await network.bsky.db - .getPrimary() - .db.insertInto('suggested_follow') + await network.bsky.db.db + .insertInto('suggested_follow') .values(suggestions) .execute() }) diff --git a/packages/bsky/tests/views/timeline.test.ts b/packages/bsky/tests/views/timeline.test.ts index 70a3cfad21d..25b80d65a63 100644 --- a/packages/bsky/tests/views/timeline.test.ts +++ b/packages/bsky/tests/views/timeline.test.ts @@ -4,7 +4,7 @@ import { TestNetwork, SeedClient } from '@atproto/dev-env' import { forSnapshot, getOriginator, paginateAll } from '../_util' import basicSeed from '../seeds/basic' import { FeedViewPost } from '../../src/lexicon/types/app/bsky/feed/defs' -import { PrimaryDatabase } from '../../src' +import { Database } from '../../src' const REVERSE_CHRON = 'reverse-chronological' @@ -32,7 +32,7 @@ describe('timeline views', () => { carol = sc.dids.carol dan = sc.dids.dan // covers label hydration on embeds - const db = network.bsky.db.getPrimary() + const { db } = network.bsky await createLabel(db, { val: 'test-label-3', uri: sc.posts[bob][0].ref.uriStr, @@ -234,7 +234,7 @@ describe('timeline views', () => { }) const createLabel = async ( - db: PrimaryDatabase, + db: Database, opts: { uri: string; cid: string; val: string }, ) => { await db.db diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 3b726f2a096..e9f95158235 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -13,7 +13,7 @@ export class TestBsky { constructor( public url: string, public port: number, - public db: bsky.DatabaseCoordinator, + public db: bsky.Database, public server: bsky.BskyAppView, public dataplane: bsky.DataPlaneServer, public sub: bsky.RepoSubscription, @@ -34,20 +34,14 @@ export class TestBsky { }) // shared across server, ingester, and indexer in order to share pool, avoid too many pg connections. - const db = new bsky.DatabaseCoordinator({ + const db = new bsky.Database({ + url: cfg.dbPostgresUrl, schema: cfg.dbPostgresSchema, - primary: { - url: cfg.dbPostgresUrl, - poolSize: 10, - }, - replicas: [], + poolSize: 10, }) const dataplanePort = await getPort() - const dataplane = await bsky.DataPlaneServer.create( - db.getPrimary(), - dataplanePort, - ) + const dataplane = await bsky.DataPlaneServer.create(db, dataplanePort) const config = new bsky.ServerConfig({ version: 'unknown', @@ -64,7 +58,7 @@ export class TestBsky { }) // Separate migration db in case migration changes some connection state that we need in the tests, e.g. "alter database ... set ..." - const migrationDb = new bsky.PrimaryDatabase({ + const migrationDb = new bsky.Database({ url: cfg.dbPostgresUrl, schema: cfg.dbPostgresSchema, }) @@ -75,7 +69,7 @@ export class TestBsky { } await migrationDb.close() - const didCache = new bsky.DidSqlCache(db.getPrimary(), HOUR, DAY) + const didCache = new bsky.DidSqlCache(db, HOUR, DAY) // api server const server = bsky.BskyAppView.create({ @@ -87,9 +81,9 @@ export class TestBsky { const sub = new bsky.RepoSubscription({ service: cfg.repoProvider, - db: db.getPrimary(), + db, idResolver: server.ctx.idResolver, - background: new BackgroundQueue(db.getPrimary()), + background: new BackgroundQueue(db), }) await server.start() diff --git a/packages/dev-env/src/mock/index.ts b/packages/dev-env/src/mock/index.ts index eec0bb3b38e..f115a1112ef 100644 --- a/packages/dev-env/src/mock/index.ts +++ b/packages/dev-env/src/mock/index.ts @@ -1,6 +1,6 @@ import { AtUri } from '@atproto/syntax' import AtpAgent from '@atproto/api' -import { PrimaryDatabase } from '@atproto/bsky' +import { Database } from '@atproto/bsky' import { REASONSPAM, REASONOTHER, @@ -187,12 +187,12 @@ export async function generateMockSetup(env: TestNetwork) { }, ) - await createLabel(env.bsky.db.getPrimary(), { + await createLabel(env.bsky.db, { uri: labeledPost.uri, cid: labeledPost.cid, val: 'nudity', }) - await createLabel(env.bsky.db.getPrimary(), { + await createLabel(env.bsky.db, { uri: filteredPost.uri, cid: filteredPost.cid, val: 'dmca-violation', @@ -332,7 +332,7 @@ function ucfirst(str: string): string { } const createLabel = async ( - db: PrimaryDatabase, + db: Database, opts: { uri: string; cid: string; val: string }, ) => { await db.db diff --git a/packages/pds/tests/proxied/views.test.ts b/packages/pds/tests/proxied/views.test.ts index b0fd3e6e17a..39525d33c52 100644 --- a/packages/pds/tests/proxied/views.test.ts +++ b/packages/pds/tests/proxied/views.test.ts @@ -79,9 +79,8 @@ describe('proxies view requests', () => { { did: sc.dids.carol, order: 2 }, { did: sc.dids.dan, order: 3 }, ] - await network.bsky.db - .getPrimary() - .db.insertInto('suggested_follow') + await network.bsky.db.db + .insertInto('suggested_follow') .values(suggestions) .execute() diff --git a/packages/pds/tests/seeds/basic.ts b/packages/pds/tests/seeds/basic.ts index 31195c5b9ba..89a5b210c2f 100644 --- a/packages/pds/tests/seeds/basic.ts +++ b/packages/pds/tests/seeds/basic.ts @@ -157,9 +157,8 @@ const createLabel = async ( bsky: TestBsky, opts: { did: string; val: string }, ) => { - await bsky.db - .getPrimary() - .db.insertInto('label') + await bsky.db.db + .insertInto('label') .values({ uri: opts.did, cid: '',