Skip to content

Commit

Permalink
Appview v2 dataplane retries based on client host (#2098)
Browse files Browse the repository at this point in the history
choose dataplane client for retries based on host when possible/relevant
  • Loading branch information
devinivy authored Jan 25, 2024
1 parent ad72ef7 commit 29230db
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions packages/bsky/src/data-plane/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import {
import { createConnectTransport } from '@connectrpc/connect-node'

export type DataPlaneClient = PromiseClient<typeof Service>
type BaseClient = { lib: DataPlaneClient; url: URL }
type HttpVersion = '1.1' | '2'
const MAX_RETRIES = 3

export const createDataPlaneClient = (
baseUrls: string[],
Expand All @@ -21,21 +23,26 @@ export const createDataPlaneClient = (
assert(clients.length > 0, 'no clients available')
return makeAnyClient(Service, (method) => {
return async (...args) => {
let client = randomElement(clients)
assert(client, 'no clients available')
try {
return await client[method.localName](...args)
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Unavailable) {
// retry immediately on a different client if the first was unavailable
const remainingClients = clients.filter((c) => c !== client)
client = randomElement(remainingClients)
if (client) {
return await client[method.localName](...args)
let tries = 0
let error: unknown
let remainingClients = clients
while (tries < MAX_RETRIES) {
const client = randomElement(remainingClients)
assert(client, 'no clients available')
try {
return await client.lib[method.localName](...args)
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Unavailable) {
tries++
error = err
remainingClients = getRemainingClients(remainingClients, client)
} else {
throw err
}
}
throw err
}
assert(error)
throw error
}
}) as DataPlaneClient
}
Expand All @@ -55,14 +62,31 @@ export const isDataplaneError = (
const createBaseClient = (
baseUrl: string,
opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean },
): DataPlaneClient => {
): BaseClient => {
const { httpVersion = '2', rejectUnauthorized = true } = opts
const transport = createConnectTransport({
baseUrl,
httpVersion,
nodeOptions: { rejectUnauthorized },
})
return createPromiseClient(Service, transport)
return {
lib: createPromiseClient(Service, transport),
url: new URL(baseUrl),
}
}

const getRemainingClients = (clients: BaseClient[], lastClient: BaseClient) => {
if (clients.length < 2) return clients // no clients to choose from
if (lastClient.url.port) {
// if the last client had a port, we attempt to exclude its whole host.
const maybeRemaining = clients.filter(
(c) => c.url.hostname !== lastClient.url.hostname,
)
if (maybeRemaining.length) {
return maybeRemaining
}
}
return clients.filter((c) => c !== lastClient)
}

const randomElement = <T>(arr: T[]): T | undefined => {
Expand Down

0 comments on commit 29230db

Please sign in to comment.