Skip to content

Commit

Permalink
Merge pull request #1993 from bluesky-social/bav-v2-dataplane-clients
Browse files Browse the repository at this point in the history
Support multiple dataplane urls for appview
  • Loading branch information
devinivy authored Dec 29, 2023
2 parents 3feee71 + 6f05b42 commit c67e1f1
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-push-bsky-ghcr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
branches:
- main
- bav-v2-drop-pg
- bav-v2-config-pass1
- bav-v2-dataplane-clients
env:
REGISTRY: ghcr.io
USERNAME: ${{ github.actor }}
Expand Down
15 changes: 9 additions & 6 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface ServerConfigValues {
publicUrl?: string
serverDid: string
feedGenDid?: string
dataplaneUrl: string
dataplaneUrls: string[]
dataplaneHttpVersion?: '1.1' | '2'
dataplaneIgnoreBadTls?: boolean
didPlcUrl: string
Expand Down Expand Up @@ -37,14 +37,17 @@ export class ServerConfig {
: []
const imgUriEndpoint = process.env.BSKY_IMG_URI_ENDPOINT
const blobCacheLocation = process.env.BSKY_BLOB_CACHE_LOC
const dataplaneUrl = process.env.BSKY_DATAPLANE_URL
let dataplaneUrls = overrides?.dataplaneUrls
dataplaneUrls ??= process.env.BSKY_DATAPLANE_URLS
? process.env.BSKY_DATAPLANE_URLS.split(',')
: []
const dataplaneHttpVersion = process.env.BSKY_DATAPLANE_HTTP_VERSION || '2'
const dataplaneIgnoreBadTls =
process.env.BSKY_DATAPLANE_IGNORE_BAD_TLS === 'true'
const adminPassword = process.env.BSKY_ADMIN_PASSWORD || 'admin'
const moderatorPassword = process.env.BSKY_MODERATOR_PASSWORD || undefined
const triagePassword = process.env.BSKY_TRIAGE_PASSWORD || undefined
assert(dataplaneUrl)
assert(dataplaneUrls.length)
assert(dataplaneHttpVersion === '1.1' || dataplaneHttpVersion === '2')
return new ServerConfig({
version,
Expand All @@ -53,7 +56,7 @@ export class ServerConfig {
publicUrl,
serverDid,
feedGenDid,
dataplaneUrl,
dataplaneUrls,
dataplaneHttpVersion,
dataplaneIgnoreBadTls,
didPlcUrl,
Expand Down Expand Up @@ -104,8 +107,8 @@ export class ServerConfig {
return this.cfg.feedGenDid
}

get dataplaneUrl() {
return this.cfg.dataplaneUrl
get dataplaneUrls() {
return this.cfg.dataplaneUrls
}

get dataplaneHttpVersion() {
Expand Down
44 changes: 42 additions & 2 deletions packages/bsky/src/data-plane/client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,49 @@
import assert from 'node:assert'
import { randomInt } from 'node:crypto'
import { Service } from './gen/bsky_connect'
import { PromiseClient, createPromiseClient } from '@connectrpc/connect'
import {
Code,
ConnectError,
PromiseClient,
createPromiseClient,
makeAnyClient,
} from '@connectrpc/connect'
import { createConnectTransport } from '@connectrpc/connect-node'

export type DataPlaneClient = PromiseClient<typeof Service>
type HttpVersion = '1.1' | '2'

export const createDataPlaneClient = (
baseUrl: string,
baseUrls: string[],
opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean },
) => {
const clients = baseUrls.map((baseUrl) => createBaseClient(baseUrl, opts))
assert(clients.length > 0, 'no clients available')
return makeAnyClient(Service, (method) => {
return async (...args) => {
let client = randomElement(clients)
assert(client, 'no clients available')
try {
return await client[method.localName](...args)
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Unavailable) {
// retry immediately on a different client if the first was unavailable
const remainingClients = clients.filter((c) => c !== client)
client = randomElement(remainingClients)
if (client) {
return await client[method.localName](...args)
}
}
throw err
}
}
}) as DataPlaneClient
}

const createBaseClient = (
baseUrl: string,
opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean },
): DataPlaneClient => {
const { httpVersion = '2', rejectUnauthorized = true } = opts
const transport = createConnectTransport({
baseUrl,
Expand All @@ -17,3 +52,8 @@ export const createDataPlaneClient = (
})
return createPromiseClient(Service, transport)
}

const randomElement = <T>(arr: T[]): T | undefined => {
if (arr.length === 0) return
return arr[randomInt(arr.length)]
}
2 changes: 1 addition & 1 deletion packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class BskyAppView {
)
}

const dataplane = createDataPlaneClient(config.dataplaneUrl, {
const dataplane = createDataPlaneClient(config.dataplaneUrls, {
httpVersion: config.dataplaneHttpVersion,
rejectUnauthorized: !config.dataplaneIgnoreBadTls,
})
Expand Down
2 changes: 1 addition & 1 deletion packages/dev-env/src/bsky.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class TestBsky {
didPlcUrl: cfg.plcUrl,
publicUrl: 'https://bsky.public.url',
serverDid,
dataplaneUrl: `http://localhost:${dataplanePort}`,
dataplaneUrls: [`http://localhost:${dataplanePort}`],
dataplaneHttpVersion: '1.1',
...cfg,
adminPassword: ADMIN_PASSWORD,
Expand Down

0 comments on commit c67e1f1

Please sign in to comment.