Skip to content

Commit

Permalink
Merge pull request #636 from onflow/janez/fixes
Browse files Browse the repository at this point in the history
Refactor  event subscriber
  • Loading branch information
janezpodhostnik committed Nov 12, 2024
2 parents e95b413 + a87e370 commit da24a0d
Show file tree
Hide file tree
Showing 70 changed files with 4,083 additions and 2,823 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @janezpodhostnik @peterargue @m-Peter @zhangchiqing @ramtinms
* @janezpodhostnik @peterargue @m-Peter @zhangchiqing
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ generate:
mockery --dir=storage --name=BlockIndexer --output=storage/mocks
mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks
mockery --dir=storage --name=TransactionIndexer --output=storage/mocks
mockery --dir=storage --name=AccountIndexer --output=storage/mocks
mockery --dir=storage --name=TraceIndexer --output=storage/mocks
mockery --all --dir=services/traces --output=services/traces/mocks
mockery --all --dir=services/ingestion --output=services/ingestion/mocks
mockery --dir=models --name=Engine --output=models/mocks

Expand Down
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,10 @@ The application can be configured using the following flags at runtime:
| `stream-limit` | `10` | Rate-limit for client events sent per second |
| `rate-limit` | `50` | Requests per second limit for clients over any protocol (ws/http) |
| `address-header` | `""` | Header for client IP when server is behind a proxy |
| `heartbeat-interval` | `100` | Interval for AN event subscription heartbeats |
| `stream-timeout` | `3` | Timeout in seconds for sending events to clients |
| `force-start-height` | `0` | Force-set starting Cadence height (local/testing use only) |
| `wallet-api-key` | `""` | ECDSA private key for wallet APIs (local/testing use only) |
| `filter-expiry` | `5m` | Expiry time for idle filters |
| `traces-gcp-bucket` | `""` | GCP bucket name for transaction traces |
| `traces-backfill-start-height` | `0` | Start height for backfilling transaction traces |
| `traces-backfill-end-height` | `0` | End height for backfilling transaction traces |
| `index-only` | `false` | Run in index-only mode, allowing state queries and indexing but no transaction sending |
| `profiler-enabled` | `false` | Enable the pprof profiler server |
| `profiler-host` | `localhost` | Host for the pprof profiler |
Expand Down
153 changes: 29 additions & 124 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ package api

import (
"context"
_ "embed"
"encoding/hex"
"errors"
"fmt"
"math/big"
"strings"

"github.com/onflow/go-ethereum/common"
"github.com/onflow/go-ethereum/common/hexutil"
Expand All @@ -30,8 +26,12 @@ import (
"github.com/onflow/flow-evm-gateway/storage"
)

const BlockGasLimit uint64 = 120_000_000

const maxFeeHistoryBlockCount = 1024

var baseFeesPerGas = big.NewInt(1)

// A map containing all the valid method names that are found
// in the Ethereum JSON-RPC API specification.
// Update accordingly if any new methods are added/removed.
Expand Down Expand Up @@ -74,6 +74,7 @@ var validMethods = map[string]struct{}{
"debug_traceTransaction": {},
"debug_traceBlockByNumber": {},
"debug_traceBlockByHash": {},
"debug_traceCall": {},

// web3 namespace
"web3_clientVersion": {},
Expand Down Expand Up @@ -153,7 +154,6 @@ type BlockChainAPI struct {
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
accounts storage.AccountIndexer
indexingResumedHeight uint64
limiter limiter.Store
collector metrics.Collector
Expand All @@ -166,7 +166,6 @@ func NewBlockChainAPI(
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
accounts storage.AccountIndexer,
ratelimiter limiter.Store,
collector metrics.Collector,
) (*BlockChainAPI, error) {
Expand All @@ -183,7 +182,6 @@ func NewBlockChainAPI(
blocks: blocks,
transactions: transactions,
receipts: receipts,
accounts: accounts,
indexingResumedHeight: indexingResumedHeight,
limiter: ratelimiter,
collector: collector,
Expand Down Expand Up @@ -280,12 +278,12 @@ func (b *BlockChainAPI) GetBalance(
return nil, err
}

evmHeight, err := b.getBlockNumber(&blockNumberOrHash)
height, err := resolveBlockTag(&blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[*hexutil.Big](err, l, b.collector)
}

balance, err := b.evm.GetBalance(ctx, address, evmHeight)
balance, err := b.evm.GetBalance(address, height)
if err != nil {
return handleError[*hexutil.Big](err, l, b.collector)
}
Expand Down Expand Up @@ -516,21 +514,12 @@ func (b *BlockChainAPI) GetBlockReceipts(
return nil, err
}

var (
block *models.Block
err error
)
if blockNumberOrHash.BlockHash != nil {
block, err = b.blocks.GetByID(*blockNumberOrHash.BlockHash)
} else if blockNumberOrHash.BlockNumber != nil {
block, err = b.blocks.GetByHeight(uint64(blockNumberOrHash.BlockNumber.Int64()))
} else {
return handleError[[]map[string]interface{}](
fmt.Errorf("%w: block number or hash not provided", errs.ErrInvalid),
l,
b.collector,
)
height, err := resolveBlockTag(&blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[[]map[string]interface{}](err, l, b.collector)
}

block, err := b.blocks.GetByHeight(height)
if err != nil {
return handleError[[]map[string]interface{}](err, l, b.collector)
}
Expand Down Expand Up @@ -642,7 +631,7 @@ func (b *BlockChainAPI) Call(
blockNumberOrHash = &latestBlockNumberOrHash
}

evmHeight, err := b.getBlockNumber(blockNumberOrHash)
height, err := resolveBlockTag(blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[hexutil.Bytes](err, l, b.collector)
}
Expand All @@ -658,7 +647,7 @@ func (b *BlockChainAPI) Call(
from = *args.From
}

res, err := b.evm.Call(ctx, tx, from, evmHeight)
res, err := b.evm.Call(tx, from, height)
if err != nil {
return handleError[hexutil.Bytes](err, l, b.collector)
}
Expand Down Expand Up @@ -760,29 +749,16 @@ func (b *BlockChainAPI) GetTransactionCount(
return nil, err
}

evmHeight, err := b.getBlockNumber(&blockNumberOrHash)
height, err := resolveBlockTag(&blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[*hexutil.Uint64](err, l, b.collector)
}

networkNonce, err := b.evm.GetNonce(ctx, address, evmHeight)
networkNonce, err := b.evm.GetNonce(address, height)
if err != nil {
return handleError[*hexutil.Uint64](err, l, b.collector)
}

nonce, err := b.accounts.GetNonce(address)
if err != nil {
return handleError[*hexutil.Uint64](errs.ErrInternal, l, b.collector)
}

// compare both until we gain confidence in db nonce tracking working correctly
if nonce != networkNonce {
l.Error().
Uint64("network-nonce", networkNonce).
Uint64("db-nonce", nonce).
Msg("network nonce does not equal db nonce")
}

return (*hexutil.Uint64)(&networkNonce), nil
}

Expand Down Expand Up @@ -813,7 +789,7 @@ func (b *BlockChainAPI) EstimateGas(

tx, err := encodeTxFromArgs(args)
if err != nil {
return hexutil.Uint64(blockGasLimit), nil // return block gas limit
return hexutil.Uint64(BlockGasLimit), nil // return block gas limit
}

// Default address in case user does not provide one
Expand All @@ -826,12 +802,12 @@ func (b *BlockChainAPI) EstimateGas(
blockNumberOrHash = &latestBlockNumberOrHash
}

evmHeight, err := b.getBlockNumber(blockNumberOrHash)
height, err := resolveBlockTag(blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[hexutil.Uint64](err, l, b.collector)
}

estimatedGas, err := b.evm.EstimateGas(ctx, tx, from, evmHeight)
estimatedGas, err := b.evm.EstimateGas(tx, from, height)
if err != nil {
return handleError[hexutil.Uint64](err, l, b.collector)
}
Expand All @@ -855,12 +831,12 @@ func (b *BlockChainAPI) GetCode(
return nil, err
}

evmHeight, err := b.getBlockNumber(&blockNumberOrHash)
height, err := resolveBlockTag(&blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[hexutil.Bytes](err, l, b.collector)
}

code, err := b.evm.GetCode(ctx, address, evmHeight)
code, err := b.evm.GetCode(address, height)
if err != nil {
return handleError[hexutil.Bytes](err, l, b.collector)
}
Expand Down Expand Up @@ -937,11 +913,11 @@ func (b *BlockChainAPI) FeeHistory(
oldestBlock = (*hexutil.Big)(big.NewInt(int64(block.Height)))
}

baseFees = append(baseFees, (*hexutil.Big)(big.NewInt(0)))
baseFees = append(baseFees, (*hexutil.Big)(baseFeesPerGas))

rewards = append(rewards, blockRewards)

gasUsedRatio := float64(block.TotalGasUsed) / float64(blockGasLimit)
gasUsedRatio := float64(block.TotalGasUsed) / float64(BlockGasLimit)
gasUsedRatios = append(gasUsedRatios, gasUsedRatio)
}

Expand Down Expand Up @@ -971,7 +947,7 @@ func (b *BlockChainAPI) GetStorageAt(
return nil, err
}

key, _, err := decodeHash(storageSlot)
key, err := decodeHash(storageSlot)
if err != nil {
return handleError[hexutil.Bytes](
fmt.Errorf("%w: %w", errs.ErrInvalid, err),
Expand All @@ -980,12 +956,12 @@ func (b *BlockChainAPI) GetStorageAt(
)
}

evmHeight, err := b.getBlockNumber(&blockNumberOrHash)
height, err := resolveBlockTag(&blockNumberOrHash, b.blocks, b.logger)
if err != nil {
return handleError[hexutil.Bytes](err, l, b.collector)
}

result, err := b.evm.GetStorageAt(ctx, address, key, evmHeight)
result, err := b.evm.GetStorageAt(address, key, height)
if err != nil {
return handleError[hexutil.Bytes](err, l, b.collector)
}
Expand Down Expand Up @@ -1050,10 +1026,10 @@ func (b *BlockChainAPI) prepareBlockResponse(
TransactionsRoot: block.TransactionHashRoot,
Transactions: block.TransactionHashes,
Uncles: []common.Hash{},
GasLimit: hexutil.Uint64(blockGasLimit),
GasLimit: hexutil.Uint64(BlockGasLimit),
Nonce: types.BlockNonce{0x1},
Timestamp: hexutil.Uint64(block.Timestamp),
BaseFeePerGas: hexutil.Big(*big.NewInt(0)),
BaseFeePerGas: hexutil.Big(*baseFeesPerGas),
LogsBloom: types.LogsBloom([]*types.Log{}),
Miner: evmTypes.CoinbaseAddress.ToCommon(),
Sha3Uncles: types.EmptyUncleHash,
Expand Down Expand Up @@ -1095,76 +1071,6 @@ func (b *BlockChainAPI) prepareBlockResponse(
return blockResponse, nil
}

func (b *BlockChainAPI) getBlockNumber(blockNumberOrHash *rpc.BlockNumberOrHash) (int64, error) {
err := fmt.Errorf("%w: neither block number nor hash specified", errs.ErrInvalid)
if blockNumberOrHash == nil {
return 0, err
}
if number, ok := blockNumberOrHash.Number(); ok {
return number.Int64(), nil
}

if hash, ok := blockNumberOrHash.Hash(); ok {
evmHeight, err := b.blocks.GetHeightByID(hash)
if err != nil {
b.logger.Error().Err(err).Msg("failed to get block by hash")
return 0, err
}
return int64(evmHeight), nil
}

return 0, err
}

// handleError takes in an error and in case the error is of type ErrEntityNotFound
// it returns nil instead of an error since that is according to the API spec,
// if the error is not of type ErrEntityNotFound it will return the error and the generic
// empty type.
func handleError[T any](err error, log zerolog.Logger, collector metrics.Collector) (T, error) {
var (
zero T
revertedErr *errs.RevertError
)

switch {
// as per specification returning nil and nil for not found resources
case errors.Is(err, errs.ErrEntityNotFound):
return zero, nil
case errors.Is(err, errs.ErrInvalid):
return zero, err
case errors.Is(err, errs.ErrFailedTransaction):
return zero, err
case errors.As(err, &revertedErr):
return zero, revertedErr
default:
collector.ApiErrorOccurred()
log.Error().Err(err).Msg("api error")
return zero, errs.ErrInternal
}
}

// decodeHash parses a hex-encoded 32-byte hash. The input may optionally
// be prefixed by 0x and can have a byte length up to 32.
func decodeHash(s string) (h common.Hash, inputLength int, err error) {
if strings.HasPrefix(s, "0x") || strings.HasPrefix(s, "0X") {
s = s[2:]
}
if (len(s) & 1) > 0 {
s = "0" + s
}
b, err := hex.DecodeString(s)
if err != nil {
return common.Hash{}, 0, fmt.Errorf("invalid hex string: %s", s)
}
if len(b) > common.HashLength {
return common.Hash{}, len(b), fmt.Errorf(
"hex string too long, want at most 32 bytes, have %d bytes",
len(b),
)
}
return common.BytesToHash(b), len(b), nil
}

/*
Static responses section
Expand Down Expand Up @@ -1230,8 +1136,7 @@ func (b *BlockChainAPI) GetUncleByBlockNumberAndIndex(

// MaxPriorityFeePerGas returns a suggestion for a gas tip cap for dynamic fee transactions.
func (b *BlockChainAPI) MaxPriorityFeePerGas(ctx context.Context) (*hexutil.Big, error) {
fee := hexutil.Big(*big.NewInt(1))
return &fee, nil
return (*hexutil.Big)(b.config.GasPrice), nil
}

// Mining returns true if client is actively mining new blocks.
Expand Down
Loading

0 comments on commit da24a0d

Please sign in to comment.