Skip to content

Commit

Permalink
feat: add customer origin url fallback (#37)
Browse files Browse the repository at this point in the history
* feat: add customer origin url fallback

* remove unused code

* simplify code

* feat: fallback from flat file origins

* use format to determine file format

* rename origin url

* DRY up fetch options

* raw fallback url

* address comments

* fix

* fixes

* remove unused code

* fix again
  • Loading branch information
AmeanAsad authored Nov 3, 2023
1 parent b809e94 commit 362c97e
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 94 deletions.
167 changes: 87 additions & 80 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,28 @@ import { isErrorUnavoidable } from './utils/errors.js'
const MAX_NODE_WEIGHT = 100
/**
* @typedef {import('./types.js').Node} Node
* @typedef {import('./types.js').FetchOptions} FetchOptions
*/

export class Saturn {
static nodesListKey = 'saturn-nodes'
static defaultRaceCount = 3
/**
*
* @param {object} [opts={}]
* @param {string} [opts.clientKey]
* @param {string} [opts.clientId=randomUUID()]
* @param {string} [opts.cdnURL=saturn.ms]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {string} [opts.orchURL]
* @param {number} [opts.fallbackLimit]
* @param {boolean} [opts.experimental]
* @param {import('./storage/index.js').Storage} [opts.storage]
* @param {object} [config={}]
* @param {string} [config.clientKey]
* @param {string} [config.clientId=randomUUID()]
* @param {string} [config.cdnURL=saturn.ms]
* @param {number} [config.connectTimeout=5000]
* @param {number} [config.downloadTimeout=0]
* @param {string} [config.orchURL]
* @param {string} [config.customerFallbackURL]
* @param {number} [config.fallbackLimit]
* @param {boolean} [config.experimental]
* @param {import('./storage/index.js').Storage} [config.storage]
*/
constructor (opts = {}) {
this.opts = Object.assign({}, {
constructor (config = {}) {
this.config = Object.assign({}, {
clientId: randomUUID(),
cdnURL: 'l1s.saturn.ms',
logURL: 'https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/',
Expand All @@ -42,9 +44,9 @@ export class Saturn {
fallbackLimit: 5,
connectTimeout: 5_000,
downloadTimeout: 0
}, opts)
}, config)

if (!this.opts.clientKey) {
if (!this.config.clientKey) {
throw new Error('clientKey is required')
}

Expand All @@ -55,28 +57,24 @@ export class Saturn {
if (this.reportingLogs && this.hasPerformanceAPI) {
this._monitorPerformanceBuffer()
}
this.storage = this.opts.storage || memoryStorage()
this.loadNodesPromise = this.opts.experimental ? this._loadNodes(this.opts) : null
this.storage = this.config.storage || memoryStorage()
this.loadNodesPromise = this.config.experimental ? this._loadNodes(this.config) : null
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {Node[]} [opts.nodes]
* @param {Node} [opts.node]
* @param {('car'|'raw')} [opts.format]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<object>}
*/
async fetchCIDWithRace (cidPath, opts = {}) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)

const jwt = await getJWT(this.opts, this.storage)

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const options = Object.assign({}, this.config, { format: 'car' }, opts)
if (!opts.originFallback) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)
const jwt = await getJWT(options, this.storage)
options.jwt = jwt
}

if (!isBrowserContext) {
options.headers = {
Expand All @@ -87,7 +85,7 @@ export class Saturn {

let nodes = options.nodes
if (!nodes || nodes.length === 0) {
const replacementNode = options.node ?? { url: this.opts.cdnURL }
const replacementNode = { url: options.cdnURL }
nodes = [replacementNode]
}
const controllers = []
Expand Down Expand Up @@ -157,22 +155,20 @@ export class Saturn {
/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {Node} [opts.node]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<object>}
*/
async fetchCID (cidPath, opts = {}) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)

const jwt = await getJWT(this.opts, this.storage)
const options = Object.assign({}, this.config, { format: 'car' }, opts)
if (!opts.originFallback) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)
const jwt = await getJWT(this.config, this.storage)
options.jwt = jwt
}

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const node = options.node
const origin = node?.url ?? this.opts.cdnURL
const node = options.nodes && options.nodes[0]
const origin = node?.url ?? this.config.cdnURL
const url = this.createRequestURL(cidPath, { ...options, url: origin })

let log = {
Expand Down Expand Up @@ -242,20 +238,15 @@ export class Saturn {
/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {string} [opts.url]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {AbortController} [opts.controller]
* @param {FetchOptions} [opts={}]
* @returns {Promise<AsyncIterable<Uint8Array>>}
*/
async * fetchContentWithFallback (cidPath, opts = {}) {
const upstreamController = opts.controller;
delete opts.controller;
const upstreamController = opts.controller
delete opts.controller

let lastError = null
let skipNodes = false
// we use this to checkpoint at which chunk a request failed.
// this is temporary until range requests are supported.
let byteCountCheckpoint = 0
Expand All @@ -264,16 +255,17 @@ export class Saturn {
throw new Error(`All attempts to fetch content have failed. Last error: ${lastError.message}`)
}

const fetchContent = async function * () {
const controller = new AbortController();
opts.controller = controller;
const fetchContent = async function * (options) {
const controller = new AbortController()
opts.controller = controller
if (upstreamController) {
upstreamController.signal.addEventListener('abort', () => {
controller.abort();
});
controller.abort()
})
}
let byteCount = 0
const byteChunks = await this.fetchContent(cidPath, opts)
const fetchOptions = Object.assign(opts, { format: 'car' }, options)
const byteChunks = await this.fetchContent(cidPath, fetchOptions)
for await (const chunk of byteChunks) {
// avoid sending duplicate chunks
if (byteCount < byteCountCheckpoint) {
Expand All @@ -291,33 +283,34 @@ export class Saturn {
}
}.bind(this)

// Use CDN origin if node list is not loaded
if (this.nodes.length === 0) {
// fetch from origin in the case that no nodes are loaded
opts.url = this.opts.cdnURL
opts.nodes = Array({ url: this.config.cdnURL })
try {
yield * fetchContent()
return
} catch (err) {
lastError = err
if (err.res?.status === 410 || isErrorUnavoidable(err)) {
throwError()
skipNodes = true
} else {
await this.loadNodesPromise
}
await this.loadNodesPromise
}
}

let fallbackCount = 0
const nodes = this.nodes
for (let i = 0; i < nodes.length; i++) {
if (fallbackCount > this.opts.fallbackLimit || upstreamController?.signal.aborted) {
return
if (fallbackCount > this.config.fallbackLimit || skipNodes || upstreamController?.signal.aborted) {
break
}
if (opts.raceNodes) {
opts.nodes = nodes.slice(i, i + Saturn.defaultRaceCount)
} else {
opts.node = nodes[i]
opts.nodes = Array(nodes[i])
}

try {
yield * fetchContent()
return
Expand All @@ -331,18 +324,25 @@ export class Saturn {
}

if (lastError) {
const originUrl = opts.customerFallbackURL ?? this.config.customerFallbackURL
// Use customer origin if cid is not retrievable by lassie.
if (originUrl) {
opts.nodes = Array({ url: originUrl })
try {
yield * fetchContent({ format: null, originFallback: true })
return
} catch (err) {
lastError = err
}
}
throwError()
}
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<AsyncIterable<Uint8Array>>}
*/
async * fetchContent (cidPath, opts = {}) {
Expand All @@ -365,7 +365,11 @@ export class Saturn {

try {
const itr = metricsIterable(asAsyncIterable(res.body))
yield * extractVerifiedContent(cidPath, itr)
if (opts.format === 'car') {
yield * extractVerifiedContent(cidPath, itr)
} else {
yield * itr
}
} catch (err) {
log.error = err.message
controller.abort()
Expand All @@ -379,11 +383,7 @@ export class Saturn {
/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<Uint8Array>}
*/
async fetchContentBuffer (cidPath, opts = {}) {
Expand All @@ -395,14 +395,21 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {string} [opts.url]
* @param {string} [opts.format]
* @param {string} [opts.originFallback]
* @param {object} [opts.jwt]
* @returns {URL}
*/
createRequestURL (cidPath, opts) {
let origin = opts.url ?? this.opts.cdnURL
createRequestURL (cidPath, opts = {}) {
let origin = opts.url ?? this.config.cdnURL
origin = addHttpPrefix(origin)
if (opts.originFallback) {
return new URL(origin)
}
const url = new URL(`${origin}/ipfs/${cidPath}`)

url.searchParams.set('format', opts.format)
if (opts.format) url.searchParams.set('format', opts.format)

if (opts.format === 'car') {
url.searchParams.set('dag-scope', 'entity')
}
Expand Down Expand Up @@ -444,10 +451,10 @@ export class Saturn {
: this.logs

await fetch(
this.opts.logURL,
this.config.logURL,
{
method: 'POST',
body: JSON.stringify({ bandwidthLogs, logSender: this.opts.logSender })
body: JSON.stringify({ bandwidthLogs, logSender: this.config.logSender })
}
)

Expand Down Expand Up @@ -569,7 +576,7 @@ export class Saturn {

const url = new URL(origin)
const controller = new AbortController()
const options = Object.assign({}, { method: 'GET' }, this.opts)
const options = Object.assign({}, { method: 'GET' }, this.config)

const connectTimeout = setTimeout(() => {
controller.abort()
Expand Down
14 changes: 14 additions & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,18 @@
* @property {string} url
*/

/**
* Common options for fetch functions.
*
* @typedef {object} FetchOptions
* @property {Node[]} [nodes] - An array of nodes.
* @property {('car'|'raw')} [format] - The format of the fetched content.
* @property {boolean} [originFallback] - Is this a fallback to the customer origin
* @property {boolean} [raceNodes] - Does the fetch race multiple nodes on requests.
* @property {string} [customerFallbackURL] - Customer Origin that is a fallback.
* @property {number} [connectTimeout=5000] - Connection timeout in milliseconds.
* @property {number} [downloadTimeout=0] - Download timeout in milliseconds.
* @property {AbortController} [controller]
*/

export {}
3 changes: 2 additions & 1 deletion src/utils/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ export function isErrorUnavoidable (error) {
/file does not exist/,
/Cannot read properties of undefined \(reading '([^']+)'\)/,
/([a-zA-Z_.]+) is undefined/,
/undefined is not an object \(evaluating '([^']+)'\)/
/undefined is not an object \(evaluating '([^']+)'\)/,
/all retrievals failed/
]

for (const pattern of errorPatterns) {
Expand Down
Loading

0 comments on commit 362c97e

Please sign in to comment.