diff --git a/indexer/abci.go b/indexer/abci.go index 9a2e723..e9229c7 100644 --- a/indexer/abci.go +++ b/indexer/abci.go @@ -18,7 +18,6 @@ import ( rpctypes "github.com/initia-labs/minievm/jsonrpc/types" "github.com/initia-labs/minievm/x/evm/keeper" - "github.com/initia-labs/minievm/x/evm/types" ) func (e *EVMIndexerImpl) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*storetypes.StoreKVPair) error { @@ -111,7 +110,6 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque receipts = append(receipts, &receipt) } - chainId := types.ConvertCosmosChainIDToEthereumChainID(sdkCtx.ChainID()) blockGasMeter := sdkCtx.BlockGasMeter() blockHeight := sdkCtx.BlockHeight() @@ -145,7 +143,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque receipt := receipts[idx] // store tx - rpcTx := rpctypes.NewRPCTransaction(ethTx, blockHash, uint64(blockHeight), uint64(receipt.TransactionIndex), chainId) + rpcTx := rpctypes.NewRPCTransaction(ethTx, blockHash, uint64(blockHeight), uint64(receipt.TransactionIndex), ethTx.ChainId()) if err := e.TxMap.Set(sdkCtx, txHash.Bytes(), *rpcTx); err != nil { e.logger.Error("failed to store rpcTx", "err", err) return err diff --git a/indexer/indexer.go b/indexer/indexer.go index c63f081..e904e4f 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -52,7 +52,7 @@ type EVMIndexer interface { // mempool MempoolWrapper(mempool mempool.Mempool) mempool.Mempool - TxInMempool(hash common.Hash) bool + TxInMempool(hash common.Hash) *rpctypes.RPCTransaction } // EVMIndexerImpl implements EVMIndexer. @@ -80,7 +80,7 @@ type EVMIndexerImpl struct { pendingChans []chan *rpctypes.RPCTransaction // txPendingMap is a map to store tx hashes in pending state. - txPendingMap *ttlcache.Cache[common.Hash, bool] + txPendingMap *ttlcache.Cache[common.Hash, *rpctypes.RPCTransaction] } func NewEVMIndexer( @@ -128,7 +128,7 @@ func NewEVMIndexer( // Use ttlcache to cope with abnormal cases like tx not included in a block txPendingMap: ttlcache.New( // pending tx lifetime is 1 minute in indexer - ttlcache.WithTTL[common.Hash, bool](time.Minute), + ttlcache.WithTTL[common.Hash, *rpctypes.RPCTransaction](time.Minute), ), } diff --git a/indexer/mempool.go b/indexer/mempool.go index 6917097..644582e 100644 --- a/indexer/mempool.go +++ b/indexer/mempool.go @@ -11,7 +11,6 @@ import ( rpctypes "github.com/initia-labs/minievm/jsonrpc/types" evmkeeper "github.com/initia-labs/minievm/x/evm/keeper" - evmtypes "github.com/initia-labs/minievm/x/evm/types" ) var _ mempool.Mempool = (*MempoolWrapper)(nil) @@ -27,8 +26,13 @@ func (indexer *EVMIndexerImpl) MempoolWrapper(mempool mempool.Mempool) mempool.M } // TxInMempool returns true if the transaction with the given hash is in the mempool. -func (indexer *EVMIndexerImpl) TxInMempool(hash common.Hash) bool { - return indexer.txPendingMap.Has(hash) +func (indexer *EVMIndexerImpl) TxInMempool(hash common.Hash) *rpctypes.RPCTransaction { + item := indexer.txPendingMap.Get(hash) + if item == nil { + return nil + } + + return item.Value() } // CountTx implements mempool.Mempool. @@ -46,14 +50,11 @@ func (m *MempoolWrapper) Insert(ctx context.Context, tx sdk.Tx) error { } if ethTx != nil { - sdkCtx := sdk.UnwrapSDKContext(ctx) - chainId := evmtypes.ConvertCosmosChainIDToEthereumChainID(sdkCtx.ChainID()) - rpcTx := rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainId) - ethTxHash := ethTx.Hash() + rpcTx := rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId()) m.indexer.logger.Debug("inserting tx into mempool", "pending len", m.indexer.txPendingMap.Len(), "ethTxHash", ethTxHash) - m.indexer.txPendingMap.Set(ethTxHash, true, ttlcache.DefaultTTL) + m.indexer.txPendingMap.Set(ethTxHash, rpcTx, ttlcache.DefaultTTL) go func() { // emit the transaction to all pending channels diff --git a/jsonrpc/backend/tx.go b/jsonrpc/backend/tx.go index 5902466..e0dff9b 100644 --- a/jsonrpc/backend/tx.go +++ b/jsonrpc/backend/tx.go @@ -87,7 +87,7 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error { cacheKey := fmt.Sprintf("%s-%d", senderHex, txSeq) txHash := tx.Hash() - b.queuedTxHashes.Store(txHash, true) + b.queuedTxHashes.Store(txHash, cacheKey) _ = b.queuedTxs.Add(cacheKey, txQueueItem{hash: txHash, bytes: txBytes, body: tx, sender: senderHex}) // check if there are queued txs which can be sent @@ -133,7 +133,12 @@ func (b *JSONRPCBackend) getQueryCtxWithHeight(height uint64) (context.Context, // GetTransactionByHash returns the transaction with the given hash. func (b *JSONRPCBackend) GetTransactionByHash(hash common.Hash) (*rpctypes.RPCTransaction, error) { - return b.getTransaction(hash) + rpcTx, err := b.getTransaction(hash) + if rpcTx == nil { + return nil, err + } + + return rpcTx, nil } // GetTransactionCount returns the number of transactions at the given block number. @@ -181,21 +186,23 @@ func (b *JSONRPCBackend) GetTransactionCount(address common.Address, blockNrOrHa // GetTransactionReceipt returns the transaction receipt for the given transaction hash. func (b *JSONRPCBackend) GetTransactionReceipt(hash common.Hash) (map[string]interface{}, error) { - receipt, err := b.getReceipt(hash) - if err != nil { - return nil, err - } else if receipt == nil { - return nil, nil + rpcTx, err := b.getTransaction(hash) + if rpcTx == nil && err == nil { + return nil, nil // tx is not found + } else if rpcTx != nil && err != nil { + return nil, NewTxIndexingError() // tx is not fully indexed + } else if err != nil { + return nil, err // just error case } - tx, err := b.getTransaction(hash) + receipt, err := b.getReceipt(hash) if err != nil { return nil, err - } else if tx == nil { + } else if receipt == nil { return nil, nil } - return marshalReceipt(receipt, tx), nil + return marshalReceipt(receipt, rpcTx), nil } // GetTransactionByBlockHashAndIndex returns the transaction at the given block hash and index. @@ -270,13 +277,8 @@ func (b *JSONRPCBackend) GetBlockTransactionCountByNumber(blockNum rpc.BlockNumb // GetRawTransactionByHash returns the bytes of the transaction for the given hash. func (b *JSONRPCBackend) GetRawTransactionByHash(hash common.Hash) (hexutil.Bytes, error) { rpcTx, err := b.getTransaction(hash) - if err != nil && errors.Is(err, collections.ErrNotFound) { - return nil, nil - } else if err != nil { - b.logger.Error("failed to get raw transaction by hash", "err", err) - return nil, NewTxIndexingError() - } else if rpcTx == nil { - return nil, nil + if rpcTx == nil { + return nil, err } return rpcTx.ToTransaction().MarshalBinary() @@ -295,11 +297,6 @@ func (b *JSONRPCBackend) GetRawTransactionByBlockHashAndIndex(blockHash common.H } func (b *JSONRPCBackend) PendingTransactions() ([]*rpctypes.RPCTransaction, error) { - chainID, err := b.ChainID() - if err != nil { - return nil, err - } - queryCtx, err := b.getQueryCtx() if err != nil { return nil, err @@ -330,7 +327,7 @@ func (b *JSONRPCBackend) PendingTransactions() ([]*rpctypes.RPCTransaction, erro if ethTx != nil { result = append( result, - rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainID), + rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId()), ) } } @@ -369,18 +366,32 @@ func (b *JSONRPCBackend) GetBlockReceipts(ctx context.Context, blockNrOrHash rpc return result, nil } +// getTransaction retrieves the lookup along with the transaction itself associate +// with the given transaction hash. +// +// An error will be returned if the transaction is not found, and background +// indexing for transactions is still in progress. The error is used to indicate the +// scenario explicitly that the transaction might be reachable shortly. +// +// A null will be returned in the transaction is not found and background transaction +// indexing is already finished. The transaction is not existent from the perspective +// of node. func (b *JSONRPCBackend) getTransaction(hash common.Hash) (*rpctypes.RPCTransaction, error) { if tx, ok := b.txLookupCache.Get(hash); ok { return tx, nil } // check if the transaction is in the queued txs - if _, ok := b.queuedTxHashes.Load(hash); ok { - return nil, NewTxIndexingError() + if cacheKey, ok := b.queuedTxHashes.Load(hash); ok { + if cacheItem, ok := b.queuedTxs.Get(cacheKey.(string)); ok { + rpcTx := rpctypes.NewRPCTransaction(cacheItem.body, common.Hash{}, 0, 0, cacheItem.body.ChainId()) + return rpcTx, NewTxIndexingError() + } } + // check if the transaction is in the pending txs - if ok := b.app.EVMIndexer().TxInMempool(hash); ok { - return nil, NewTxIndexingError() + if tx := b.app.EVMIndexer().TxInMempool(hash); tx != nil { + return tx, NewTxIndexingError() } queryCtx, err := b.getQueryCtx() diff --git a/jsonrpc/backend/txpool.go b/jsonrpc/backend/txpool.go index 746b006..a5c6619 100644 --- a/jsonrpc/backend/txpool.go +++ b/jsonrpc/backend/txpool.go @@ -28,11 +28,6 @@ func (b *JSONRPCBackend) TxPoolContent() (map[string]map[string]map[string]*rpct return nil, err } - chainID, err := b.ChainID() - if err != nil { - return nil, err - } - txUtils := keeper.NewTxUtils(b.app.EVMKeeper) for _, tx := range pending.Txs { cosmosTx, err := b.app.TxDecode(tx) @@ -54,7 +49,7 @@ func (b *JSONRPCBackend) TxPoolContent() (map[string]map[string]map[string]*rpct content["pending"][account.Hex()] = dump } - dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainID) + dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId()) } // load queued txs @@ -72,7 +67,7 @@ func (b *JSONRPCBackend) TxPoolContent() (map[string]map[string]map[string]*rpct content["queued"][sender] = dump } - dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, chainID) + dump[fmt.Sprintf("%d", ethTx.Nonce())] = rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId()) } return content, nil