Skip to content

Commit

Permalink
wire up dataplane in ctx & dev-env
Browse files Browse the repository at this point in the history
  • Loading branch information
dholms committed Dec 7, 2023
1 parent f3d5c82 commit ce5eecd
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 12 deletions.
8 changes: 8 additions & 0 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface ServerConfigValues {
dbReplicaPostgresUrls?: string[]
dbReplicaTags?: Record<string, number[]> // E.g. { timeline: [0], thread: [1] }
dbPostgresSchema?: string
dataplaneUrl: string
didPlcUrl: string
didCacheStaleTTL: number
didCacheMaxTTL: number
Expand Down Expand Up @@ -74,6 +75,8 @@ export class ServerConfig {
)
const dbPostgresSchema = process.env.DB_POSTGRES_SCHEMA
assert(dbPrimaryPostgresUrl)
const dataplaneUrl = process.env.DATAPLANE_URL
assert(dataplaneUrl)
const adminPassword = process.env.ADMIN_PASSWORD || 'admin'
const moderatorPassword = process.env.MODERATOR_PASSWORD || undefined
const triagePassword = process.env.TRIAGE_PASSWORD || undefined
Expand All @@ -93,6 +96,7 @@ export class ServerConfig {
dbReplicaPostgresUrls,
dbReplicaTags,
dbPostgresSchema,
dataplaneUrl,
didPlcUrl,
didCacheStaleTTL,
didCacheMaxTTL,
Expand Down Expand Up @@ -162,6 +166,10 @@ export class ServerConfig {
return this.cfg.dbPostgresSchema
}

get dataplaneUrl() {
return this.cfg.dataplaneUrl
}

get didCacheStaleTTL() {
return this.cfg.didCacheStaleTTL
}
Expand Down
12 changes: 12 additions & 0 deletions packages/bsky/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import { BackgroundQueue } from './background'
import { MountedAlgos } from './feed-gen/types'
import { LabelCache } from './label-cache'
import { NotificationServer } from './notifications'
import { DataPlaneClient } from './data-plane/client'
import { Hydrator } from './hydration/hydrator'

export class AppContext {
public moderationPushAgent: AtpAgent | undefined
Expand All @@ -22,6 +24,8 @@ export class AppContext {
imgUriBuilder: ImageUriBuilder
cfg: ServerConfig
services: Services
dataplane: DataPlaneClient
hydrator: Hydrator
signingKey: Keypair
idResolver: IdResolver
didCache: DidSqlCache
Expand Down Expand Up @@ -58,6 +62,14 @@ export class AppContext {
return this.opts.services
}

get dataplane(): DataPlaneClient {
return this.opts.dataplane
}

get hydrator(): Hydrator {
return this.opts.hydrator
}

get signingKey(): Keypair {
return this.opts.signingKey
}
Expand Down
2 changes: 2 additions & 0 deletions packages/bsky/src/data-plane/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './server'
export * from './client'
2 changes: 1 addition & 1 deletion packages/bsky/src/data-plane/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class DataPlaneServer {
return new DataPlaneServer(server)
}

async stop() {
async destroy() {
return new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) {
Expand Down
8 changes: 8 additions & 0 deletions packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import { LabelCache } from './label-cache'
import { NotificationServer } from './notifications'
import { AtpAgent } from '@atproto/api'
import { Keypair } from '@atproto/crypto'
import { createDataPlaneClient } from './data-plane/client'
import { Hydrator } from './hydration/hydrator'

export * from './data-plane'
export type { ServerConfigValues } from './config'
export type { MountedAlgos } from './feed-gen/types'
export { ServerConfig } from './config'
Expand Down Expand Up @@ -115,10 +118,15 @@ export class BskyAppView {
labelCache,
})

const dataplane = createDataPlaneClient(config.dataplaneUrl, '1.1')
const hydrator = new Hydrator(dataplane)

const ctx = new AppContext({
db,
cfg: config,
services,
dataplane,
hydrator,
imgUriBuilder,
signingKey,
idResolver,
Expand Down
31 changes: 20 additions & 11 deletions packages/dev-env/src/bsky.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class TestBsky {
public server: bsky.BskyAppView,
public indexer: bsky.BskyIndexer,
public ingester: bsky.BskyIngester,
public dataplane: bsky.DataPlaneServer,
) {}

static async create(cfg: BskyConfig): Promise<TestBsky> {
Expand All @@ -34,6 +35,22 @@ export class TestBsky {
signer: serviceKeypair,
})

// shared across server, ingester, and indexer in order to share pool, avoid too many pg connections.
const db = new bsky.DatabaseCoordinator({
schema: cfg.dbPostgresSchema,
primary: {
url: cfg.dbPrimaryPostgresUrl,
poolSize: 10,
},
replicas: [],
})

const dataplanePort = await getPort()
const dataplane = await bsky.DataPlaneServer.create(
db.getPrimary(),
dataplanePort,
)

const config = new bsky.ServerConfig({
version: '0.0.0',
port,
Expand All @@ -42,6 +59,7 @@ export class TestBsky {
serverDid,
didCacheStaleTTL: HOUR,
didCacheMaxTTL: DAY,
dataplaneUrl: `http://localhost:${dataplanePort}`,
...cfg,
// Each test suite gets its own lock id for the repo subscription
adminPassword: ADMIN_PASSWORD,
Expand All @@ -51,16 +69,6 @@ export class TestBsky {
feedGenDid: 'did:example:feedGen',
})

// shared across server, ingester, and indexer in order to share pool, avoid too many pg connections.
const db = new bsky.DatabaseCoordinator({
schema: cfg.dbPostgresSchema,
primary: {
url: cfg.dbPrimaryPostgresUrl,
poolSize: 10,
},
replicas: [],
})

// Separate migration db in case migration changes some connection state that we need in the tests, e.g. "alter database ... set ..."
const migrationDb = new bsky.PrimaryDatabase({
url: cfg.dbPrimaryPostgresUrl,
Expand Down Expand Up @@ -146,7 +154,7 @@ export class TestBsky {

// we refresh label cache by hand in `processAll` instead of on a timer
server.ctx.labelCache.stop()
return new TestBsky(url, port, server, indexer, ingester)
return new TestBsky(url, port, server, indexer, ingester, dataplane)
}

get ctx(): bsky.AppContext {
Expand Down Expand Up @@ -190,6 +198,7 @@ export class TestBsky {

async close() {
await this.server.destroy({ skipDb: true })
await this.dataplane.destroy()
await this.ingester.destroy({ skipDb: true })
await this.indexer.destroy() // closes shared db
}
Expand Down

0 comments on commit ce5eecd

Please sign in to comment.