Skip to content

Commit

Permalink
fix: add missing mutex lock. (#145)
Browse files Browse the repository at this point in the history
* fix: add missing mutex in query proof

* chore: add retry logic in wasm query method

* fix: add delay for pathprocessor sync

* fix: remove tp when getting client state

* fix: adding defer in queryWasmProof

* fix: change to processheight

* fix: limit max block fetch at a time in wasm processor

* chore: add height to event

---------

Co-authored-by: izyak <[email protected]>
  • Loading branch information
viveksharmapoudel and izyak authored Sep 1, 2023
1 parent bdf1cc2 commit cf3dc3e
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 16 deletions.
9 changes: 6 additions & 3 deletions relayer/chains/icon/icon_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer
return err
}
}
// }

icp.log.Info("Start to query from height", zap.Int64("height", processedheight))
// subscribe to monitor block
Expand All @@ -297,7 +296,7 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer
icp.firstTime = true

blockReq := &types.BlockRequest{
Height: types.NewHexInt(int64(icp.chainProvider.PCfg.StartHeight)),
Height: types.NewHexInt(int64(processedheight)),
EventFilters: GetMonitorEventFilters(icp.chainProvider.PCfg.IbcHandlerAddress),
}

Expand Down Expand Up @@ -379,6 +378,9 @@ loop:
break
}
time.Sleep(10 * time.Millisecond)
if icp.firstTime {
time.Sleep(4000 * time.Millisecond)
}
icp.firstTime = false
if br = nil; len(btpBlockRespCh) > 0 {
br = <-btpBlockRespCh
Expand All @@ -400,7 +402,7 @@ loop:
if err != nil {
return err
} else if height != processedheight+i {
icp.log.Warn("Reconnect: missing block notification ",
icp.log.Warn("Reconnect: missing block notification",
zap.Int64("got", height),
zap.Int64("expected", processedheight+i),
)
Expand Down Expand Up @@ -697,6 +699,7 @@ func (icp *IconChainProcessor) clientState(ctx context.Context, clientID string)
if state, ok := icp.latestClientState[clientID]; ok {
return state, nil
}

cs, err := icp.chainProvider.QueryClientStateWithoutProof(ctx, int64(icp.latestBlock.Height), clientID)
if err != nil {
return provider.ClientState{}, err
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (icp *IconProvider) NewClientState(
return nil, fmt.Errorf("Blockinterval cannot be empty in Icon config")
}

trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanosecondRatio))
trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanoToMilliRatio))

return &icon.ClientState{
// In case of Icon: Trusting Period is block Difference // see: light.proto in ibc-integration
Expand Down
30 changes: 23 additions & 7 deletions relayer/chains/wasm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
"github.com/avast/retry-go/v4"
abci "github.com/cometbft/cometbft/abci/types"
rpcclient "github.com/cometbft/cometbft/rpc/client"
tmtypes "github.com/cometbft/cometbft/types"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/cosmos/gogoproto/proto"
tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint"
"github.com/icon-project/IBC-Integration/libraries/go/common/icon"
"go.uber.org/zap"

querytypes "github.com/cosmos/cosmos-sdk/types/query"
bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types"
Expand Down Expand Up @@ -333,13 +335,25 @@ func (ap *WasmProvider) QueryClientConsensusState(ctx context.Context, chainHeig
return clienttypes.NewQueryConsensusStateResponse(anyConsensusState, nil, clienttypes.NewHeight(0, uint64(chainHeight))), nil
}

func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (*wasmtypes.QuerySmartContractStateResponse, error) {
done := ap.SetSDKContext()
defer done()
return ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{
Address: ap.PCfg.IbcHandlerAddress,
QueryData: param,
})
func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (op *wasmtypes.QuerySmartContractStateResponse, err error) {
return op, retry.Do(func() error {
done := ap.SetSDKContext()
defer done()
op, err = ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{
Address: ap.PCfg.IbcHandlerAddress,
QueryData: param,
})
return err
}, retry.Context(ctx), retry.Attempts(latestHeightQueryRetries), retry.Delay(50*time.Millisecond), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) {
ap.log.Error(
"Failed to query",
zap.Uint("attempt", n+1),
zap.Uint("max_attempts", latestHeightQueryRetries),
zap.Any("Param", param),
zap.Error(err),
)
}))

}

func (ap *WasmProvider) QueryIBCHandlerContractProcessed(ctx context.Context, param wasmtypes.RawContractMessage) ([]byte, error) {
Expand Down Expand Up @@ -492,6 +506,8 @@ func (ap *WasmProvider) QueryConnection(ctx context.Context, height int64, conne
}

func (ap *WasmProvider) QueryWasmProof(ctx context.Context, storageKey []byte, height int64) ([]byte, error) {
done := ap.SetSDKContext()
defer done()
ibcAddr, err := sdk.AccAddressFromBech32(ap.PCfg.IbcHandlerAddress)
if err != nil {
return nil, err
Expand Down
16 changes: 12 additions & 4 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
defaultMinQueryLoopDuration = 1 * time.Second
defaultBalanceUpdateWaitDuration = 60 * time.Second
inSyncNumBlocksThreshold = 2
MaxBlockFetch = 100
)

// latestClientState is a map of clientID to the latest clientInfo for that client.
Expand Down Expand Up @@ -176,7 +177,7 @@ func (ccp *WasmChainProcessor) nodeStatusWithRetry(ctx context.Context) (status
// clientState will return the most recent client state if client messages
// have already been observed for the clientID, otherwise it will query for it.
func (ccp *WasmChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) {
if state, ok := ccp.latestClientState[clientID]; ok && state.TrustingPeriod > 0 {
if state, ok := ccp.latestClientState[clientID]; ok {
return state, nil
}
cs, err := ccp.chainProvider.QueryClientState(ctx, int64(ccp.latestBlock.Height), clientID)
Expand Down Expand Up @@ -221,7 +222,7 @@ func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int {
func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanosecondRatio)),
minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanoToMilliRatio)),
lastBalanceUpdate: time.Unix(0, 0),
balanceUpdateWaitDuration: defaultBalanceUpdateWaitDuration,
}
Expand Down Expand Up @@ -410,7 +411,14 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
syncUpHeight := func() int64 {
if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch {
return persistence.latestQueriedBlock + MaxBlockFetch
}
return persistence.latestHeight
}

for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ {
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var lightBlock *types.LightBlock
Expand Down Expand Up @@ -460,7 +468,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded)

for _, m := range messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType))
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", heightUint64))
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
}
Expand Down
2 changes: 1 addition & 1 deletion relayer/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
ConnectionKey = "connection"
ChannelKey = "channel"
ONE_HOUR = 60 * 60 * 1000
NanosecondRatio = 1000_000
NanoToMilliRatio = 1000_000
)

var (
Expand Down

0 comments on commit cf3dc3e

Please sign in to comment.