Skip to content

Commit

Permalink
Allow ignoring websocket when exporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahSaso committed Sep 7, 2023
1 parent 2c0f39d commit f5b600f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 72 deletions.
8 changes: 7 additions & 1 deletion src/scripts/export/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ program.option(
'--no-webhooks',
"don't send webhooks"
)
program.option(
// Adds inverted `ws` boolean to the options object.
'--no-ws',
"don't connect to websocket"
)
program.parse()
const { config: _config, update, webhooks } = program.opts()
const { config: _config, update, webhooks, ws } = program.opts()

// Load config with config option.
const config = loadConfig(_config)
Expand Down Expand Up @@ -79,6 +84,7 @@ const trace = async () => {
config,
update: !!update,
webhooks: !!webhooks,
websocket: !!ws,
}

const worker = new Worker(path.join(__dirname, 'worker.js'), {
Expand Down
1 change: 1 addition & 0 deletions src/scripts/export/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export type WorkerInitData = {
config: Config
update: boolean
webhooks: boolean
websocket: boolean
}

export type ToWorkerMessage =
Expand Down
149 changes: 78 additions & 71 deletions src/scripts/export/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const main = async () => {
throw new Error('Must be run as a Worker')
}

const { config, update, webhooks } = workerData as WorkerInitData
const { config, update, webhooks, websocket } = workerData as WorkerInitData

// Add Sentry error reporting.
if (config.sentryDsn) {
Expand Down Expand Up @@ -93,84 +93,91 @@ const main = async () => {
}

let webSocketConnected = false
let queueHandler: Promise<void> = new Promise<void>((resolve) => {
// Wait for WebSocket to be ready.
//
// We need to read from the trace as the server is starting but not start
// processing the queue until the WebSocket block listener has connected.
// This is because the trace blocks the server from starting, but we can
// only listen for new blocks once the WebSocket is connected at some point
// after the server has started. We have to read from the trace to allow the
// server to start up.
let queueHandler: Promise<void> = websocket
? // Wait for WebSocket to be ready.
new Promise<void>((resolve) => {
// We need to read from the trace as the server is starting but not
// start processing the queue until the WebSocket block listener has
// connected. This is because the trace blocks the server from starting,
// but we can only listen for new blocks once the WebSocket is connected
// at some point after the server has started. We have to read from the
// trace to allow the server to start up.

// Wait for WebSocket to be ready.
const interval = setInterval(() => {
if (webSocketConnected) {
clearInterval(interval)
resolve()
}
}, 1000)
})

// Connect to local RPC WebSocket once ready. Don't await since we need to
// start reading from the trace FIFO before the RPC starts.
waitPort({
host: 'localhost',
port: 26657,
output: 'silent',
}).then(({ open }) => {
if (open) {
const setUpWebSocket = () => {
// Get new-block WebSocket.
const webSocket = setUpWebSocketNewBlockListener({
rpc: 'http://127.0.0.1:26657',
onNewBlock: async (block) => {
const { chain_id, height, time } = (block as any).header
const latestBlockHeight = Number(height)
const latestBlockTimeUnixMs = Date.parse(time)
// Wait for WebSocket to be ready.
const interval = setInterval(() => {
if (webSocketConnected) {
clearInterval(interval)
resolve()
}
}, 1000)
})
: // Don't wait for websocket.
Promise.resolve()

// Cache block time for block height in cache used by state.
blockHeightToTimeCache.set(latestBlockHeight, latestBlockTimeUnixMs)
if (websocket) {
// Connect to local RPC WebSocket once ready. Don't await since we need to
// start reading from the trace FIFO before the RPC starts.
waitPort({
host: 'localhost',
port: 26657,
output: 'silent',
}).then(({ open }) => {
if (open) {
const setUpWebSocket = () => {
// Get new-block WebSocket.
const webSocket = setUpWebSocketNewBlockListener({
rpc: 'http://127.0.0.1:26657',
onNewBlock: async (block) => {
const { chain_id, height, time } = (block as any).header
const latestBlockHeight = Number(height)
const latestBlockTimeUnixMs = Date.parse(time)

// Update state singleton with latest information.
await State.update(
{
chainId: chain_id,
// Cache block time for block height in cache used by state.
blockHeightToTimeCache.set(
latestBlockHeight,
latestBlockTimeUnixMs,
},
{
where: {
singleton: true,
latestBlockTimeUnixMs
)

// Update state singleton with latest information.
await State.update(
{
chainId: chain_id,
latestBlockHeight,
latestBlockTimeUnixMs,
},
{
where: {
singleton: true,
},
}
)
},
onConnect: () => {
webSocketConnected = true
console.log('WebSocket connected.')
},
onError: (error) => {
// If fails to connect, retry after three seconds.
if (error.message.includes('ECONNREFUSED')) {
console.error('Failed to connect to WebSocket.', error)
webSocket.terminate()

setTimeout(setUpWebSocket, 3000)
} else {
console.error('WebSocket error', error)
}
)
},
onConnect: () => {
webSocketConnected = true
console.log('WebSocket connected.')
},
onError: (error) => {
// If fails to connect, retry after three seconds.
if (error.message.includes('ECONNREFUSED')) {
console.error('Failed to connect to WebSocket.', error)
webSocket.terminate()
},
})
}

setTimeout(setUpWebSocket, 3000)
} else {
console.error('WebSocket error', error)
}
},
})
setUpWebSocket()
} else {
console.error(
'Failed to connect to local RPC WebSocket. Queries may be slower as block times will be fetched from a remote RPC.'
)
}

setUpWebSocket()
} else {
console.error(
'Failed to connect to local RPC WebSocket. Queries may be slower as block times will be fetched from a remote RPC.'
)
}
})
})
}

let processed = 0
// Update parent on processed count.
Expand Down

0 comments on commit f5b600f

Please sign in to comment.