Skip to content

Commit

Permalink
Improve and re-enable BlockbookWsAdapter integration
Browse files Browse the repository at this point in the history
The EthereumNetwork instance will now gracefully fallback to polling
when an adapter disconnects.

Connection to an adapter (or re-connection) will always require an
initial fetch for transactions before disabling polling.
  • Loading branch information
samholmes committed Nov 13, 2024
1 parent 6c2904d commit 337a876
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- fixed: (ETH) Improved and re-enabled BlockbookWsAdapter integration
- fixed: (FIO) Fix unstakefio amount saved to `stakedAmounts`

## 4.27.1 (2024-11-11)
Expand Down
94 changes: 69 additions & 25 deletions src/ethereum/EthereumNetwork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ interface EthereumNeeds {
* is not defined, then the engine will always pull for address data; it
* assume all needs are true.
*/
addressSync?: {
addressSync: {
// This should be true immediately after connecting to a network adapter.
needsInitialSync: boolean
// These are the txids that need to be checked from the
// subscribeAddressSync handler.
needsTxids: string[]
}
}
Expand Down Expand Up @@ -145,9 +149,20 @@ export class EthereumNetwork {
needsLoopTask: PeriodicTask
networkAdapters: NetworkAdapter[]

// Add properties to manage websocket connections and retries
private readonly adapterConnections: Map<
NetworkAdapter,
'connected' | 'disconnected'
> = new Map()

constructor(ethEngine: EthereumEngine) {
this.ethEngine = ethEngine
this.ethNeeds = {}
this.ethNeeds = {
addressSync: {
needsInitialSync: true,
needsTxids: []
}
}
this.needsLoopTask = makePeriodicTask(
this.needsLoop.bind(this),
NEEDS_LOOP_INTERVAL,
Expand All @@ -163,19 +178,12 @@ export class EthereumNetwork {

private setupAdapterSubscriptions(): void {
const handleSubscribeAddressSync = (txid?: string): void => {
if (this.ethNeeds.addressSync == null) {
this.ethNeeds.addressSync = {
needsTxids: []
}
} else {
if (txid != null) {
this.ethNeeds.addressSync = {
needsTxids: [...this.ethNeeds.addressSync.needsTxids, txid]
}
}
if (txid != null) {
this.ethNeeds.addressSync.needsTxids.push(txid)
}
}
this.qualifyNetworkAdapters('subscribeAddressSync').forEach(adapter => {
const adapters = this.qualifyNetworkAdapters('subscribeAddressSync')
adapters.forEach(adapter => {
adapter.subscribeAddressSync(
this.ethEngine.walletLocalData.publicKey,
handleSubscribeAddressSync
Expand Down Expand Up @@ -409,18 +417,39 @@ export class EthereumNetwork {
}
}

protected shouldCheckAndUpdateTxsOrBalances(): boolean {
// We should check txs and balances if:
return (
// The wallet is still doing initial sync:
!this.ethEngine.addressesChecked ||
// The wallet has no push-based syndication state:
this.ethNeeds.addressSync == null ||
// The wallet has txs that need to be checked:
this.ethNeeds.addressSync.needsTxids.length > 0
private isAnAdapterConnected(): boolean {
return [...this.adapterConnections.values()].some(
status => status === 'connected'
)
}

protected shouldCheckAndUpdateTxsOrBalances(): boolean {
if (!this.ethEngine.addressesChecked) {
// The wallet is still doing initial sync
return true
}

if (this.isAnAdapterConnected()) {
// Conditions only while the wallet is connected to a network adapter

if (this.ethNeeds.addressSync.needsInitialSync) {
// The wallet has no push-based syndication state:
return true
}

if (this.ethNeeds.addressSync.needsTxids.length > 0) {
// The wallet has txs that need to be checked:
return true
}

// The wallet has no checks needed:
return false
}

// The default is always to check
return true
}

processEthereumNetworkUpdate = (
ethereumNetworkUpdate: EthereumNetworkUpdate
): void => {
Expand Down Expand Up @@ -504,8 +533,8 @@ export class EthereumNetwork {

// Update addressSync state:
if (
this.ethNeeds.addressSync != null &&
// Don't change address needs if address has not done it's initial sync
// Don't update address needs if the engine has not finished it's
// initial sync.
this.ethEngine.addressesChecked
) {
// Filter the txids that have been processed:
Expand All @@ -525,6 +554,10 @@ export class EthereumNetwork {
}
)
this.ethNeeds.addressSync.needsTxids = updatedNeedsTxIds
// Since we've processed all the txids, we can set needsSync to false
// because we've done a sync. However, the engine will still poll for
// address data if there remains txids in the `needsTxIds`.
this.ethNeeds.addressSync.needsInitialSync = false
}
}

Expand All @@ -547,7 +580,18 @@ export class EthereumNetwork {

connectNetworkAdapters(): void {
this.qualifyNetworkAdapters('connect').forEach(adapter => {
adapter.connect()
adapter.connect(isConnected => {
if (isConnected) {
this.ethEngine.log('Adapter connected')
this.adapterConnections.set(adapter, 'connected')
} else {
this.ethEngine.log('Adapter disconnected')
this.adapterConnections.set(adapter, 'disconnected')
// This allows for the engine to poll from addresses once again when an
// adapter reconnects.
this.ethNeeds.addressSync.needsInitialSync = true
}
})
})
}

Expand Down
22 changes: 11 additions & 11 deletions src/ethereum/info/ethereumInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1164,17 +1164,17 @@ const networkInfo: EthereumNetworkInfo = {
'https://eth2.trezor.io'
]
},
// {
// type: 'blockbook-ws',
// connections: [
// // {
// // url: 'wss://eth-blockbook.nownodes.io/wss',
// // keyType: 'nowNodesApiKey'
// // },
// { url: 'wss://eth1.trezor.io/websocket' },
// { url: 'wss://eth2.trezor.io/websocket' }
// ]
// },
{
type: 'blockbook-ws',
connections: [
// {
// url: 'wss://eth-blockbook.nownodes.io/wss',
// keyType: 'nowNodesApiKey'
// },
{ url: 'wss://eth1.trezor.io/websocket' },
{ url: 'wss://eth2.trezor.io/websocket' }
]
},
{
type: 'blockchair',
servers: ['https://api.blockchair.com']
Expand Down
20 changes: 14 additions & 6 deletions src/ethereum/networkAdapters/BlockbookWsAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { makePeriodicTask, PeriodicTask } from '../../common/periodicTask'
import { pickRandomOne } from '../../common/utils'
import { EthereumEngine } from '../EthereumEngine'
import { EthereumInitOptions } from '../ethereumTypes'
import { NetworkAdapter } from './types'
import { ConnectionChangeHandler, NetworkAdapter } from './types'

export interface BlockbookWsAdapterConfig {
type: 'blockbook-ws'
Expand Down Expand Up @@ -80,6 +80,7 @@ export class BlockbookWsAdapter extends NetworkAdapter<BlockbookWsAdapterConfig>
private isConnected: Promise<boolean> = Promise.resolve(false)
private readonly pingTask: PeriodicTask
private reconnectTries: number = 0
private handleConnection?: ConnectionChangeHandler
pingIntervalId: NodeJS.Timer | undefined

constructor(engine: EthereumEngine, config: BlockbookWsAdapterConfig) {
Expand All @@ -91,13 +92,14 @@ export class BlockbookWsAdapter extends NetworkAdapter<BlockbookWsAdapterConfig>
this.pingTask = makePeriodicTask(this.ping, KEEP_ALIVE_MS)
}

connect = (): void => {
connect = (handleConnection?: ConnectionChangeHandler): void => {
const wsUrl = this.getWsUrl()
// @ts-expect-error
const ws: StandardWebSocket = new WebSocket(wsUrl)

// Set the WebSocket instance
this.ws = ws
this.handleConnection = handleConnection ?? this.handleConnection

this.isConnected = new Promise(resolve => {
const connectionFailure = (): void => {
Expand All @@ -117,15 +119,20 @@ export class BlockbookWsAdapter extends NetworkAdapter<BlockbookWsAdapterConfig>
ws.addEventListener('open', () => {
this.reconnectTries = 0
this.pingTask.start()
if (this.handleConnection != null) this.handleConnection(true)
})

// Connection failure:
ws.addEventListener('error', () => {
ws.addEventListener('close', () => {
const delay = this.backoffInterval()
console.warn(
`WebSocket connection closed do to error. Retrying in ${delay}ms`
)
console.warn(`WebSocket connection closed. Retrying in ${delay}ms`)
setTimeout(() => this.reconnect(), delay)
if (this.handleConnection != null) this.handleConnection(false)
})

// Connection error:
ws.addEventListener('error', error => {
console.warn(`WebSocket error`, error)
})

// Connection message:
Expand Down Expand Up @@ -161,6 +168,7 @@ export class BlockbookWsAdapter extends NetworkAdapter<BlockbookWsAdapterConfig>
this.isConnected = Promise.resolve(false)
this.pingTask.stop()
this.subscriptions.clear()
if (this.handleConnection != null) this.handleConnection(false)
}

subscribeAddressSync = async (
Expand Down
4 changes: 3 additions & 1 deletion src/ethereum/networkAdapters/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export type NetworkAdapterUpdateMethod = keyof Pick<
| 'fetchTxs'
>

export type ConnectionChangeHandler = (isConnected: boolean) => void

export abstract class NetworkAdapter<
Config extends NetworkAdapterConfig = NetworkAdapterConfig
> {
Expand All @@ -56,7 +58,7 @@ export abstract class NetworkAdapter<
| ((tx: EdgeTransaction) => Promise<BroadcastResults>)
| null

abstract connect: (() => void) | null
abstract connect: ((cb?: ConnectionChangeHandler) => void) | null

abstract disconnect: (() => void) | null

Expand Down

0 comments on commit 337a876

Please sign in to comment.