diff --git a/src/scripts/export/index.ts b/src/scripts/export/index.ts index d6a04e5b..f8c86873 100644 --- a/src/scripts/export/index.ts +++ b/src/scripts/export/index.ts @@ -34,8 +34,13 @@ program.option( '--no-webhooks', "don't send webhooks" ) +program.option( + // Adds inverted `ws` boolean to the options object. + '--no-ws', + "don't connect to websocket" +) program.parse() -const { config: _config, update, webhooks } = program.opts() +const { config: _config, update, webhooks, ws } = program.opts() // Load config with config option. const config = loadConfig(_config) @@ -79,6 +84,7 @@ const trace = async () => { config, update: !!update, webhooks: !!webhooks, + websocket: !!ws, } const worker = new Worker(path.join(__dirname, 'worker.js'), { diff --git a/src/scripts/export/types.ts b/src/scripts/export/types.ts index 9e4a3eb9..f2c213d3 100644 --- a/src/scripts/export/types.ts +++ b/src/scripts/export/types.ts @@ -37,6 +37,7 @@ export type WorkerInitData = { config: Config update: boolean webhooks: boolean + websocket: boolean } export type ToWorkerMessage = diff --git a/src/scripts/export/worker.ts b/src/scripts/export/worker.ts index d3ec6dde..632656ca 100644 --- a/src/scripts/export/worker.ts +++ b/src/scripts/export/worker.ts @@ -19,7 +19,7 @@ const main = async () => { throw new Error('Must be run as a Worker') } - const { config, update, webhooks } = workerData as WorkerInitData + const { config, update, webhooks, websocket } = workerData as WorkerInitData // Add Sentry error reporting. if (config.sentryDsn) { @@ -93,84 +93,91 @@ const main = async () => { } let webSocketConnected = false - let queueHandler: Promise = new Promise((resolve) => { - // Wait for WebSocket to be ready. - // - // We need to read from the trace as the server is starting but not start - // processing the queue until the WebSocket block listener has connected. - // This is because the trace blocks the server from starting, but we can - // only listen for new blocks once the WebSocket is connected at some point - // after the server has started. We have to read from the trace to allow the - // server to start up. + let queueHandler: Promise = websocket + ? // Wait for WebSocket to be ready. + new Promise((resolve) => { + // We need to read from the trace as the server is starting but not + // start processing the queue until the WebSocket block listener has + // connected. This is because the trace blocks the server from starting, + // but we can only listen for new blocks once the WebSocket is connected + // at some point after the server has started. We have to read from the + // trace to allow the server to start up. - // Wait for WebSocket to be ready. - const interval = setInterval(() => { - if (webSocketConnected) { - clearInterval(interval) - resolve() - } - }, 1000) - }) - - // Connect to local RPC WebSocket once ready. Don't await since we need to - // start reading from the trace FIFO before the RPC starts. - waitPort({ - host: 'localhost', - port: 26657, - output: 'silent', - }).then(({ open }) => { - if (open) { - const setUpWebSocket = () => { - // Get new-block WebSocket. - const webSocket = setUpWebSocketNewBlockListener({ - rpc: 'http://127.0.0.1:26657', - onNewBlock: async (block) => { - const { chain_id, height, time } = (block as any).header - const latestBlockHeight = Number(height) - const latestBlockTimeUnixMs = Date.parse(time) + // Wait for WebSocket to be ready. + const interval = setInterval(() => { + if (webSocketConnected) { + clearInterval(interval) + resolve() + } + }, 1000) + }) + : // Don't wait for websocket. + Promise.resolve() - // Cache block time for block height in cache used by state. - blockHeightToTimeCache.set(latestBlockHeight, latestBlockTimeUnixMs) + if (websocket) { + // Connect to local RPC WebSocket once ready. Don't await since we need to + // start reading from the trace FIFO before the RPC starts. + waitPort({ + host: 'localhost', + port: 26657, + output: 'silent', + }).then(({ open }) => { + if (open) { + const setUpWebSocket = () => { + // Get new-block WebSocket. + const webSocket = setUpWebSocketNewBlockListener({ + rpc: 'http://127.0.0.1:26657', + onNewBlock: async (block) => { + const { chain_id, height, time } = (block as any).header + const latestBlockHeight = Number(height) + const latestBlockTimeUnixMs = Date.parse(time) - // Update state singleton with latest information. - await State.update( - { - chainId: chain_id, + // Cache block time for block height in cache used by state. + blockHeightToTimeCache.set( latestBlockHeight, - latestBlockTimeUnixMs, - }, - { - where: { - singleton: true, + latestBlockTimeUnixMs + ) + + // Update state singleton with latest information. + await State.update( + { + chainId: chain_id, + latestBlockHeight, + latestBlockTimeUnixMs, }, + { + where: { + singleton: true, + }, + } + ) + }, + onConnect: () => { + webSocketConnected = true + console.log('WebSocket connected.') + }, + onError: (error) => { + // If fails to connect, retry after three seconds. + if (error.message.includes('ECONNREFUSED')) { + console.error('Failed to connect to WebSocket.', error) + webSocket.terminate() + + setTimeout(setUpWebSocket, 3000) + } else { + console.error('WebSocket error', error) } - ) - }, - onConnect: () => { - webSocketConnected = true - console.log('WebSocket connected.') - }, - onError: (error) => { - // If fails to connect, retry after three seconds. - if (error.message.includes('ECONNREFUSED')) { - console.error('Failed to connect to WebSocket.', error) - webSocket.terminate() + }, + }) + } - setTimeout(setUpWebSocket, 3000) - } else { - console.error('WebSocket error', error) - } - }, - }) + setUpWebSocket() + } else { + console.error( + 'Failed to connect to local RPC WebSocket. Queries may be slower as block times will be fetched from a remote RPC.' + ) } - - setUpWebSocket() - } else { - console.error( - 'Failed to connect to local RPC WebSocket. Queries may be slower as block times will be fetched from a remote RPC.' - ) - } - }) + }) + } let processed = 0 // Update parent on processed count.