Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shard): allow domain sharding #860

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/data/listen.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {Observable} from 'rxjs'

import {domainSharder as sharder} from '../http/domainSharding'
import type {ObservableSanityClient, SanityClient} from '../SanityClient'
import type {Any, ListenEvent, ListenOptions, ListenParams, MutationEvent} from '../types'
import defaults from '../util/defaults'
Expand Down Expand Up @@ -64,7 +65,11 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
const listenOpts = pick(options, possibleOptions)
const qs = encodeQueryString({query, params, options: {tag, ...listenOpts}})

const uri = `${url}${_getDataUrl(this, 'listen', qs)}`
let uri = `${url}${_getDataUrl(this, 'listen', qs)}`
if (this.config().useDomainSharding) {
uri = sharder.getShardedUrl(uri)
}

if (uri.length > MAX_URL_LENGTH) {
return new Observable((observer) => observer.error(new Error('Query too large for listener')))
}
Expand All @@ -91,6 +96,12 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
// Once it is`true`, it will never be `false` again.
let unsubscribed = false

// We're about to connect, and will reuse the same shard/bucket for every reconnect henceforth.
// This may seem inoptimal, but once connected we should just consider this as a "permanent"
// connection, since we'll automatically retry on failures/disconnects. Once we explicitly
// unsubsccribe, we can decrement the bucket and free up the shard.
sharder.incrementBucketForUrl(uri)

open()

function onError() {
Expand Down Expand Up @@ -187,6 +198,7 @@ export function _listen<R extends Record<string, Any> = Record<string, Any>>(
stopped = true
unsubscribe()
unsubscribed = true
sharder.decrementBucketForUrl(uri)
}

return stop
Expand Down
4 changes: 3 additions & 1 deletion src/http/browserMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export default []
import {domainSharder} from './domainSharding'

export default [domainSharder.middleware]
100 changes: 100 additions & 0 deletions src/http/domainSharding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import type {Middleware, RequestOptions} from 'get-it'

const DEFAULT_NUM_SHARD_BUCKETS = 9
const UNSHARDED_URL_RE = /^https:\/\/([a-z0-9]+)\.api\.(sanity\..*)/
const SHARDED_URL_RE = /^https:\/\/[a-z0-9]+\.api\.r(\d+)\.sanity\.(.*)/

/**
* Get a default sharding implementation where buckets are reused across instances.
* Helps prevent the case when multiple clients are instantiated, each having their
* own state of which buckets are least used.
*/
export const domainSharder = getDomainSharder()

/**
* @internal
*/
export function getDomainSharder(initialBuckets?: number[]) {
const buckets: number[] = initialBuckets || new Array(DEFAULT_NUM_SHARD_BUCKETS).fill(0, 0)

function incrementBucketForUrl(url: string) {
const shard = getShardFromUrl(url)
if (shard !== null) {
buckets[shard]++
}
}

function decrementBucketForUrl(url: string) {
const shard = getShardFromUrl(url)
if (shard !== null) {
buckets[shard]--
}
}

function getShardedUrl(url: string): string {
const [isMatch, projectId, rest] = url.match(UNSHARDED_URL_RE) || []
if (!isMatch) {
return url
}

// Find index of bucket with fewest requests
const bucket = buckets.reduce(
(smallest, count, index) => (count < buckets[smallest] ? index : smallest),
0,
)

// We start buckets at 1, not zero - so add 1 to the bucket index
return `https://${projectId}.api.r${bucket + 1}.${rest}`
}

function getShardFromUrl(url: string): number | null {
const [isMatch, shard] = url.match(SHARDED_URL_RE) || []

// We start buckets at 1, not zero, but buckets are zero-indexed.
// Substract one from the shard number in the URL to get the correct bucket index
return isMatch ? parseInt(shard, 10) - 1 : null
}

const middleware = {
processOptions: (options: {useDomainSharding?: boolean; url: string}) => {
if (!useDomainSharding(options)) {
return options
}

const url = getShardedUrl(options.url)
options.url = url

return options
},

onRequest(req: {
options: Partial<RequestOptions> & {useDomainSharding?: boolean; url: string}
}) {
if (useDomainSharding(req.options)) {
incrementBucketForUrl(req.options.url)
}
return req
},

onResponse(
res,
context: {options: Partial<RequestOptions> & {useDomainSharding?: boolean; url: string}},
) {
if (useDomainSharding(context.options)) {
decrementBucketForUrl(context.options.url)
}
return res
},
} satisfies Middleware

return {
middleware,
incrementBucketForUrl,
decrementBucketForUrl,
getShardedUrl,
}
}

function useDomainSharding(options: RequestOptions | {useDomainSharding?: boolean}): boolean {
return 'useDomainSharding' in options && options.useDomainSharding === true
}
3 changes: 3 additions & 0 deletions src/http/nodeMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import {agent, debug, headers} from 'get-it/middleware'

import {name, version} from '../../package.json'
import {domainSharder} from './domainSharding'

const middleware = [
domainSharder.middleware,

debug({verbose: true, namespace: 'sanity:client'}),
headers({'User-Agent': `${name} ${version}`}),

Expand Down
1 change: 1 addition & 0 deletions src/http/requestOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export function requestOptions(config: Any, overrides: Any = {}): Omit<RequestOp
proxy: overrides.proxy || config.proxy,
json: true,
withCredentials,
useDomainSharding: config.useDomainSharding,
fetch:
typeof overrides.fetch === 'object' && typeof config.fetch === 'object'
? {...config.fetch, ...overrides.fetch}
Expand Down
8 changes: 8 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ export interface ClientConfig {
apiVersion?: string
proxy?: string

/**
* Spread the requests over a number of hostnames to work around HTTP/1.1 limitations.
* Only applicable in browsers, and for certain allowed projects.
*
* @alpha
*/
useDomainSharding?: boolean

/**
* Optional request tag prefix for all request tags
*/
Expand Down
163 changes: 163 additions & 0 deletions test/domainSharding.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import {type ClientConfig, createClient} from '@sanity/client'
import {describe, expect, test} from 'vitest'

import {getDomainSharder} from '../src/http/domainSharding'

const apiHost = 'api.sanity.url'
const defaultProjectId = 'bf1942'
const clientConfig = {
apiHost: `https://${apiHost}`,
projectId: defaultProjectId,
apiVersion: '1',
dataset: 'foo',
useCdn: false,
}

describe('domain sharding', async () => {
const isBrowser = typeof window !== 'undefined' && window.location && window.location.hostname
const isEdge = typeof EdgeRuntime === 'string'
let nock: typeof import('nock') = (() => {
throw new Error('Not supported in EdgeRuntime')
}) as any
if (!isEdge) {
const _nock = await import('nock')
nock = _nock.default
}

const testClient = describe.each([
[
'static config',
(conf?: ClientConfig) =>
createClient({...clientConfig, useDomainSharding: true, ...(conf || {})}),
],
[
'reconfigured config',
(conf?: ClientConfig) =>
createClient({...clientConfig, ...(conf || {}), useDomainSharding: false}).withConfig({
useDomainSharding:
typeof conf?.useDomainSharding === 'undefined' ? true : conf.useDomainSharding,
}),
],
])

testClient('%s: some test', (name, getClient) => {
test.skipIf(isEdge || isBrowser)(
'can create a client that spreads request over a number of hostnames',
async () => {
const client = getClient()

for (let i = 0; i <= 15; i++) {
const shard = (i % 9) + 1
const mockHost = `https://${defaultProjectId}.api.r${shard}.sanity.url`
const mockPath = `/v1/ping?req=${i}`
nock(mockHost).get(mockPath).delay(25).reply(200, {req: i})
}

const requests = []
for (let i = 0; i <= 15; i++) {
requests.push(client.request({uri: `/ping?req=${i}`}))
}

const responses = await Promise.all(requests)

for (let i = 0; i <= 15; i++) {
const res = responses[i]
expect(res).toMatchObject({req: i})
}
},
)

test.skipIf(isEdge || isBrowser)('listen() uses sharding', async () => {
const client = getClient()
const listenerName = 'QYdPOBgC3V0Os5QsphvTKu'

nock('https://bf1942.api.r1.sanity.url', {encodedQueryParams: true})
.get('/v1/data/listen/foo')
.query({query: 'true', includeResult: 'true'})
.reply(200, `\n:\nevent: welcome\ndata: {"listenerName": "${listenerName}"}\n\n\n`, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Transfer-Encoding': 'chunked',
})

return new Promise<void>((resolve, reject) => {
const subscription = client.listen('true', {}, {events: ['welcome']}).subscribe({
next: (msg) => {
expect(msg).toMatchObject({listenerName})
subscription.unsubscribe()
resolve()
},
error: (err) => {
subscription.unsubscribe()
reject(err)
},
})
})
})

test('middleware does not shard if `useDomainSharding` is undefined', () => {
const {middleware} = getDomainSharder()
const out = middleware.processOptions({url: 'https://bf1942.api.sanity.url/v1/ping'})
expect(out.url).toBe('https://bf1942.api.sanity.url/v1/ping')
})

test('middleware does not shard if `useDomainSharding` is false', () => {
const {middleware} = getDomainSharder()
const out = middleware.processOptions({
url: 'https://bf1942.api.sanity.url/v1/ping',
useDomainSharding: false,
})
expect(out.url).toBe('https://bf1942.api.sanity.url/v1/ping')
})

test('middleware rewrites hostname to be shared if `useDomainSharding` is true', () => {
const {middleware} = getDomainSharder()
const out = middleware.processOptions({
url: 'https://bf1942.api.sanity.url/v1/ping',
useDomainSharding: true,
})
expect(out.url).toBe('https://bf1942.api.r1.sanity.url/v1/ping')
})

test('middleware uses first bucket with fewest pending requests', () => {
const {middleware} = getDomainSharder([9, 6, 3, 8, 1, 2, 5, 4, 1, 7])
const out = middleware.processOptions({
url: 'https://bf1942.api.sanity.url/v1/ping',
useDomainSharding: true,
})
expect(out.url).toBe('https://bf1942.api.r5.sanity.url/v1/ping')
})

test('middleware increases bucket request number on request', () => {
const buckets = [1, 1]
const {middleware} = getDomainSharder(buckets)
middleware.onRequest({
options: {
url: 'https://bf1942.api.r2.sanity.url/v1/ping',
useDomainSharding: true,
},
})
expect(buckets).toEqual([1, 2])
})

test('middleware decreases bucket request number on response', () => {
const buckets = [2, 1]
const {middleware} = getDomainSharder(buckets)
const context = {
options: {
url: 'https://bf1942.api.r1.sanity.url/v1/ping',
useDomainSharding: true,
},
}
middleware.onResponse({} as any, context as any)
expect(buckets).toEqual([1, 1])
})

test('reconfiguring with `withConfig()` maintains sharding setting', () => {
const client = getClient()
expect(client.config().useDomainSharding).toBe(true)

const client2 = client.withConfig({apiVersion: '2024-07-01'})
expect(client2.config().useDomainSharding).toBe(true)
})
})
})
Loading