Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/appview-v2' into appview-v2-testing
Browse files Browse the repository at this point in the history
  • Loading branch information
devinivy committed Jan 25, 2024
2 parents 6822634 + 29230db commit 7ee5723
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 16 deletions.
52 changes: 38 additions & 14 deletions packages/bsky/src/data-plane/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import { getDidKeyFromMultibase } from '@atproto/identity'
import { Service } from '../proto/bsky_connect'

export type DataPlaneClient = PromiseClient<typeof Service>
type BaseClient = { lib: DataPlaneClient; url: URL }
type HttpVersion = '1.1' | '2'
const MAX_RETRIES = 3

export const createDataPlaneClient = (
baseUrls: string[],
Expand All @@ -23,21 +25,26 @@ export const createDataPlaneClient = (
assert(clients.length > 0, 'no clients available')
return makeAnyClient(Service, (method) => {
return async (...args) => {
let client = randomElement(clients)
assert(client, 'no clients available')
try {
return await client[method.localName](...args)
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Unavailable) {
// retry immediately on a different client if the first was unavailable
const remainingClients = clients.filter((c) => c !== client)
client = randomElement(remainingClients)
if (client) {
return await client[method.localName](...args)
let tries = 0
let error: unknown
let remainingClients = clients
while (tries < MAX_RETRIES) {
const client = randomElement(remainingClients)
assert(client, 'no clients available')
try {
return await client.lib[method.localName](...args)
} catch (err) {
if (err instanceof ConnectError && err.code === Code.Unavailable) {
tries++
error = err
remainingClients = getRemainingClients(remainingClients, client)
} else {
throw err
}
}
throw err
}
assert(error)
throw error
}
}) as DataPlaneClient
}
Expand All @@ -57,15 +64,32 @@ export const isDataplaneError = (
const createBaseClient = (
baseUrl: string,
opts: { httpVersion?: HttpVersion; rejectUnauthorized?: boolean },
): DataPlaneClient => {
): BaseClient => {
const { httpVersion = '2', rejectUnauthorized = true } = opts
const transport = createGrpcTransport({
baseUrl,
httpVersion,
acceptCompression: [],
nodeOptions: { rejectUnauthorized },
})
return createPromiseClient(Service, transport)
return {
lib: createPromiseClient(Service, transport),
url: new URL(baseUrl),
}
}

const getRemainingClients = (clients: BaseClient[], lastClient: BaseClient) => {
if (clients.length < 2) return clients // no clients to choose from
if (lastClient.url.port) {
// if the last client had a port, we attempt to exclude its whole host.
const maybeRemaining = clients.filter(
(c) => c.url.hostname !== lastClient.url.hostname,
)
if (maybeRemaining.length) {
return maybeRemaining
}
}
return clients.filter((c) => c !== lastClient)
}

const randomElement = <T>(arr: T[]): T | undefined => {
Expand Down
8 changes: 6 additions & 2 deletions packages/pds/tests/proxied/procedures.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ describe('proxies appview procedures', () => {
{ headers: sc.getHeaders(alice) },
)
expect(result1.notifications.length).toBeGreaterThanOrEqual(5)
expect(result1.notifications.every((n) => !n.isRead)).toBe(true)
expect(
result1.notifications.every((n, i) => {
return (i === 0 && !n.isRead) || (i !== 0 && n.isRead)
}),
).toBe(true)
// update last seen
const { indexedAt: lastSeenAt } = result1.notifications[2]
await agent.api.app.bsky.notification.updateSeen(
Expand All @@ -163,7 +167,7 @@ describe('proxies appview procedures', () => {
expect(result2.notifications).toEqual(
result1.notifications.map((n) => ({
...n,
isRead: n.indexedAt <= lastSeenAt,
isRead: n.indexedAt < lastSeenAt,
})),
)
})
Expand Down

0 comments on commit 7ee5723

Please sign in to comment.