From ce5eecd3c1e8ce0fcf4a1f1b5274d1494e5d4cbc Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 7 Dec 2023 14:35:57 -0600 Subject: [PATCH] wire up dataplane in ctx & dev-env --- packages/bsky/src/config.ts | 8 +++++ packages/bsky/src/context.ts | 12 ++++++++ packages/bsky/src/data-plane/index.ts | 2 ++ packages/bsky/src/data-plane/server/index.ts | 2 +- packages/bsky/src/index.ts | 8 +++++ packages/dev-env/src/bsky.ts | 31 +++++++++++++------- 6 files changed, 51 insertions(+), 12 deletions(-) create mode 100644 packages/bsky/src/data-plane/index.ts diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index 679f99c59a4..b2808326fa6 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -12,6 +12,7 @@ export interface ServerConfigValues { dbReplicaPostgresUrls?: string[] dbReplicaTags?: Record // E.g. { timeline: [0], thread: [1] } dbPostgresSchema?: string + dataplaneUrl: string didPlcUrl: string didCacheStaleTTL: number didCacheMaxTTL: number @@ -74,6 +75,8 @@ export class ServerConfig { ) const dbPostgresSchema = process.env.DB_POSTGRES_SCHEMA assert(dbPrimaryPostgresUrl) + const dataplaneUrl = process.env.DATAPLANE_URL + assert(dataplaneUrl) const adminPassword = process.env.ADMIN_PASSWORD || 'admin' const moderatorPassword = process.env.MODERATOR_PASSWORD || undefined const triagePassword = process.env.TRIAGE_PASSWORD || undefined @@ -93,6 +96,7 @@ export class ServerConfig { dbReplicaPostgresUrls, dbReplicaTags, dbPostgresSchema, + dataplaneUrl, didPlcUrl, didCacheStaleTTL, didCacheMaxTTL, @@ -162,6 +166,10 @@ export class ServerConfig { return this.cfg.dbPostgresSchema } + get dataplaneUrl() { + return this.cfg.dataplaneUrl + } + get didCacheStaleTTL() { return this.cfg.didCacheStaleTTL } diff --git a/packages/bsky/src/context.ts b/packages/bsky/src/context.ts index 3488c6a5c02..b4ceff951f6 100644 --- a/packages/bsky/src/context.ts +++ b/packages/bsky/src/context.ts @@ -13,6 +13,8 @@ import { BackgroundQueue } from './background' import { MountedAlgos } from './feed-gen/types' import { LabelCache } from './label-cache' import { NotificationServer } from './notifications' +import { DataPlaneClient } from './data-plane/client' +import { Hydrator } from './hydration/hydrator' export class AppContext { public moderationPushAgent: AtpAgent | undefined @@ -22,6 +24,8 @@ export class AppContext { imgUriBuilder: ImageUriBuilder cfg: ServerConfig services: Services + dataplane: DataPlaneClient + hydrator: Hydrator signingKey: Keypair idResolver: IdResolver didCache: DidSqlCache @@ -58,6 +62,14 @@ export class AppContext { return this.opts.services } + get dataplane(): DataPlaneClient { + return this.opts.dataplane + } + + get hydrator(): Hydrator { + return this.opts.hydrator + } + get signingKey(): Keypair { return this.opts.signingKey } diff --git a/packages/bsky/src/data-plane/index.ts b/packages/bsky/src/data-plane/index.ts new file mode 100644 index 00000000000..6047188a655 --- /dev/null +++ b/packages/bsky/src/data-plane/index.ts @@ -0,0 +1,2 @@ +export * from './server' +export * from './client' diff --git a/packages/bsky/src/data-plane/server/index.ts b/packages/bsky/src/data-plane/server/index.ts index 6ceec06fd72..554d5bd2f7d 100644 --- a/packages/bsky/src/data-plane/server/index.ts +++ b/packages/bsky/src/data-plane/server/index.ts @@ -17,7 +17,7 @@ export class DataPlaneServer { return new DataPlaneServer(server) } - async stop() { + async destroy() { return new Promise((resolve, reject) => { this.server.close((err) => { if (err) { diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 7ceba61f990..741b000ab8b 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -27,7 +27,10 @@ import { LabelCache } from './label-cache' import { NotificationServer } from './notifications' import { AtpAgent } from '@atproto/api' import { Keypair } from '@atproto/crypto' +import { createDataPlaneClient } from './data-plane/client' +import { Hydrator } from './hydration/hydrator' +export * from './data-plane' export type { ServerConfigValues } from './config' export type { MountedAlgos } from './feed-gen/types' export { ServerConfig } from './config' @@ -115,10 +118,15 @@ export class BskyAppView { labelCache, }) + const dataplane = createDataPlaneClient(config.dataplaneUrl, '1.1') + const hydrator = new Hydrator(dataplane) + const ctx = new AppContext({ db, cfg: config, services, + dataplane, + hydrator, imgUriBuilder, signingKey, idResolver, diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 8320130eb43..7c29ef4e86f 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -18,6 +18,7 @@ export class TestBsky { public server: bsky.BskyAppView, public indexer: bsky.BskyIndexer, public ingester: bsky.BskyIngester, + public dataplane: bsky.DataPlaneServer, ) {} static async create(cfg: BskyConfig): Promise { @@ -34,6 +35,22 @@ export class TestBsky { signer: serviceKeypair, }) + // shared across server, ingester, and indexer in order to share pool, avoid too many pg connections. + const db = new bsky.DatabaseCoordinator({ + schema: cfg.dbPostgresSchema, + primary: { + url: cfg.dbPrimaryPostgresUrl, + poolSize: 10, + }, + replicas: [], + }) + + const dataplanePort = await getPort() + const dataplane = await bsky.DataPlaneServer.create( + db.getPrimary(), + dataplanePort, + ) + const config = new bsky.ServerConfig({ version: '0.0.0', port, @@ -42,6 +59,7 @@ export class TestBsky { serverDid, didCacheStaleTTL: HOUR, didCacheMaxTTL: DAY, + dataplaneUrl: `http://localhost:${dataplanePort}`, ...cfg, // Each test suite gets its own lock id for the repo subscription adminPassword: ADMIN_PASSWORD, @@ -51,16 +69,6 @@ export class TestBsky { feedGenDid: 'did:example:feedGen', }) - // shared across server, ingester, and indexer in order to share pool, avoid too many pg connections. - const db = new bsky.DatabaseCoordinator({ - schema: cfg.dbPostgresSchema, - primary: { - url: cfg.dbPrimaryPostgresUrl, - poolSize: 10, - }, - replicas: [], - }) - // Separate migration db in case migration changes some connection state that we need in the tests, e.g. "alter database ... set ..." const migrationDb = new bsky.PrimaryDatabase({ url: cfg.dbPrimaryPostgresUrl, @@ -146,7 +154,7 @@ export class TestBsky { // we refresh label cache by hand in `processAll` instead of on a timer server.ctx.labelCache.stop() - return new TestBsky(url, port, server, indexer, ingester) + return new TestBsky(url, port, server, indexer, ingester, dataplane) } get ctx(): bsky.AppContext { @@ -190,6 +198,7 @@ export class TestBsky { async close() { await this.server.destroy({ skipDb: true }) + await this.dataplane.destroy() await this.ingester.destroy({ skipDb: true }) await this.indexer.destroy() // closes shared db }