Skip to content

Commit

Permalink
feat: log node ids with smart client (#35)
Browse files Browse the repository at this point in the history
* change origin domains

* improve test setup

* enhancement: add node ids to logging

* add id to test-utils

* Update src/client.js

Co-authored-by: Eric Guan <[email protected]>

* fix typo

* Update src/client.js

Co-authored-by: Eric Guan <[email protected]>

* log parsing

---------

Co-authored-by: Eric Guan <[email protected]>
  • Loading branch information
AmeanAsad and guanzo authored Oct 31, 2023
1 parent 76c5802 commit 35119d9
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 48 deletions.
48 changes: 28 additions & 20 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export class Saturn {
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {Node[]} [opts.nodes]
* @param {Node} [opts.node]
* @param {('car'|'raw')} [opts.format]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
Expand All @@ -83,17 +85,16 @@ export class Saturn {
}
}

let origins = options.origins
if (!origins || origins.length === 0) {
const replacementUrl = options.url ?? options.cdnURL
origins = [replacementUrl]
let nodes = options.nodes
if (!nodes || nodes.length === 0) {
const replacementNode = options.node ?? { url: this.opts.cdnURL }
nodes = [replacementNode]
}
const controllers = []

const createFetchPromise = async (origin) => {
const fetchOptions = { ...options, url: origin }
const createFetchPromise = async (node) => {
const fetchOptions = { ...options, url: node.url }
const url = this.createRequestURL(cidPath, fetchOptions)

const controller = new AbortController()
controllers.push(controller)
const connectTimeout = setTimeout(() => {
Expand All @@ -103,11 +104,10 @@ export class Saturn {
try {
res = await fetch(parseUrl(url), { signal: controller.signal, ...options })
clearTimeout(connectTimeout)
return { res, url, controller }
return { res, url, node, controller }
} catch (err) {
throw new Error(
`Non OK response received: ${res.status} ${res.statusText}`
)
err.node = node
throw err
}
}

Expand All @@ -119,17 +119,18 @@ export class Saturn {
})
}

const fetchPromises = Promise.any(origins.map((origin) => createFetchPromise(origin)))
const fetchPromises = Promise.any(nodes.map((node) => createFetchPromise(node)))

let log = {
startTime: new Date()
}

let res, url, controller
let res, url, controller, node
try {
({ res, url, controller } = await fetchPromises)
({ res, url, controller, node } = await fetchPromises)

abortRemainingFetches(controller, controllers)
log.nodeId = node.id
log = Object.assign(log, this._generateLog(res, log), { url })
if (!res.ok) {
const error = new Error(
Expand All @@ -142,6 +143,8 @@ export class Saturn {
if (!res) {
log.error = err.message
}
if (err.node) log.nodeId = err.node.id

// Report now if error, otherwise report after download is done.
this._finalizeLog(log)

Expand All @@ -156,6 +159,7 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {Node} [opts.node]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @returns {Promise<object>}
Expand All @@ -167,11 +171,15 @@ export class Saturn {
const jwt = await getJWT(this.opts, this.storage)

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const url = this.createRequestURL(cidPath, options)
const node = options.node
const origin = node?.url ?? this.opts.cdnURL
const url = this.createRequestURL(cidPath, { ...options, url: origin })

let log = {
url,
startTime: new Date()
}
if (node?.id) log.nodeId = node.id

const controller = options.controller ?? new AbortController()
const connectTimeout = setTimeout(() => {
Expand Down Expand Up @@ -220,7 +228,7 @@ export class Saturn {
const { headers } = res
log.httpStatusCode = res.status
log.cacheHit = headers.get('saturn-cache-status') === 'HIT'
log.nodeId = headers.get('saturn-node-id')
log.nodeId = log.nodeId ?? headers.get('saturn-node-id')
log.requestId = headers.get('saturn-transfer-id')
log.httpProtocol = headers.get('quic-status')

Expand Down Expand Up @@ -294,10 +302,9 @@ export class Saturn {
return
}
if (opts.raceNodes) {
const origins = nodes.slice(i, i + Saturn.defaultRaceCount).map((node) => node.url)
opts.origins = origins
opts.nodes = nodes.slice(i, i + Saturn.defaultRaceCount)
} else {
opts.url = nodes[i].url
opts.node = nodes[i]
}

try {
Expand Down Expand Up @@ -376,10 +383,11 @@ export class Saturn {
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {string} [opts.url]
* @returns {URL}
*/
createRequestURL (cidPath, opts) {
let origin = opts.url || (opts.origins && opts.origins[0]) || opts.cdnURL
let origin = opts.url ?? this.opts.cdnURL
origin = addHttpPrefix(origin)
const url = new URL(`${origin}/ipfs/${cidPath}`)

Expand Down
1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
/**
*
* @typedef {object} Node
* @property {string} id
* @property {string} ip
* @property {number} weight
* @property {number} distance
Expand Down
62 changes: 34 additions & 28 deletions test/fallback.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ import { describe, mock, test } from 'node:test'
import { Saturn } from '#src/index.js'
import { concatChunks, generateNodes, getMockServer, HTTP_STATUS_GONE, mockJWT, mockNodesHandlers, mockOrchHandler, mockSaturnOriginHandler, MSW_SERVER_OPTS } from './test-utils.js'

const TEST_DEFAULT_ORCH = 'https://orchestrator.strn.pl/nodes'
const TEST_DEFAULT_ORCH = 'https://orchestrator.strn.pl.test/nodes'
const TEST_NODES_LIST_KEY = 'saturn-nodes'
const TEST_AUTH = 'https://fz3dyeyxmebszwhuiky7vggmsu0rlkoy.lambda-url.us-west-2.on.aws/'
const TEST_ORIGIN_DOMAIN = 'saturn.ms'
const TEST_AUTH = 'https://auth.test/'
const TEST_ORIGIN_DOMAIN = 'l1s.saturn.test'
const CLIENT_KEY = 'key'

const experimental = true
const options = {
cdnURL: TEST_ORIGIN_DOMAIN,
orchURL: TEST_DEFAULT_ORCH,
authURL: TEST_AUTH,
experimental: true,
clientKey: CLIENT_KEY,
clientId: 'test'
}

describe('Client Fallback', () => {
test('Nodes are loaded from the orchestrator if no storage is passed', async (t) => {
Expand All @@ -24,7 +31,7 @@ describe('Client Fallback', () => {
const expectedNodes = generateNodes(2, TEST_ORIGIN_DOMAIN)

// No Storage is injected
const saturn = new Saturn({ clientKey: CLIENT_KEY, experimental })
const saturn = new Saturn({ ...options })
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }

await saturn._loadNodes(mockOpts)
Expand All @@ -37,7 +44,7 @@ describe('Client Fallback', () => {

test('Storage is invoked correctly when supplied', async (t) => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms')
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
Expand All @@ -53,7 +60,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand All @@ -75,7 +82,7 @@ describe('Client Fallback', () => {

test('Storage is loaded first when the orch is slower', async (t) => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms', 1000)
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN, 500)
]
const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
Expand All @@ -90,7 +97,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

// Mocking options
const mockOpts = { orchURL: TEST_DEFAULT_ORCH }
Expand All @@ -111,7 +118,7 @@ describe('Client Fallback', () => {

test('Content Fallback fetches a cid properly', async (t) => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(2, TEST_ORIGIN_DOMAIN)
Expand All @@ -129,7 +136,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4')

Expand All @@ -144,7 +151,7 @@ describe('Client Fallback', () => {

test('Content Fallback fetches a cid properly with race', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(5, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN)
Expand All @@ -162,8 +169,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
// const origins =
const saturn = new Saturn({ ...options })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

Expand All @@ -178,7 +184,7 @@ describe('Client Fallback', () => {

test('Content Fallback with race fetches from consecutive nodes on failure', async (t) => {
const handlers = [
mockOrchHandler(5, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(5, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
mockSaturnOriginHandler(TEST_ORIGIN_DOMAIN, 0, true),
...mockNodesHandlers(5, TEST_ORIGIN_DOMAIN, 2)
Expand All @@ -196,7 +202,7 @@ describe('Client Fallback', () => {
t.mock.method(mockStorage, 'get')
t.mock.method(mockStorage, 'set')

const saturn = new Saturn({ storage: mockStorage, clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ storage: mockStorage, ...options })

const cid = saturn.fetchContentWithFallback('bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4', { raceNodes: true })

Expand All @@ -211,14 +217,14 @@ describe('Client Fallback', () => {

test('should fetch content from the first node successfully', async () => {
const handlers = [
mockOrchHandler(2, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(2, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(2, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

const fetchContentMock = mock.fn(async function * (cidPath, opts) {
yield Buffer.from('chunk1')
Expand All @@ -239,14 +245,14 @@ describe('Client Fallback', () => {
test('should try all nodes and fail if all nodes fail', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

const fetchContentMock = mock.fn(async function * (cidPath, opts) { throw new Error('Fetch error') }) // eslint-disable-line
saturn.fetchContent = fetchContentMock
Expand All @@ -270,14 +276,14 @@ describe('Client Fallback', () => {
test('Should abort fallback on 410s', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })
await saturn.loadNodesPromise

let error
Expand All @@ -299,14 +305,14 @@ describe('Client Fallback', () => {
test('Should abort fallback on specific errors', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN, 3, HTTP_STATUS_GONE)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })
await saturn.loadNodesPromise

let callCount = 0
Expand Down Expand Up @@ -334,14 +340,14 @@ describe('Client Fallback', () => {
test('Handles fallback with chunk overlap correctly', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

let callCount = 0
const fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down Expand Up @@ -373,14 +379,14 @@ describe('Client Fallback', () => {
test('should handle byte chunk overlaps correctly', async () => {
const numNodes = 3
const handlers = [
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, 'saturn.ms'),
mockOrchHandler(numNodes, TEST_DEFAULT_ORCH, TEST_ORIGIN_DOMAIN),
mockJWT(TEST_AUTH),
...mockNodesHandlers(numNodes, TEST_ORIGIN_DOMAIN)
]

const server = getMockServer(handlers)
server.listen(MSW_SERVER_OPTS)
const saturn = new Saturn({ clientKey: CLIENT_KEY, clientId: 'test', experimental })
const saturn = new Saturn({ ...options })

let callCount = 0
let fetchContentMock = mock.fn(async function * (cidPath, opts) {
Expand Down
2 changes: 2 additions & 0 deletions test/test-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export function generateNodes (count, originDomain) {
const nodeIp = `node${i}`
const node = {
ip: nodeIp,
id: nodeIp,
weight: 50,
distance: 100,
url: `https://${nodeIp}.${originDomain}`
Expand All @@ -52,6 +53,7 @@ export function generateNodes (count, originDomain) {
*/
export function mockSaturnOriginHandler (cdnURL, delay = 0, error = false) {
cdnURL = addHttpPrefix(cdnURL)
cdnURL = `${cdnURL}/ipfs/:cid`
return rest.get(cdnURL, (req, res, ctx) => {
if (error) {
throw Error('Simulated Error')
Expand Down

0 comments on commit 35119d9

Please sign in to comment.