From 1dab449ff5a463d5434fce865ddb3ac5514cf770 Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Wed, 27 Dec 2023 18:20:09 -0500 Subject: [PATCH 1/3] support multiple dataplane urls, retry when unavailable --- packages/bsky/src/config.ts | 15 +++++---- packages/bsky/src/data-plane/client.ts | 42 +++++++++++++++++++++++++- packages/bsky/src/index.ts | 2 +- packages/dev-env/src/bsky.ts | 2 +- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/packages/bsky/src/config.ts b/packages/bsky/src/config.ts index fd31e94afe8..1e8665a4276 100644 --- a/packages/bsky/src/config.ts +++ b/packages/bsky/src/config.ts @@ -7,7 +7,7 @@ export interface ServerConfigValues { publicUrl?: string serverDid: string feedGenDid?: string - dataplaneUrl: string + dataplaneUrls: string[] dataplaneHttpVersion?: '1.1' | '2' dataplaneIgnoreBadTls?: boolean didPlcUrl: string @@ -37,14 +37,17 @@ export class ServerConfig { : [] const imgUriEndpoint = process.env.BSKY_IMG_URI_ENDPOINT const blobCacheLocation = process.env.BSKY_BLOB_CACHE_LOC - const dataplaneUrl = process.env.BSKY_DATAPLANE_URL + let dataplaneUrls = overrides?.dataplaneUrls + dataplaneUrls ??= process.env.BSKY_DATAPLANE_URLS + ? process.env.BSKY_DATAPLANE_URLS.split(',') + : [] const dataplaneHttpVersion = process.env.BSKY_DATAPLANE_HTTP_VERSION || '2' const dataplaneIgnoreBadTls = process.env.BSKY_DATAPLANE_IGNORE_BAD_TLS === 'true' const adminPassword = process.env.BSKY_ADMIN_PASSWORD || 'admin' const moderatorPassword = process.env.BSKY_MODERATOR_PASSWORD || undefined const triagePassword = process.env.BSKY_TRIAGE_PASSWORD || undefined - assert(dataplaneUrl) + assert(dataplaneUrls.length) assert(dataplaneHttpVersion === '1.1' || dataplaneHttpVersion === '2') return new ServerConfig({ version, @@ -53,7 +56,7 @@ export class ServerConfig { publicUrl, serverDid, feedGenDid, - dataplaneUrl, + dataplaneUrls, dataplaneHttpVersion, dataplaneIgnoreBadTls, didPlcUrl, @@ -104,8 +107,8 @@ export class ServerConfig { return this.cfg.feedGenDid } - get dataplaneUrl() { - return this.cfg.dataplaneUrl + get dataplaneUrls() { + return this.cfg.dataplaneUrls } get dataplaneHttpVersion() { diff --git a/packages/bsky/src/data-plane/client.ts b/packages/bsky/src/data-plane/client.ts index e2f34586a8e..b097d08b30e 100644 --- a/packages/bsky/src/data-plane/client.ts +++ b/packages/bsky/src/data-plane/client.ts @@ -1,11 +1,46 @@ +import assert from 'node:assert' +import { randomInt } from 'node:crypto' import { Service } from './gen/bsky_connect' -import { PromiseClient, createPromiseClient } from '@connectrpc/connect' +import { + Code, + ConnectError, + PromiseClient, + createPromiseClient, + makeAnyClient, +} from '@connectrpc/connect' import { createConnectTransport } from '@connectrpc/connect-node' export type DataPlaneClient = PromiseClient type HttpVersion = '1.1' | '2' export const createDataPlaneClient = ( + baseUrls: string[], + opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean }, +) => { + assert(baseUrls.length > 0, 'no clients available') + const clients = baseUrls.map((baseUrl) => createBaseClient(baseUrl, opts)) + return makeAnyClient(Service, (method) => { + return async (...args) => { + let client = randomElement(clients) + assert(client, 'no clients available') + try { + return await client[method.localName](...args) + } catch (err) { + if (err instanceof ConnectError && err.code === Code.Unavailable) { + // retry immediately on a different client if the first was unavailable + const remainingClients = clients.filter((c) => c !== client) + client = randomElement(remainingClients) + if (client) { + return await client[method.localName](client) + } + } + throw err + } + } + }) as DataPlaneClient +} + +const createBaseClient = ( baseUrl: string, opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean }, ) => { @@ -17,3 +52,8 @@ export const createDataPlaneClient = ( }) return createPromiseClient(Service, transport) } + +const randomElement = (arr: T[]): T | undefined => { + if (arr.length === 0) return + return arr[randomInt(arr.length)] +} diff --git a/packages/bsky/src/index.ts b/packages/bsky/src/index.ts index 5b13879ac17..38377e80a18 100644 --- a/packages/bsky/src/index.ts +++ b/packages/bsky/src/index.ts @@ -75,7 +75,7 @@ export class BskyAppView { ) } - const dataplane = createDataPlaneClient(config.dataplaneUrl, { + const dataplane = createDataPlaneClient(config.dataplaneUrls, { httpVersion: config.dataplaneHttpVersion, rejectUnauthorized: !config.dataplaneIgnoreBadTls, }) diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 975c3da1d4b..1c9cb694e4e 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -55,7 +55,7 @@ export class TestBsky { didPlcUrl: cfg.plcUrl, publicUrl: 'https://bsky.public.url', serverDid, - dataplaneUrl: `http://localhost:${dataplanePort}`, + dataplaneUrls: [`http://localhost:${dataplanePort}`], dataplaneHttpVersion: '1.1', ...cfg, adminPassword: ADMIN_PASSWORD, From 065149aa5ad5cc4665101d386c7d2e4529af5f5a Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Wed, 27 Dec 2023 18:20:49 -0500 Subject: [PATCH 2/3] build --- .github/workflows/build-and-push-bsky-ghcr.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-and-push-bsky-ghcr.yaml b/.github/workflows/build-and-push-bsky-ghcr.yaml index 9f0e03ecfba..c7b776d2bf8 100644 --- a/.github/workflows/build-and-push-bsky-ghcr.yaml +++ b/.github/workflows/build-and-push-bsky-ghcr.yaml @@ -4,7 +4,7 @@ on: branches: - main - bav-v2-drop-pg - - bav-v2-config-pass1 + - bav-v2-dataplane-clients env: REGISTRY: ghcr.io USERNAME: ${{ github.actor }} From 6f05b429786b9efee156ef9f6c483fb6b7be6597 Mon Sep 17 00:00:00 2001 From: Devin Ivy Date: Wed, 27 Dec 2023 18:24:34 -0500 Subject: [PATCH 3/3] tidy/fix --- packages/bsky/src/data-plane/client.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/bsky/src/data-plane/client.ts b/packages/bsky/src/data-plane/client.ts index b097d08b30e..d3ccf8d879e 100644 --- a/packages/bsky/src/data-plane/client.ts +++ b/packages/bsky/src/data-plane/client.ts @@ -17,8 +17,8 @@ export const createDataPlaneClient = ( baseUrls: string[], opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean }, ) => { - assert(baseUrls.length > 0, 'no clients available') const clients = baseUrls.map((baseUrl) => createBaseClient(baseUrl, opts)) + assert(clients.length > 0, 'no clients available') return makeAnyClient(Service, (method) => { return async (...args) => { let client = randomElement(clients) @@ -31,7 +31,7 @@ export const createDataPlaneClient = ( const remainingClients = clients.filter((c) => c !== client) client = randomElement(remainingClients) if (client) { - return await client[method.localName](client) + return await client[method.localName](...args) } } throw err @@ -43,7 +43,7 @@ export const createDataPlaneClient = ( const createBaseClient = ( baseUrl: string, opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean }, -) => { +): DataPlaneClient => { const { httpVersion = '2', rejectUnauthorized = true } = opts const transport = createConnectTransport({ baseUrl,