Skip to content

Commit

Permalink
support multiple dataplane urls, retry when unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Dec 27, 2023
1 parent f431958 commit 1dab449
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 9 deletions.
15 changes: 9 additions & 6 deletions packages/bsky/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export interface ServerConfigValues {
publicUrl?: string
serverDid: string
feedGenDid?: string
dataplaneUrl: string
dataplaneUrls: string[]
dataplaneHttpVersion?: '1.1' | '2'
dataplaneIgnoreBadTls?: boolean
didPlcUrl: string
Expand Down Expand Up @@ -37,14 +37,17 @@ export class ServerConfig {
: []
const imgUriEndpoint = process.env.BSKY_IMG_URI_ENDPOINT
const blobCacheLocation = process.env.BSKY_BLOB_CACHE_LOC
const dataplaneUrl = process.env.BSKY_DATAPLANE_URL
let dataplaneUrls = overrides?.dataplaneUrls
dataplaneUrls ??= process.env.BSKY_DATAPLANE_URLS
? process.env.BSKY_DATAPLANE_URLS.split(',')
: []
const dataplaneHttpVersion = process.env.BSKY_DATAPLANE_HTTP_VERSION || '2'
const dataplaneIgnoreBadTls =
process.env.BSKY_DATAPLANE_IGNORE_BAD_TLS === 'true'
const adminPassword = process.env.BSKY_ADMIN_PASSWORD || 'admin'
const moderatorPassword = process.env.BSKY_MODERATOR_PASSWORD || undefined
const triagePassword = process.env.BSKY_TRIAGE_PASSWORD || undefined
assert(dataplaneUrl)
assert(dataplaneUrls.length)
assert(dataplaneHttpVersion === '1.1' || dataplaneHttpVersion === '2')
return new ServerConfig({
version,
Expand All @@ -53,7 +56,7 @@ export class ServerConfig {
publicUrl,
serverDid,
feedGenDid,
dataplaneUrl,
dataplaneUrls,
dataplaneHttpVersion,
dataplaneIgnoreBadTls,
didPlcUrl,
Expand Down Expand Up @@ -104,8 +107,8 @@ export class ServerConfig {
return this.cfg.feedGenDid
}

get dataplaneUrl() {
return this.cfg.dataplaneUrl
get dataplaneUrls() {
return this.cfg.dataplaneUrls
}

get dataplaneHttpVersion() {
Expand Down
42 changes: 41 additions & 1 deletion packages/bsky/src/data-plane/client.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,46 @@
import assert from 'node:assert'
import { randomInt } from 'node:crypto'
import { Service } from './gen/bsky_connect'
import { PromiseClient, createPromiseClient } from '@connectrpc/connect'
import {
Code,
ConnectError,
PromiseClient,
createPromiseClient,
makeAnyClient,
} from '@connectrpc/connect'
import { createConnectTransport } from '@connectrpc/connect-node'

export type DataPlaneClient = PromiseClient<typeof Service>
type HttpVersion = '1.1' | '2'

export const createDataPlaneClient = (
baseUrls: string[],
opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean },
) => {
assert(baseUrls.length > 0, 'no clients available')
const clients = baseUrls.map((baseUrl) => createBaseClient(baseUrl, opts))
return makeAnyClient(Service, (method) => {
return async (...args) => {
let client = randomElement(clients)
assert(client, 'no clients available')
try {
return await client[method.localName](...args)
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Unavailable) {
// retry immediately on a different client if the first was unavailable
const remainingClients = clients.filter((c) => c !== client)
client = randomElement(remainingClients)
if (client) {
return await client[method.localName](client)
}
}
throw err
}
}
}) as DataPlaneClient
}

const createBaseClient = (
baseUrl: string,
opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean },
) => {
Expand All @@ -17,3 +52,8 @@ export const createDataPlaneClient = (
})
return createPromiseClient(Service, transport)
}

const randomElement = <T>(arr: T[]): T | undefined => {
if (arr.length === 0) return
return arr[randomInt(arr.length)]
}
2 changes: 1 addition & 1 deletion packages/bsky/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class BskyAppView {
)
}

const dataplane = createDataPlaneClient(config.dataplaneUrl, {
const dataplane = createDataPlaneClient(config.dataplaneUrls, {
httpVersion: config.dataplaneHttpVersion,
rejectUnauthorized: !config.dataplaneIgnoreBadTls,
})
Expand Down
2 changes: 1 addition & 1 deletion packages/dev-env/src/bsky.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class TestBsky {
didPlcUrl: cfg.plcUrl,
publicUrl: 'https://bsky.public.url',
serverDid,
dataplaneUrl: `http://localhost:${dataplanePort}`,
dataplaneUrls: [`http://localhost:${dataplanePort}`],
dataplaneHttpVersion: '1.1',
...cfg,
adminPassword: ADMIN_PASSWORD,
Expand Down

0 comments on commit 1dab449

Please sign in to comment.