diff --git a/docs/api.md b/docs/api.md index 5dcc1207..39b2b5fc 100644 --- a/docs/api.md +++ b/docs/api.md @@ -29,6 +29,7 @@ This returns the status of the indexer. }, "lastStakingBlockHeightExported": string | null "lastWasmBlockHeightExported": string | null + "lastBankBlockHeightExported": string | null } ``` diff --git a/src/core/env.ts b/src/core/env.ts index 2698651d..7b573c70 100644 --- a/src/core/env.ts +++ b/src/core/env.ts @@ -1,6 +1,7 @@ import { Op, Sequelize } from 'sequelize' import { + BankStateEvent, Contract, StakingSlashEvent, WasmStateEvent, @@ -13,6 +14,8 @@ import { Cache, Env, EnvOptions, + FormulaBalanceGetter, + FormulaBalancesGetter, FormulaCodeIdKeyForContractGetter, FormulaCodeIdsForKeysGetter, FormulaContractGetter, @@ -967,6 +970,129 @@ export const getEnv = ({ return txEvents.map((txEvent) => txEvent.toJSON()) } + const getBalance: FormulaBalanceGetter = async (address, denom) => { + const dependentKey = getDependentKey( + BankStateEvent.dependentKeyNamespace, + address, + denom + ) + dependentKeys?.push({ + key: dependentKey, + prefix: false, + }) + + // Check cache. + const cachedEvent = cache.events[dependentKey] + const event = + // If undefined, we haven't tried to fetch it yet. If not undefined, + // either it exists or it doesn't (null). + cachedEvent !== undefined + ? cachedEvent?.[0] + : await BankStateEvent.findOne({ + where: { + address, + denom, + blockHeight: blockHeightFilter, + }, + order: [['blockHeight', 'DESC']], + }) + + // Type-check. Should never happen assuming dependent key namespaces are + // unique across different event types. + if (event && !(event instanceof BankStateEvent)) { + throw new Error('Incorrect event type.') + } + + // Cache event, null if nonexistent. + if (cachedEvent === undefined) { + cache.events[dependentKey] = event ? [event] : null + } + + // If no event found, return undefined. + if (!event) { + return + } + + // Call hook. + await onFetch?.([event]) + + return BigInt(event.balance) + } + + const getBalances: FormulaBalancesGetter = async (address) => { + const dependentKey = getDependentKey( + BankStateEvent.dependentKeyNamespace, + address, + undefined + ) + dependentKeys?.push({ + key: dependentKey, + prefix: true, + }) + + // Check cache. + const cachedEvents = cache.events[dependentKey] + + const events = + // If undefined, we haven't tried to fetch them yet. If not undefined, + // either they exist or they don't (null). + cachedEvents !== undefined + ? ((cachedEvents ?? []) as BankStateEvent[]) + : await BankStateEvent.findAll({ + attributes: [ + // DISTINCT ON is not directly supported by Sequelize, so we need + // to cast to unknown and back to string to insert this at the + // beginning of the query. This ensures we use the most recent + // version of each denom. + Sequelize.literal( + 'DISTINCT ON("denom") \'\'' + ) as unknown as string, + 'denom', + 'address', + 'blockHeight', + 'blockTimeUnixMs', + 'balance', + ], + where: { + address, + blockHeight: blockHeightFilter, + }, + order: [ + // Needs to be first so we can use DISTINCT ON. + ['denom', 'ASC'], + ['blockHeight', 'DESC'], + ], + }) + + // Type-check. Should never happen assuming dependent key namespaces are + // unique across different event types. + if (events.some((event) => !(event instanceof BankStateEvent))) { + throw new Error('Incorrect event type.') + } + + // Cache events, null if nonexistent. + if (cachedEvents === undefined) { + cache.events[dependentKey] = events.length ? events : null + } + + // If no events found, return undefined. + if (!events.length) { + return + } + + // Call hook. + await onFetch?.(events) + + // Create denom balance map. + return events.reduce( + (acc, { denom, balance }) => ({ + ...acc, + [denom]: BigInt(balance), + }), + {} as Record + ) + } + return { chainId, block, @@ -993,5 +1119,8 @@ export const getEnv = ({ getSlashEvents, getTxEvents, + + getBalance, + getBalances, } } diff --git a/src/core/types.ts b/src/core/types.ts index 0afd2483..5ff00ce6 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -227,6 +227,15 @@ export type FormulaTxEventsGetter = ( | undefined > +export type FormulaBalanceGetter = ( + address: string, + denom: string +) => Promise + +export type FormulaBalancesGetter = ( + address: string +) => Promise | undefined> + export type Env = {}> = { chainId: string block: Block @@ -253,6 +262,8 @@ export type Env = {}> = { getCodeIdKeyForContract: FormulaCodeIdKeyForContractGetter getSlashEvents: FormulaSlashEventsGetter getTxEvents: FormulaTxEventsGetter + getBalance: FormulaBalanceGetter + getBalances: FormulaBalancesGetter } export interface EnvOptions { @@ -439,6 +450,15 @@ export type ParsedWasmTxEvent = { export type ParsedWasmEvent = ParsedWasmStateEvent | ParsedWasmTxEvent +export type ParsedBankStateEvent = { + address: string + blockHeight: string + blockTimeUnixMs: string + blockTimestamp: Date + denom: string + balance: string +} + type RequireAtLeastOne = Pick< T, Exclude diff --git a/src/data/formulas/wallet/bank.ts b/src/data/formulas/wallet/bank.ts new file mode 100644 index 00000000..cb672350 --- /dev/null +++ b/src/data/formulas/wallet/bank.ts @@ -0,0 +1,22 @@ +import { WalletFormula } from '@/core' + +export const balance: WalletFormula = { + compute: async ({ walletAddress, getBalance, args: { denom } }) => { + if (!denom) { + throw new Error('missing `denom`') + } + + return (await getBalance(walletAddress, denom))?.toString() + }, +} + +export const balances: WalletFormula> = { + compute: async ({ walletAddress, getBalances }) => + Object.entries((await getBalances(walletAddress)) || {}).reduce( + (acc, [denom, balance]) => ({ + ...acc, + [denom]: balance.toString(), + }), + {} as Record + ), +} diff --git a/src/data/formulas/wallet/index.ts b/src/data/formulas/wallet/index.ts index 817aada5..519082d1 100644 --- a/src/data/formulas/wallet/index.ts +++ b/src/data/formulas/wallet/index.ts @@ -1,3 +1,4 @@ +export * as bank from './bank' export * as daos from './daos' export * as nft from './nft' export * as proposals from './proposals' diff --git a/src/db/connection.ts b/src/db/connection.ts index dabf5b27..f705d21e 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -12,6 +12,7 @@ import { AccountWebhookCodeIdSet, AccountWebhookEvent, AccountWebhookEventAttempt, + BankStateEvent, Computation, ComputationDependency, Contract, @@ -36,6 +37,7 @@ type LoadDbOptions = { const getModelsForType = (type: DbType): SequelizeOptions['models'] => type === DbType.Data ? [ + BankStateEvent, Computation, ComputationDependency, Contract, diff --git a/src/db/models/BankStateEvent.ts b/src/db/models/BankStateEvent.ts new file mode 100644 index 00000000..db212f5b --- /dev/null +++ b/src/db/models/BankStateEvent.ts @@ -0,0 +1,161 @@ +import { Op, WhereOptions } from 'sequelize' +import { AllowNull, Column, DataType, Table } from 'sequelize-typescript' + +import { Block, ComputationDependentKey, getDependentKey } from '@/core' + +import { DependendableEventModel, DependentKeyNamespace } from '../types' + +@Table({ + timestamps: true, + indexes: [ + // Only one event can happen to a denom for a given address at a given block + // height. This ensures events are not duplicated if they attempt exporting + // multiple times. + { + unique: true, + fields: ['blockHeight', 'address', 'denom'], + }, + { + // Speeds up queries finding first newer dependent key to validate a + // computation. + fields: ['denom'], + }, + { + // Speed up ordering queries. + fields: ['blockHeight'], + }, + { + // Speed up ordering queries. + fields: ['blockTimeUnixMs'], + }, + ], +}) +export class BankStateEvent extends DependendableEventModel { + @AllowNull(false) + @Column(DataType.TEXT) + address!: string + + @AllowNull(false) + @Column(DataType.BIGINT) + blockHeight!: string + + @AllowNull(false) + @Column(DataType.BIGINT) + blockTimeUnixMs!: string + + @AllowNull(false) + @Column(DataType.DATE) + blockTimestamp!: Date + + @AllowNull(false) + @Column(DataType.TEXT) + denom!: string + + @AllowNull(false) + @Column(DataType.BIGINT) + balance!: string + + get block(): Block { + return { + height: BigInt(this.blockHeight), + timeUnixMs: BigInt(this.blockTimeUnixMs), + } + } + + get dependentKey(): string { + return getDependentKey( + BankStateEvent.dependentKeyNamespace, + this.address, + this.denom + ) + } + + // Get the previous event for this denom. If this is the first event for this + // denom, return null. Cache the result so it can be reused since this + // shouldn't change. + previousEvent?: BankStateEvent | null + async getPreviousEvent(cache = true): Promise { + if (this.previousEvent === undefined || !cache) { + this.previousEvent = await BankStateEvent.findOne({ + where: { + address: this.address, + denom: this.denom, + blockHeight: { + [Op.lt]: this.blockHeight, + }, + }, + order: [['blockHeight', 'DESC']], + }) + } + + return this.previousEvent + } + + static dependentKeyNamespace = DependentKeyNamespace.BankStateEvent + static blockHeightKey: string = 'blockHeight' + + // Returns a where clause that will match all events that are described by the + // dependent keys. + static getWhereClauseForDependentKeys( + dependentKeys: ComputationDependentKey[] + ): WhereOptions { + // Some keys (most likely those with wildcards) may not have an address. It + // is fine to group these together. + const dependentKeysByAddress = dependentKeys.reduce((acc, dependentKey) => { + // 1. Remove namespace from key. + let key = dependentKey.key.replace( + new RegExp(`^${this.dependentKeyNamespace}:`), + '' + ) + + // 2. Extract address from key. + // Dependent keys for any address start with "*:". + const address = key.startsWith('*:') ? '' : key.split(':')[0] + + key = key + // 3. Remove address from key. + .replace(new RegExp(`^${address || '\\*'}:`), '') + // 4. Replace wildcard symbol with LIKE wildcard for database query. + .replace(/\*/g, '%') + + return { + ...acc, + [address]: [ + ...(acc[address] ?? []), + { + key, + prefix: dependentKey.prefix, + }, + ], + } + }, {} as Record) + + return { + [Op.or]: Object.entries(dependentKeysByAddress).map(([address, keys]) => { + const exactKeys = keys + .filter(({ key, prefix }) => !prefix && !key.includes('%')) + .map(({ key }) => key) + const wildcardKeys = keys + .filter(({ key, prefix }) => prefix || key.includes('%')) + .map(({ key, prefix }) => key + (prefix ? '%' : '')) + + return { + // Only include if address is defined. + ...(address && { address }), + // Related logic in `makeComputationDependencyWhere` in + // `src/db/utils.ts`. + denom: { + [Op.or]: [ + // Exact matches. + ...(exactKeys.length > 0 ? [{ [Op.in]: exactKeys }] : []), + // Wildcards. May or may not be prefixes. + ...wildcardKeys.map((key) => ({ + [Op.like]: key, + })), + ], + }, + } + }), + } + } +} diff --git a/src/db/models/State.ts b/src/db/models/State.ts index 5be7703b..1e4a26e6 100644 --- a/src/db/models/State.ts +++ b/src/db/models/State.ts @@ -44,6 +44,10 @@ export class State extends Model { @Column(DataType.BIGINT) lastWasmBlockHeightExported!: string | null + @AllowNull + @Column(DataType.BIGINT) + lastBankBlockHeightExported!: string | null + get latestBlock(): Block { return { height: BigInt(this.latestBlockHeight), @@ -72,6 +76,7 @@ export class State extends Model { latestBlockTimeUnixMs: 0n, lastStakingBlockHeightExported: 0n, lastWasmBlockHeightExported: 0n, + lastBankBlockHeightExported: 0n, }) } diff --git a/src/db/models/index.ts b/src/db/models/index.ts index 3ebbc27a..51e31b59 100644 --- a/src/db/models/index.ts +++ b/src/db/models/index.ts @@ -6,6 +6,7 @@ export * from './AccountWebhook' export * from './AccountWebhookEvent' export * from './AccountWebhookEventAttempt' export * from './AccountWebhookCodeIdSet' +export * from './BankStateEvent' export * from './Computation' export * from './ComputationDependency' export * from './Contract' diff --git a/src/db/types.ts b/src/db/types.ts index 0920f672..da8b3349 100644 --- a/src/db/types.ts +++ b/src/db/types.ts @@ -12,6 +12,7 @@ export enum DependentKeyNamespace { WasmStateEventTransformation = 'wasm_state_transformation', WasmTxEvent = 'wasm_tx', StakingSlash = 'staking_slash', + BankStateEvent = 'bank_state', } // Interface that event models must implement to be depended on by computations. diff --git a/src/db/utils.ts b/src/db/utils.ts index d8373371..59193ccf 100644 --- a/src/db/utils.ts +++ b/src/db/utils.ts @@ -5,6 +5,7 @@ import { bigIntMax, bigIntMin } from '@/core/utils' import { loadDb } from './connection' import { + BankStateEvent, Computation, ComputationDependency, StakingSlashEvent, @@ -259,6 +260,7 @@ export const getDependableEventModels = WasmStateEventTransformation, WasmTxEvent, StakingSlashEvent, + BankStateEvent, ] // Get the dependable event model for a given key based on its namespace. diff --git a/src/scripts/export/handlers/bank.ts b/src/scripts/export/handlers/bank.ts new file mode 100644 index 00000000..408e96d8 --- /dev/null +++ b/src/scripts/export/handlers/bank.ts @@ -0,0 +1,220 @@ +import { fromBase64, fromUtf8, toBech32 } from '@cosmjs/encoding' +import retry from 'async-await-retry' +import { Sequelize } from 'sequelize' + +import { ParsedBankStateEvent, objectMatchesStructure } from '@/core' +import { + BankStateEvent, + State, + updateComputationValidityDependentOnChanges, +} from '@/db' + +import { Handler, HandlerMaker } from '../types' + +const STORE_NAME = 'bank' +const MAX_BATCH_SIZE = 5000 + +export const bank: HandlerMaker = async ({ + config, + dontUpdateComputations, + getBlockTimeUnixMs, +}) => { + const pending: ParsedBankStateEvent[] = [] + + const flush = async () => { + if (pending.length === 0) { + return + } + + // For state events with the same blockHeight, address, and denom, only keep + // the last event. This is because the indexer guarantees that events are + // emitted in order, and the last event is the most up-to-date. Multiple + // events may occur if the value is updated multiple times across different + // messages. The indexer can only maintain uniqueness within a message and + // its submessages, but different messages in the same block can write to + // the same key, and the indexer emits all the messages. + const uniqueIndexerEvents = pending.reduce((acc, event) => { + const key = event.blockHeight + event.address + event.denom + acc[key] = event + return acc + }, {} as Record) + const eventsToExport = Object.values(uniqueIndexerEvents) + + // Clear queue. + pending.length = 0 + + // Export events. + await exporter(eventsToExport) + } + + let lastBlockHeightSeen = 0 + let debouncedFlush: NodeJS.Timeout | undefined + + const handle: Handler['handle'] = async (trace) => { + // BalancesPrefix = 0x02 + // bank keys are formatted as: + // BalancesPrefix || len(addressBytes) || addressBytes || denomBytes + + const keyData = fromBase64(trace.key) + if (keyData[0] !== 0x02) { + return + } + + const length = keyData[1] + + const address = toBech32(config.bech32Prefix, keyData.slice(2, 2 + length)) + const denom = fromUtf8(keyData.slice(2 + length)) + + // If we reached the first event of the next block, flush the previous + // events to the DB. This ensures we batch all events from the same block + // together. + if (trace.metadata.blockHeight > lastBlockHeightSeen) { + await flush() + } + + // Get code ID and block timestamp from chain. + const blockHeight = BigInt(trace.metadata.blockHeight).toString() + const blockTimeUnixMsNum = await getBlockTimeUnixMs( + trace.metadata.blockHeight, + trace + ) + const blockTimeUnixMs = BigInt(blockTimeUnixMsNum).toString() + const blockTimestamp = new Date(blockTimeUnixMsNum) + + // Mimics behavior of `UnmarshalBalanceCompat` in `x/bank/keeper/view.go` to + // decode balance. + + let balance = '0' + // If write operation, balance is updated. Otherwise (delete), balance is 0. + if (trace.operation === 'write') { + let value + // Decode base64-encoded JSON. + try { + value = trace.value && JSON.parse(fromUtf8(fromBase64(trace.value))) + } catch { + // Ignore decoding errors. + return + } + + // If legacy format, extract balance. + if ( + objectMatchesStructure(value, { + denom: {}, + amount: {}, + }) + ) { + balance = BigInt(value.amount).toString() + // Otherwise it should be a number. + } else if (typeof value === 'number') { + balance = BigInt(value).toString() + } else { + // This should never happen. + return + } + } + + pending.push({ + address, + blockHeight, + blockTimeUnixMs, + blockTimestamp, + denom, + balance, + }) + lastBlockHeightSeen = trace.metadata.blockHeight + + // Debounce flush in 200ms. + if (debouncedFlush !== undefined) { + clearTimeout(debouncedFlush) + } + + // If batch size reached, flush immediately. + if (pending.length >= MAX_BATCH_SIZE) { + debouncedFlush = undefined + await flush() + return + } else { + debouncedFlush = setTimeout(flush, 200) + } + + return + } + + const exporter = async (parsedEvents: ParsedBankStateEvent[]) => { + const start = Date.now() + + const exportEvents = async () => + // Unique index on [blockHeight, address, denom] ensures that we don't + // insert duplicate events. If we encounter a duplicate, we update the + // `balance` field in case event processing for a block was batched + // separately. + parsedEvents.length > 0 + ? await BankStateEvent.bulkCreate(parsedEvents, { + updateOnDuplicate: ['balance'], + }) + : [] + + // Retry 3 times with exponential backoff starting at 100ms delay. + const events = (await retry(exportEvents, [], { + retriesMax: 3, + exponential: true, + interval: 100, + })) as BankStateEvent[] + + let computationsUpdated = 0 + let computationsDestroyed = 0 + if (!dontUpdateComputations) { + const computationUpdates = + await updateComputationValidityDependentOnChanges(events) + computationsUpdated = computationUpdates.updated + computationsDestroyed = computationUpdates.destroyed + } + + // Store last block height exported, and update latest block + // height/time if the last export is newer. + const lastBlockHeightExported = events[events.length - 1].blockHeight + const lastBlockTimeUnixMsExported = + events[events.length - 1].blockTimeUnixMs + await State.update( + { + lastBankBlockHeightExported: Sequelize.fn( + 'GREATEST', + Sequelize.col('lastBankBlockHeightExported'), + lastBlockHeightExported + ), + + latestBlockHeight: Sequelize.fn( + 'GREATEST', + Sequelize.col('latestBlockHeight'), + lastBlockHeightExported + ), + latestBlockTimeUnixMs: Sequelize.fn( + 'GREATEST', + Sequelize.col('latestBlockTimeUnixMs'), + lastBlockTimeUnixMsExported + ), + }, + { + where: { + singleton: true, + }, + } + ) + + const end = Date.now() + const duration = end - start + + // Log. + console.log( + `[bank] Exported: ${events.length.toLocaleString()}. Block: ${BigInt( + lastBlockHeightExported + ).toLocaleString()}. Computations updated/destroyed: ${computationsUpdated.toLocaleString()}/${computationsDestroyed.toLocaleString()}. Duration: ${duration.toLocaleString()}ms.` + ) + } + + return { + storeName: STORE_NAME, + handle, + flush, + } +} diff --git a/src/scripts/export/handlers/index.ts b/src/scripts/export/handlers/index.ts index 8ad28433..a7dd5f41 100644 --- a/src/scripts/export/handlers/index.ts +++ b/src/scripts/export/handlers/index.ts @@ -1,6 +1,8 @@ import { HandlerMaker } from '../types' +import { bank } from './bank' import { wasm } from './wasm' export const handlerMakers: Record = { + bank, wasm, } diff --git a/src/scripts/export/handlers/wasm.ts b/src/scripts/export/handlers/wasm.ts index d3166001..671da2b4 100644 --- a/src/scripts/export/handlers/wasm.ts +++ b/src/scripts/export/handlers/wasm.ts @@ -17,17 +17,18 @@ import { } from '@/db' import { updateIndexesForContracts } from '@/ms' -import { Handler, HandlerMaker, TracedEvent } from '../types' +import { Handler, HandlerMaker } from '../types' +const STORE_NAME = 'wasm' const MAX_BATCH_SIZE = 5000 const CONTRACT_BYTE_LENGTH = 32 export const wasm: HandlerMaker = async ({ - cosmWasmClient, config, - blockHeightToTimeCache, dontUpdateComputations, dontSendWebhooks, + cosmWasmClient, + getBlockTimeUnixMs, }) => { const chainId = await cosmWasmClient.getChainId() const pending: ParsedWasmStateEvent[] = [] @@ -74,13 +75,13 @@ export const wasm: HandlerMaker = async ({ const keyData = fromBase64(trace.key) if (keyData[0] !== 0x02 && keyData[0] !== 0x03) { - return false + return } // Ignore keys that are too short to be a wasm key. Needs at least one more // than the contract byte length for the prefix. if (keyData.length < CONTRACT_BYTE_LENGTH + 1) { - return false + return } const contractAddress = toBech32( @@ -114,12 +115,12 @@ export const wasm: HandlerMaker = async ({ contractInfo = ContractInfo.decode(protobufContractInfo) } catch { // If failed to decode, not contract info. - return false + return } if (!contractInfo.codeId) { // If no code ID found in JSON, ignore. - return false + return } const blockHeightFromContractInfo = @@ -154,7 +155,7 @@ export const wasm: HandlerMaker = async ({ }) } - return true + return } // Otherwise, save state event. @@ -205,12 +206,12 @@ export const wasm: HandlerMaker = async ({ if (pending.length >= MAX_BATCH_SIZE) { debouncedFlush = undefined await flush() - return true + return } else { debouncedFlush = setTimeout(flush, 200) } - return true + return } const exporter = async (parsedEvents: ParsedWasmStateEvent[]) => { @@ -496,60 +497,8 @@ export const wasm: HandlerMaker = async ({ return codeIdCache.get(contractAddress) ?? 0 } - // Get block time for height, cached in memory. - const getBlockTimeUnixMs = async ( - blockHeight: number, - trace: TracedEvent - ): Promise => { - if (blockHeightToTimeCache.has(blockHeight)) { - return blockHeightToTimeCache.get(blockHeight) ?? 0 - } - - const loadIntoCache = async () => { - const { - header: { time }, - } = await cosmWasmClient.getBlock(blockHeight) - blockHeightToTimeCache.set(blockHeight, Date.parse(time)) - } - - try { - // Retry 3 times with exponential backoff starting at 150ms delay. - await retry(loadIntoCache, [], { - retriesMax: 3, - exponential: true, - interval: 150, - }) - } catch (err) { - console.error( - '-------\nFailed to get block:\n', - err instanceof Error ? err.message : err, - '\nBlock height: ' + - BigInt(blockHeight).toLocaleString() + - '\nData: ' + - JSON.stringify(trace, null, 2) + - '\n-------' - ) - Sentry.captureException(err, { - tags: { - type: 'failed-get-block', - script: 'export', - handler: 'wasm', - chainId, - }, - extra: { - trace, - blockHeight, - }, - }) - - // Set to 0 on failure so we can continue. - blockHeightToTimeCache.set(blockHeight, 0) - } - - return blockHeightToTimeCache.get(blockHeight) ?? 0 - } - return { + storeName: STORE_NAME, handle, flush, } diff --git a/src/scripts/export/index.ts b/src/scripts/export/index.ts index f8c86873..e78e7350 100644 --- a/src/scripts/export/index.ts +++ b/src/scripts/export/index.ts @@ -164,12 +164,27 @@ const trace = async () => { metadata: { blockHeight: {}, txHash: {}, + store_name: {}, }, }) ) { return } + if (!tracedEvent.metadata.store_name) { + console.error('Found trace event missing store name.') + Sentry.captureMessage('Found trace event missing store name', { + tags: { + type: 'trace-missing-store-name', + script: 'export', + }, + extra: { + trace: tracedEvent, + }, + }) + return + } + // Only handle writes and deletes. if ( tracedEvent.operation !== 'write' && diff --git a/src/scripts/export/types.ts b/src/scripts/export/types.ts index f2c213d3..d4130d50 100644 --- a/src/scripts/export/types.ts +++ b/src/scripts/export/types.ts @@ -1,24 +1,25 @@ import { CosmWasmClient } from '@cosmjs/cosmwasm-stargate' -import { LRUCache } from 'lru-cache' import { Config } from '@/core' export type Handler = { - // The function that will be called for each trace in the trace file. If the - // trace was successfully handled, return true. Otherwise, return false. - handle: (trace: TracedEvent) => Promise + // What store name to filter by for events to handle. + storeName: string + // The function that will be called for each trace in the trace file. + handle: (trace: TracedEvent) => Promise // The function that will be called after reading the entire trace file. flush: () => Promise } export type HandlerMakerOptions = { - cosmWasmClient: CosmWasmClient config: Config - // Map block height to time. Populated with block heights from WebSocket's - // NewBlock event as soon as it occurs, which is before any state writes. - blockHeightToTimeCache: LRUCache dontUpdateComputations: boolean dontSendWebhooks: boolean + cosmWasmClient: CosmWasmClient + getBlockTimeUnixMs: ( + blockHeight: number, + trace: TracedEvent + ) => Promise } export type HandlerMaker = (options: HandlerMakerOptions) => Promise @@ -30,6 +31,7 @@ export type TracedEvent = { metadata: { blockHeight: number txHash: string + store_name: string } } diff --git a/src/scripts/export/worker.ts b/src/scripts/export/worker.ts index 632656ca..c22ddf18 100644 --- a/src/scripts/export/worker.ts +++ b/src/scripts/export/worker.ts @@ -11,7 +11,12 @@ import { DbType } from '@/core' import { State, loadDb } from '@/db' import { handlerMakers } from './handlers' -import { FromWorkerMessage, ToWorkerMessage, WorkerInitData } from './types' +import { + FromWorkerMessage, + ToWorkerMessage, + TracedEvent, + WorkerInitData, +} from './types' import { setUpWebSocketNewBlockListener } from './utils' const main = async () => { @@ -45,19 +50,71 @@ const main = async () => { // @ts-ignore const cosmWasmClient = new CosmWasmClient(tmClient) - // Setup handlers. + // Helper function that gets block time for height, cached in memory, which is + // filled in by the NewBlock WebSocket listener. const blockHeightToTimeCache = new LRUCache({ max: 100, }) + const getBlockTimeUnixMs = async ( + blockHeight: number, + trace: TracedEvent + ): Promise => { + if (blockHeightToTimeCache.has(blockHeight)) { + return blockHeightToTimeCache.get(blockHeight) ?? 0 + } + + const loadIntoCache = async () => { + const { + header: { time }, + } = await cosmWasmClient.getBlock(blockHeight) + blockHeightToTimeCache.set(blockHeight, Date.parse(time)) + } + + try { + // Retry 3 times with exponential backoff starting at 150ms delay. + await retry(loadIntoCache, [], { + retriesMax: 3, + exponential: true, + interval: 150, + }) + } catch (err) { + console.error( + '-------\nFailed to get block:\n', + err instanceof Error ? err.message : err, + '\nBlock height: ' + + BigInt(blockHeight).toLocaleString() + + '\nData: ' + + JSON.stringify(trace, null, 2) + + '\n-------' + ) + Sentry.captureException(err, { + tags: { + type: 'failed-get-block', + script: 'export', + }, + extra: { + trace, + blockHeight, + }, + }) + + // Set to 0 on failure so we can continue. + blockHeightToTimeCache.set(blockHeight, 0) + } + + return blockHeightToTimeCache.get(blockHeight) ?? 0 + } + + // Setup handlers. const handlers = await Promise.all( Object.entries(handlerMakers).map(async ([name, handlerMaker]) => ({ name, handler: await handlerMaker({ - blockHeightToTimeCache, - cosmWasmClient, config, dontUpdateComputations: !update, dontSendWebhooks: !webhooks, + cosmWasmClient, + getBlockTimeUnixMs, }), })) ) @@ -197,20 +254,23 @@ const main = async () => { // Handle event after previous event is handled. queueHandler = queueHandler.then(async () => { - // Try to handle with each module, and stop once handled. - for (const { name, handler } of handlers) { + // Try to handle with each module. + for (const { + name, + handler: { storeName, handle }, + } of handlers) { + // Filter by handler store. + if (storeName !== tracedEvent.metadata.store_name) { + continue + } + try { // Retry 3 times with exponential backoff starting at 100ms delay. - const handled = await retry(handler.handle, [tracedEvent], { + await retry(handle, [tracedEvent], { retriesMax: 3, exponential: true, interval: 100, }) - - // If handled, don't try other handlers. - if (handled) { - break - } } catch (err) { console.error( '-------\nFailed to handle:\n', diff --git a/src/server/routes/indexer/getStatus.ts b/src/server/routes/indexer/getStatus.ts index 4721447e..af1d15cd 100644 --- a/src/server/routes/indexer/getStatus.ts +++ b/src/server/routes/indexer/getStatus.ts @@ -10,6 +10,7 @@ type GetStatusResponse = latestBlock: SerializedBlock lastStakingBlockHeightExported: string | null lastWasmBlockHeightExported: string | null + lastBankBlockHeightExported: string | null } | { error: string @@ -36,5 +37,7 @@ export const getStatus: Router.Middleware< state.lastStakingBlockHeightExported?.toString() || null, lastWasmBlockHeightExported: state.lastWasmBlockHeightExported?.toString() || null, + lastBankBlockHeightExported: + state.lastBankBlockHeightExported?.toString() || null, } } diff --git a/src/server/test/indexer/getStatus.test.ts b/src/server/test/indexer/getStatus.test.ts index 530f69b0..28163dc9 100644 --- a/src/server/test/indexer/getStatus.test.ts +++ b/src/server/test/indexer/getStatus.test.ts @@ -20,6 +20,8 @@ describe('GET /status', () => { state!.lastStakingBlockHeightExported?.toString() || null, lastWasmBlockHeightExported: state!.lastWasmBlockHeightExported?.toString() || null, + lastBankBlockHeightExported: + state!.lastBankBlockHeightExported?.toString() || null, }) }) }) diff --git a/state-dump/dump.go b/state-dump/dump.go index 359ff072..e2f05af6 100644 --- a/state-dump/dump.go +++ b/state-dump/dump.go @@ -21,6 +21,9 @@ type ( Metadata struct { BlockHeight int64 `json:"blockHeight"` TxHash string `json:"txHash"` + // Snake case matches `storeNameCtxKey` in `store/cachemulti/store.go` in + // the Cosmos SDK. + StoreName string `json:"store_name"` } // traceOperation implements a traced KVStore operation @@ -33,23 +36,25 @@ type ( ) var ( + BalancesPrefix = []byte{0x02} ContractKeyPrefix = []byte{0x02} ContractStorePrefix = []byte{0x03} ) func main() { args := os.Args - if len(args) < 3 { - fmt.Println("Usage: dump [address]") + if len(args) < 4 { + fmt.Println("Usage: dump [address]") os.Exit(1) } home_dir := args[1] output := args[2] + storeName := args[3] var addressBech32Data []byte - if len(args) > 3 { - _, bech32Data, err := bech32.DecodeToBase256(args[3]) + if len(args) > 4 { + _, bech32Data, err := bech32.DecodeToBase256(args[4]) if err != nil { panic(err) } @@ -72,16 +77,16 @@ func main() { latestHeight := rootmulti.GetLatestVersion(db) fmt.Printf("Latest height: %d\n", latestHeight) - wasmKey := types.NewKVStoreKey("wasm") + storeKey := types.NewKVStoreKey(storeName) ms := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) - ms.MountStoreWithDB(wasmKey, types.StoreTypeIAVL, nil) + ms.MountStoreWithDB(storeKey, types.StoreTypeIAVL, nil) err = ms.LoadLatestVersion() if err != nil { panic(err) } - store := ms.GetCommitKVStore(wasmKey) + store := ms.GetCommitKVStore(storeKey) if store == nil { panic("Store is nil") } @@ -112,6 +117,7 @@ func main() { Metadata: Metadata{ BlockHeight: latestHeight, TxHash: "", + StoreName: storeName, }, }