From 88491e9833bb99d710a05861c260bcb3a09ff43c Mon Sep 17 00:00:00 2001 From: Hoon <48665813+sh-cha@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:47:36 +0900 Subject: [PATCH] Feat/split oracletx (#45) * find tx using txsearch with tx hash * change sequence * add oracle executor account to avoid executor sequence mismatch * query block header instead of block * handle tx failed * remove redunt sequence err * update readme * return sender when creating msgs & split msg queue every sender * set bridge info first * update authz grant checker * add sender * add broadcaster account checker * sender not set bug fix * delete debug print * add sender to pendingtx * add sender when unmarshalling processed msgs * remove redundant msg queue allocation * format * delete processed msgs from db when simulation failed and error is handled * change oracle tx sender * change proof query error (#46) * enable to query withdrawals that their tree is not finalized yet (#47) * introduce authz tx msg command (#48) * introduce authz tx msg command * update readme * format * update evm version * can disable to relay oracle data by emptying oracle-bridge-executor --- README.md | 2 +- challenger/README.md | 9 +- challenger/child/child.go | 2 +- cmd/opinitd/root.go | 1 + cmd/opinitd/tx.go | 124 +++++++++++ executor/README.md | 93 +++++++-- executor/batch/handler.go | 6 +- executor/batch/noop_da.go | 8 +- executor/celestia/celestia.go | 85 ++------ executor/celestia/msg.go | 58 ++++++ executor/celestia/node.go | 16 +- executor/child/child.go | 16 +- executor/child/handler.go | 26 ++- executor/child/query.go | 40 ++-- executor/child/withdraw.go | 4 +- executor/executor.go | 19 +- executor/host/batch.go | 2 +- executor/host/deposit.go | 8 +- executor/host/handler.go | 32 +-- executor/host/host.go | 22 +- executor/host/oracle.go | 4 +- executor/types/batch.go | 2 +- executor/types/config.go | 9 +- go.mod | 8 +- go.sum | 16 +- merkle/merkle.go | 8 +- merkle/types/errors.go | 5 + node/broadcaster/account.go | 283 +++++++++++++++++++++----- node/broadcaster/account_retriever.go | 70 +++++++ node/broadcaster/broadcaster.go | 115 +++++------ node/broadcaster/db.go | 23 ++- node/broadcaster/process.go | 48 ++++- node/broadcaster/status.go | 23 +++ node/broadcaster/tx.go | 188 ++--------------- node/broadcaster/types/config.go | 37 ++-- node/broadcaster/types/db.go | 9 +- node/broadcaster/types/keys.go | 4 +- node/broadcaster/types/status.go | 11 + node/node.go | 17 +- node/process.go | 6 +- node/status.go | 18 ++ node/types/status.go | 11 +- provider/child/child.go | 134 +++++++++++- provider/child/msgs.go | 37 ++-- provider/child/query.go | 89 ++++++-- provider/host/host.go | 41 +++- provider/host/msgs.go | 24 +-- provider/host/query.go | 17 -- types/const.go | 2 + 49 files changed, 1208 insertions(+), 624 deletions(-) create mode 100644 cmd/opinitd/tx.go create mode 100644 executor/celestia/msg.go create mode 100644 merkle/types/errors.go create mode 100644 node/broadcaster/account_retriever.go create mode 100644 node/broadcaster/status.go create mode 100644 node/broadcaster/types/status.go create mode 100644 node/status.go diff --git a/README.md b/README.md index 230ad34..d4e8504 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ To ensure compatibility with the node version, check the following versions: | L1 Node | MiniMove | MiniWasm | MiniEVM | | ------- | -------- | -------- | ------- | -| v0.5.3+ | v0.5.3+ | v0.5.2+ | v0.5.2+ | +| v0.6.1+ | v0.6.4+ | v0.6.4+ | v0.6.6+ | ### Build and Configure diff --git a/challenger/README.md b/challenger/README.md index d83488d..b066bdb 100644 --- a/challenger/README.md +++ b/challenger/README.md @@ -18,8 +18,13 @@ To configure the Challenger, fill in the values in the `~/.opinit/challenger.jso // Version is the version used to build output root. // Please refer to `spec_version.json` for the correct version for each network. "version": 1, - // ListenAddress is the address to listen for incoming requests. - "listen_address": "localhost:3001", + // Server is the configuration for the server. + "server": { + "address": "localhost:3000", + "allow_origins": "*", + "allow_headers": "Origin, Content-Type, Accept", + "allow_methods": "GET", + }, "l1_node": { "chain_id": "testnet-l1-1", "bech32_prefix": "init", diff --git a/challenger/child/child.go b/challenger/child/child.go index 473e8b5..1199ae6 100644 --- a/challenger/child/child.go +++ b/challenger/child/child.go @@ -57,7 +57,7 @@ func NewChildV1( } func (ch *Child) Initialize(ctx context.Context, processedHeight int64, startOutputIndex uint64, host hostNode, bridgeInfo ophosttypes.QueryBridgeResponse, challenger challenger) (time.Time, error) { - _, err := ch.BaseChild.Initialize(ctx, processedHeight, startOutputIndex, bridgeInfo, nil) + _, err := ch.BaseChild.Initialize(ctx, processedHeight, startOutputIndex, bridgeInfo, nil, nil) if err != nil { return time.Time{}, err } diff --git a/cmd/opinitd/root.go b/cmd/opinitd/root.go index 50f892a..c47e6eb 100644 --- a/cmd/opinitd/root.go +++ b/cmd/opinitd/root.go @@ -49,6 +49,7 @@ func NewRootCmd() *cobra.Command { resetHeightsCmd(ctx), resetHeightCmd(ctx), migration015Cmd(ctx), + txCmd(ctx), version.NewVersionCommand(), ) return rootCmd diff --git a/cmd/opinitd/tx.go b/cmd/opinitd/tx.go new file mode 100644 index 0000000..33d5163 --- /dev/null +++ b/cmd/opinitd/tx.go @@ -0,0 +1,124 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "cosmossdk.io/errors" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + + "github.com/cosmos/cosmos-sdk/x/authz" + + "github.com/initia-labs/opinit-bots/bot" + bottypes "github.com/initia-labs/opinit-bots/bot/types" + executortypes "github.com/initia-labs/opinit-bots/executor/types" + "github.com/initia-labs/opinit-bots/keys" + "github.com/initia-labs/opinit-bots/node/broadcaster" + broadcastertypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" + "github.com/initia-labs/opinit-bots/node/rpcclient" + "github.com/initia-labs/opinit-bots/provider/child" + "github.com/initia-labs/opinit-bots/types" + + sdk "github.com/cosmos/cosmos-sdk/types" +) + +// txCmd represents the tx command +func txCmd(ctx *cmdContext) *cobra.Command { + cmd := &cobra.Command{ + Use: "tx", + Short: "send a transaction", + } + + cmd.AddCommand( + txGrantOracleCmd(ctx), + ) + return cmd +} + +func txGrantOracleCmd(baseCtx *cmdContext) *cobra.Command { + cmd := &cobra.Command{ + Use: "grant-oracle [oracle-account-address]", + Args: cobra.ExactArgs(1), + Short: "Grant oracle permission to the given account", + Long: `Grant oracle permission to the given account on L2 chain`, + RunE: func(cmd *cobra.Command, args []string) error { + cmdCtx, botDone := context.WithCancel(cmd.Context()) + gracefulShutdown(botDone) + + errGrp, ctx := errgroup.WithContext(cmdCtx) + ctx = types.WithErrGrp(ctx, errGrp) + + account, err := l2BroadcasterAccount(baseCtx, cmd) + if err != nil { + return err + } + err = account.Load(ctx) + if err != nil { + return err + } + + oracleAddress, err := keys.DecodeBech32AccAddr(args[0], account.Bech32Prefix()) + if err != nil { + return err + } + + grantMsg, err := authz.NewMsgGrant(account.GetAddress(), oracleAddress, authz.NewGenericAuthorization(types.MsgUpdateOracleTypeUrl), nil) + if err != nil { + return err + } + + txBytes, _, err := account.BuildTxWithMessages(ctx, []sdk.Msg{grantMsg}) + if err != nil { + return errors.Wrapf(err, "simulation failed") + } + + res, err := account.BroadcastTxSync(ctx, txBytes) + if err != nil { + // TODO: handle error, may repeat sending tx + return fmt.Errorf("broadcast txs: %w", err) + } + bz, err := json.Marshal(res) + if err != nil { + return err + } + fmt.Println(string(bz)) + return nil + }, + } + + cmd = configFlag(baseCtx.v, cmd) + return cmd +} + +func l2BroadcasterAccount(ctx *cmdContext, cmd *cobra.Command) (*broadcaster.BroadcasterAccount, error) { + configPath, err := getConfigPath(cmd, ctx.homePath, string(bottypes.BotTypeExecutor)) + if err != nil { + return nil, err + } + + cfg := &executortypes.Config{} + err = bot.LoadJsonConfig(configPath, cfg) + if err != nil { + return nil, err + } + + l2Config := cfg.L2NodeConfig(ctx.homePath) + broadcasterConfig := l2Config.BroadcasterConfig + cdc, txConfig, err := child.GetCodec(broadcasterConfig.Bech32Prefix) + if err != nil { + return nil, err + } + + rpcClient, err := rpcclient.NewRPCClient(cdc, l2Config.RPC) + if err != nil { + return nil, err + } + + keyringConfig := broadcastertypes.KeyringConfig{ + Name: cfg.BridgeExecutor, + } + + return broadcaster.NewBroadcasterAccount(*broadcasterConfig, cdc, txConfig, rpcClient, keyringConfig) +} diff --git a/executor/README.md b/executor/README.md index 71dc1ee..848f4b5 100644 --- a/executor/README.md +++ b/executor/README.md @@ -16,8 +16,13 @@ To configure the Executor, fill in the values in the `~/.opinit/executor.json` f // Version is the version used to build output root. // Please refer to `spec_version.json` for the correct version for each network. "version": 1, - // ListenAddress is the address to listen for incoming requests. - "listen_address": "localhost:3000", + // Server is the configuration for the server. + "server": { + "address": "localhost:3000", + "allow_origins": "*", + "allow_headers": "Origin, Content-Type, Accept", + "allow_methods": "GET", + }, "l1_node": { "chain_id": "testnet-l1-1", "bech32_prefix": "init", @@ -47,6 +52,11 @@ To configure the Executor, fill in the values in the `~/.opinit/executor.json` f // // If you don't want to use the bridge executor feature, you can leave it empty. "bridge_executor": "", + // OracleBridgeExecutor is the key name in the keyring for the oracle bridge executor, + // which is used to relay oracle transaction from l1 to l2. + // + // If L2 is using oracle, you need to set this field. + "oracle_bridge_executor": "", // DisableOutputSubmitter is the flag to disable the output submitter. // If it is true, the output submitter will not be started. @@ -80,6 +90,14 @@ To configure the Executor, fill in the values in the `~/.opinit/executor.json` f } ``` +### Oracle config +If you want to enable to relay oracle data, the `oracle_bridge_executor` field must be set. The oracle data is stored in the 0th tx of each L1 block. The bridge executor submits a `MsgUpdateOracle` containing the 0th Tx of l1 block to l2 when a block in l1 is created. + +The `oracle_bridge_executor` must be an account that has received the authz grant from the executor. If it is not set, you can set the authz with the command below. +```bash +opinitd tx grant-oracle [oracle-account-address] +``` + ### Start height config examples If the latest height stored in the db is not 0, start height config is ignored. @@ -319,13 +337,18 @@ curl localhost:3000/status ```json { - "bridge_id": 1, + "bridge_id": 0, "host": { "node": { "last_block_height": 0, "broadcaster": { "pending_txs": 0, - "sequence": 0 + "accounts_status": [ + { + "address": "", + "sequence": 0 + } + ] } }, "last_proposed_output_index": 0, @@ -336,7 +359,16 @@ curl localhost:3000/status "last_block_height": 0, "broadcaster": { "pending_txs": 0, - "sequence": 0 + "accounts_status": [ + { + "address": "", + "sequence": 0 + }, + { + "address": "", + "sequence": 0 + } + ] } }, "last_updated_oracle_height": 0, @@ -350,7 +382,7 @@ curl localhost:3000/status }, "batch": { "node": { - "last_block_height": 0, + "last_block_height": 0 }, "batch_info": { "submitter": "", @@ -364,7 +396,12 @@ curl localhost:3000/status "da": { "broadcaster": { "pending_txs": 0, - "sequence": 0 + "accounts_status": [ + { + "address": "", + "sequence": 0 + } + ] } } } @@ -379,20 +416,32 @@ initiad tx ophost finalize-token-withdrawal ./withdrawal-info.json --gas= --gas- ```go type QueryWithdrawalResponse struct { - // fields required to withdraw funds - BridgeId uint64 `json:"bridge_id"` - OutputIndex uint64 `json:"output_index"` - WithdrawalProofs [][]byte `json:"withdrawal_proofs"` - Sender string `json:"sender"` - Sequence uint64 `json:"sequence"` - Amount string `json:"amount"` - Version []byte `json:"version"` - StorageRoot []byte `json:"storage_root"` - LatestBlockHash []byte `json:"latest_block_hash"` - - // extra info - BlockNumber int64 `json:"block_number"` - Receiver string `json:"receiver"` - WithdrawalHash []byte `json:"withdrawal_hash"` + Sequence uint64 `json:"sequence"` + To string `json:"to"` + From string `json:"from"` + Amount types.Coin `json:"amount"` + OutputIndex uint64 `json:"output_index"` + BridgeId uint64 `json:"bridge_id"` + WithdrawalProofs [][]byte `json:"withdrawal_proofs"` + Version []byte `json:"version"` + StorageRoot []byte `json:"storage_root"` + LastBlockHash []byte `json:"last_block_hash"` } ``` + +```bash +curl localhost:3000/withdrawals/{address} +``` +default options +- `limit`: 10 +- `offset`: 0 +- `order`: desc + + +```go +type QueryWithdrawalsResponse struct { + Withdrawals []QueryWithdrawalResponse `json:"withdrawals"` + Next uint64 `json:"next"` + Total uint64 `json:"total"` +} +``` \ No newline at end of file diff --git a/executor/batch/handler.go b/executor/batch/handler.go index b3b5cde..a86ab6f 100644 --- a/executor/batch/handler.go +++ b/executor/batch/handler.go @@ -201,11 +201,12 @@ func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight int64) checksums, ) - msg, err := bs.da.CreateBatchMsg(headerData) + msg, sender, err := bs.da.CreateBatchMsg(headerData) if err != nil { return err } else if msg != nil { bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{ + Sender: sender, Msgs: []sdk.Msg{msg}, Timestamp: time.Now().UnixNano(), Save: true, @@ -220,11 +221,12 @@ func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight int64) types.MustInt64ToUint64(int64(len(checksums))), chunk, ) - msg, err := bs.da.CreateBatchMsg(chunkData) + msg, sender, err := bs.da.CreateBatchMsg(chunkData) if err != nil { return err } else if msg != nil { bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{ + Sender: sender, Msgs: []sdk.Msg{msg}, Timestamp: time.Now().UnixNano(), Save: true, diff --git a/executor/batch/noop_da.go b/executor/batch/noop_da.go index 3c7c047..6a03be6 100644 --- a/executor/batch/noop_da.go +++ b/executor/batch/noop_da.go @@ -20,10 +20,10 @@ func NewNoopDA() *NoopDA { return &NoopDA{} } -func (n NoopDA) Start(_ context.Context) {} -func (n NoopDA) HasKey() bool { return false } -func (n NoopDA) CreateBatchMsg(_ []byte) (sdk.Msg, error) { return nil, nil } -func (n NoopDA) BroadcastMsgs(nil btypes.ProcessedMsgs) {} +func (n NoopDA) Start(_ context.Context) {} +func (n NoopDA) HasKey() bool { return false } +func (n NoopDA) CreateBatchMsg(_ []byte) (sdk.Msg, string, error) { return nil, "", nil } +func (n NoopDA) BroadcastMsgs(nil btypes.ProcessedMsgs) {} func (n NoopDA) ProcessedMsgsToRawKV(_ []btypes.ProcessedMsgs, _ bool) ([]types.RawKV, error) { return nil, nil } diff --git a/executor/celestia/celestia.go b/executor/celestia/celestia.go index 0343c6d..ffe5835 100644 --- a/executor/celestia/celestia.go +++ b/executor/celestia/celestia.go @@ -7,14 +7,10 @@ import ( "go.uber.org/zap" - "github.com/cometbft/cometbft/crypto/merkle" - "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/x/auth" - inclusion "github.com/celestiaorg/go-square/v2/inclusion" sh "github.com/celestiaorg/go-square/v2/share" executortypes "github.com/initia-labs/opinit-bots/executor/types" @@ -45,9 +41,6 @@ type Celestia struct { cfg nodetypes.NodeConfig db types.DB logger *zap.Logger - - processedMsgs []btypes.ProcessedMsgs - msgQueue []sdk.Msg } func NewDACelestia( @@ -60,9 +53,6 @@ func NewDACelestia( cfg: cfg, db: db, logger: logger, - - processedMsgs: make([]btypes.ProcessedMsgs, 0), - msgQueue: make([]sdk.Msg, 0), } appCodec, txConfig, err := createCodec(cfg.Bech32Prefix) @@ -70,11 +60,6 @@ func NewDACelestia( panic(err) } - if cfg.BroadcasterConfig != nil { - cfg.BroadcasterConfig.BuildTxWithMessages = c.BuildTxWithMessages - cfg.BroadcasterConfig.PendingTxToProcessedMsgs = c.PendingTxToProcessedMsgs - } - node, err := node.NewNode(cfg, db, logger, appCodec, txConfig) if err != nil { panic(err) @@ -95,7 +80,7 @@ func createCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) { } func (c *Celestia) Initialize(ctx context.Context, batch batchNode, bridgeId uint64, keyringConfig *btypes.KeyringConfig) error { - err := c.node.Initialize(ctx, 0, keyringConfig) + err := c.node.Initialize(ctx, 0, c.keyringConfigs(keyringConfig)) if err != nil { return err } @@ -142,66 +127,32 @@ func (c Celestia) GetHeight() int64 { return c.node.GetHeight() } -func (c Celestia) CreateBatchMsg(rawBlob []byte) (sdk.Msg, error) { - submitter, err := c.GetAddressStr() - if err != nil { - if errors.Is(err, types.ErrKeyNotSet) { - return nil, nil - } - return nil, err - } - blob, err := sh.NewV0Blob(c.namespace, rawBlob) - if err != nil { - return nil, err - } - commitment, err := inclusion.CreateCommitment(blob, - merkle.HashFromByteSlices, - // https://github.com/celestiaorg/celestia-app/blob/4f4d0f7ff1a43b62b232726e52d1793616423df7/pkg/appconsts/v1/app_consts.go#L6 - 64, - ) - if err != nil { - return nil, err - } - - dataLength, err := types.SafeIntToUint32(len(blob.Data())) - if err != nil { - return nil, err - } - - return &celestiatypes.MsgPayForBlobsWithBlob{ - MsgPayForBlobs: &celestiatypes.MsgPayForBlobs{ - Signer: submitter, - Namespaces: [][]byte{c.namespace.Bytes()}, - ShareCommitments: [][]byte{commitment}, - BlobSizes: []uint32{dataLength}, - ShareVersions: []uint32{uint32(blob.ShareVersion())}, - }, - Blob: &celestiatypes.Blob{ - NamespaceId: blob.Namespace().ID(), - Data: blob.Data(), - ShareVersion: uint32(blob.ShareVersion()), - NamespaceVersion: uint32(blob.Namespace().Version()), - }, - }, nil -} - func (c Celestia) NamespaceID() []byte { chainIDhash := sha256.Sum256([]byte(c.batch.ChainID())) return chainIDhash[:10] } -func (c Celestia) GetAddress() (sdk.AccAddress, error) { +func (c Celestia) BaseAccountAddress() (string, error) { broadcaster, err := c.node.GetBroadcaster() if err != nil { - return nil, err + if errors.Is(err, types.ErrKeyNotSet) { + return "", nil + } + return "", err + } + account, err := broadcaster.AccountByIndex(0) + if err != nil { + return "", err } - return broadcaster.GetAddress(), nil + + sender := account.GetAddressString() + return sender, nil } -func (c Celestia) GetAddressStr() (string, error) { - broadcaster, err := c.node.GetBroadcaster() - if err != nil { - return "", err +func (c Celestia) keyringConfigs(baseConfig *btypes.KeyringConfig) []btypes.KeyringConfig { + var configs []btypes.KeyringConfig + if baseConfig != nil { + configs = append(configs, *baseConfig) } - return broadcaster.GetAddressString() + return configs } diff --git a/executor/celestia/msg.go b/executor/celestia/msg.go new file mode 100644 index 0000000..59038ec --- /dev/null +++ b/executor/celestia/msg.go @@ -0,0 +1,58 @@ +package celestia + +import ( + "errors" + + "github.com/cometbft/cometbft/crypto/merkle" + + sdk "github.com/cosmos/cosmos-sdk/types" + + inclusion "github.com/celestiaorg/go-square/v2/inclusion" + sh "github.com/celestiaorg/go-square/v2/share" + + "github.com/initia-labs/opinit-bots/types" + celestiatypes "github.com/initia-labs/opinit-bots/types/celestia" +) + +func (c Celestia) CreateBatchMsg(rawBlob []byte) (sdk.Msg, string, error) { + submitter, err := c.BaseAccountAddress() + if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil, "", nil + } + return nil, "", err + } + blob, err := sh.NewV0Blob(c.namespace, rawBlob) + if err != nil { + return nil, "", err + } + commitment, err := inclusion.CreateCommitment(blob, + merkle.HashFromByteSlices, + // https://github.com/celestiaorg/celestia-app/blob/4f4d0f7ff1a43b62b232726e52d1793616423df7/pkg/appconsts/v1/app_consts.go#L6 + 64, + ) + if err != nil { + return nil, "", err + } + + dataLength, err := types.SafeIntToUint32(len(blob.Data())) + if err != nil { + return nil, "", err + } + + return &celestiatypes.MsgPayForBlobsWithBlob{ + MsgPayForBlobs: &celestiatypes.MsgPayForBlobs{ + Signer: submitter, + Namespaces: [][]byte{c.namespace.Bytes()}, + ShareCommitments: [][]byte{commitment}, + BlobSizes: []uint32{dataLength}, + ShareVersions: []uint32{uint32(blob.ShareVersion())}, + }, + Blob: &celestiatypes.Blob{ + NamespaceId: blob.Namespace().ID(), + Data: blob.Data(), + ShareVersion: uint32(blob.ShareVersion()), + NamespaceVersion: uint32(blob.Namespace().Version()), + }, + }, submitter, nil +} diff --git a/executor/celestia/node.go b/executor/celestia/node.go index 7491663..89d596b 100644 --- a/executor/celestia/node.go +++ b/executor/celestia/node.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/cosmos/cosmos-sdk/client/tx" sdk "github.com/cosmos/cosmos-sdk/types" btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" @@ -34,25 +33,14 @@ func (c *Celestia) BuildTxWithMessages( blobMsgs = append(blobMsgs, withBlobMsg.Blob) } - b := c.node.MustGetBroadcaster() - txf := b.GetTxf() - - _, adjusted, err := b.CalculateGas(ctx, txf, pfbMsgs...) + broadcasterAccount, err := c.node.MustGetBroadcaster().AccountByIndex(0) if err != nil { return nil, "", err } - - txf = txf.WithGas(adjusted) - txb, err := txf.BuildUnsignedTx(pfbMsgs...) + tx, err := broadcasterAccount.SimulateAndSignTx(ctx, pfbMsgs...) if err != nil { return nil, "", err } - - if err = tx.Sign(ctx, txf, b.KeyName(), txb, false); err != nil { - return nil, "", err - } - - tx := txb.GetTx() txConfig := c.node.GetTxConfig() txBytes, err = txutils.EncodeTx(txConfig, tx) if err != nil { diff --git a/executor/child/child.go b/executor/child/child.go index 845c1c2..f0775af 100644 --- a/executor/child/child.go +++ b/executor/child/child.go @@ -19,14 +19,14 @@ import ( ) type hostNode interface { - GetAddressStr() (string, error) HasKey() bool + BaseAccountAddressString() (string, error) BroadcastMsgs(btypes.ProcessedMsgs) ProcessedMsgsToRawKV([]btypes.ProcessedMsgs, bool) ([]types.RawKV, error) QueryLastOutput(context.Context, uint64) (*ophosttypes.QueryOutputProposalResponse, error) QueryOutput(context.Context, uint64, uint64, int64) (*ophosttypes.QueryOutputProposalResponse, error) - GetMsgProposeOutput(uint64, uint64, int64, []byte) (sdk.Msg, error) + GetMsgProposeOutput(uint64, uint64, int64, []byte) (sdk.Msg, string, error) } type Child struct { @@ -58,8 +58,16 @@ func NewChildV1( } } -func (ch *Child) Initialize(ctx context.Context, processedHeight int64, startOutputIndex uint64, host hostNode, bridgeInfo ophosttypes.QueryBridgeResponse, keyringConfig *btypes.KeyringConfig) error { - l2Sequence, err := ch.BaseChild.Initialize(ctx, processedHeight, startOutputIndex, bridgeInfo, keyringConfig) +func (ch *Child) Initialize( + ctx context.Context, + processedHeight int64, + startOutputIndex uint64, + host hostNode, + bridgeInfo ophosttypes.QueryBridgeResponse, + keyringConfig *btypes.KeyringConfig, + oracleKeyringConfig *btypes.KeyringConfig, +) error { + l2Sequence, err := ch.BaseChild.Initialize(ctx, processedHeight, startOutputIndex, bridgeInfo, keyringConfig, oracleKeyringConfig) if err != nil { return err } diff --git a/executor/child/handler.go b/executor/child/handler.go index 0846305..749730b 100644 --- a/executor/child/handler.go +++ b/executor/child/handler.go @@ -58,19 +58,23 @@ func (ch *Child) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) // if has key, then process the messages if ch.host.HasKey() { - msgQueue := ch.GetMsgQueue() + msgQueues := ch.GetMsgQueue() - for i := 0; i < len(msgQueue); i += 5 { - end := i + 5 - if end > len(msgQueue) { - end = len(msgQueue) - } + for sender := range msgQueues { + msgQueue := msgQueues[sender] + for i := 0; i < len(msgQueue); i += 5 { + end := i + 5 + if end > len(msgQueue) { + end = len(msgQueue) + } - ch.AppendProcessedMsgs(btypes.ProcessedMsgs{ - Msgs: slices.Clone(msgQueue[i:end]), - Timestamp: time.Now().UnixNano(), - Save: true, - }) + ch.AppendProcessedMsgs(btypes.ProcessedMsgs{ + Sender: sender, + Msgs: slices.Clone(msgQueue[i:end]), + Timestamp: time.Now().UnixNano(), + Save: true, + }) + } } msgKVs, err := ch.host.ProcessedMsgsToRawKV(ch.GetProcessedMsgs(), false) diff --git a/executor/child/query.go b/executor/child/query.go index 9520669..2dcdbbc 100644 --- a/executor/child/query.go +++ b/executor/child/query.go @@ -2,12 +2,14 @@ package child import ( "encoding/json" + "errors" "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" executortypes "github.com/initia-labs/opinit-bots/executor/types" + merkletypes "github.com/initia-labs/opinit-bots/merkle/types" ) func (ch Child) QueryWithdrawal(sequence uint64) (executortypes.QueryWithdrawalResponse, error) { @@ -16,33 +18,35 @@ func (ch Child) QueryWithdrawal(sequence uint64) (executortypes.QueryWithdrawalR return executortypes.QueryWithdrawalResponse{}, err } + amount := sdk.NewCoin(withdrawal.BaseDenom, math.NewIntFromUint64(withdrawal.Amount)) + + res := executortypes.QueryWithdrawalResponse{ + BridgeId: ch.BridgeId(), + From: withdrawal.From, + To: withdrawal.To, + Sequence: sequence, + Amount: amount, + Version: []byte{ch.Version()}, + } + proofs, outputIndex, outputRoot, extraDataBytes, err := ch.Merkle().GetProofs(sequence) - if err != nil { + if errors.Is(err, merkletypes.ErrUnfinalizedTree) { + // if the tree is not finalized, we just return only withdrawal info + return res, nil + } else if err != nil { return executortypes.QueryWithdrawalResponse{}, err } - amount := sdk.NewCoin(withdrawal.BaseDenom, math.NewIntFromUint64(withdrawal.Amount)) - treeExtraData := executortypes.TreeExtraData{} err = json.Unmarshal(extraDataBytes, &treeExtraData) if err != nil { return executortypes.QueryWithdrawalResponse{}, err } - - return executortypes.QueryWithdrawalResponse{ - BridgeId: ch.BridgeId(), - OutputIndex: outputIndex, - WithdrawalProofs: proofs, - From: withdrawal.From, - To: withdrawal.To, - Sequence: sequence, - Amount: amount, - Version: []byte{ch.Version()}, - StorageRoot: outputRoot, - LastBlockHash: treeExtraData.BlockHash, - // BlockNumber: treeExtraData.BlockNumber, - // WithdrawalHash: withdrawal.WithdrawalHash, - }, nil + res.WithdrawalProofs = proofs + res.OutputIndex = outputIndex + res.StorageRoot = outputRoot + res.LastBlockHash = treeExtraData.BlockHash + return res, nil } func (ch Child) QueryWithdrawals(address string, offset uint64, limit uint64, descOrder bool) (executortypes.QueryWithdrawalsResponse, error) { diff --git a/executor/child/withdraw.go b/executor/child/withdraw.go index a75a080..ebf118a 100644 --- a/executor/child/withdraw.go +++ b/executor/child/withdraw.go @@ -178,7 +178,7 @@ func (ch *Child) handleTree(blockHeight int64, latestHeight int64, blockId []byt func (ch *Child) handleOutput(blockHeight int64, version uint8, blockId []byte, outputIndex uint64, storageRoot []byte) error { outputRoot := ophosttypes.GenerateOutputRoot(version, storageRoot, blockId) - msg, err := ch.host.GetMsgProposeOutput( + msg, sender, err := ch.host.GetMsgProposeOutput( ch.BridgeId(), outputIndex, blockHeight, @@ -187,7 +187,7 @@ func (ch *Child) handleOutput(blockHeight int64, version uint8, blockId []byte, if err != nil { return err } else if msg != nil { - ch.AppendMsgQueue(msg) + ch.AppendMsgQueue(msg, sender) } return nil } diff --git a/executor/executor.go b/executor/executor.go index 5cee2a9..22a98e5 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -98,13 +98,13 @@ func (ex *Executor) Initialize(ctx context.Context) error { return err } - hostKeyringConfig, childKeyringConfig, daKeyringConfig := ex.getKeyringConfigs(*bridgeInfo) + hostKeyringConfig, childKeyringConfig, childOracleKeyringConfig, daKeyringConfig := ex.getKeyringConfigs(*bridgeInfo) err = ex.host.Initialize(ctx, hostProcessedHeight, ex.child, ex.batch, *bridgeInfo, hostKeyringConfig) if err != nil { return err } - err = ex.child.Initialize(ctx, childProcessedHeight, processedOutputIndex+1, ex.host, *bridgeInfo, childKeyringConfig) + err = ex.child.Initialize(ctx, childProcessedHeight, processedOutputIndex+1, ex.host, *bridgeInfo, childKeyringConfig, childOracleKeyringConfig) if err != nil { return err } @@ -221,7 +221,7 @@ func (ex *Executor) makeDANode(ctx context.Context, bridgeInfo ophosttypes.Query switch batchInfo.BatchInfo.ChainType { case ophosttypes.BatchInfo_CHAIN_TYPE_INITIA: // might not exist - hostAddrStr, err := ex.host.GetAddressStr() + hostAddrStr, err := ex.host.BaseAccountAddressString() if err != nil && !errors.Is(err, types.ErrKeyNotSet) { return nil, err } else if err == nil && hostAddrStr == batchInfo.BatchInfo.Submitter { @@ -308,7 +308,12 @@ func (ex *Executor) getProcessedHeights(ctx context.Context, bridgeId uint64) (l return l1ProcessedHeight, l2ProcessedHeight, processedOutputIndex, batchProcessedHeight, err } -func (ex *Executor) getKeyringConfigs(bridgeInfo ophosttypes.QueryBridgeResponse) (hostKeyringConfig *btypes.KeyringConfig, childKeyringConfig *btypes.KeyringConfig, daKeyringConfig *btypes.KeyringConfig) { +func (ex *Executor) getKeyringConfigs(bridgeInfo ophosttypes.QueryBridgeResponse) ( + hostKeyringConfig *btypes.KeyringConfig, + childKeyringConfig *btypes.KeyringConfig, + childOracleKeyringConfig *btypes.KeyringConfig, + daKeyringConfig *btypes.KeyringConfig, +) { if !ex.cfg.DisableOutputSubmitter { hostKeyringConfig = &btypes.KeyringConfig{ Address: bridgeInfo.BridgeConfig.Proposer, @@ -319,6 +324,12 @@ func (ex *Executor) getKeyringConfigs(bridgeInfo ophosttypes.QueryBridgeResponse childKeyringConfig = &btypes.KeyringConfig{ Name: ex.cfg.BridgeExecutor, } + + if bridgeInfo.BridgeConfig.OracleEnabled && ex.cfg.OracleBridgeExecutor != "" { + childOracleKeyringConfig = &btypes.KeyringConfig{ + Name: ex.cfg.OracleBridgeExecutor, + } + } } if !ex.cfg.DisableBatchSubmitter { diff --git a/executor/host/batch.go b/executor/host/batch.go index b4009fd..b7145e0 100644 --- a/executor/host/batch.go +++ b/executor/host/batch.go @@ -11,7 +11,7 @@ import ( ) func (h *Host) recordBatchHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { - hostAddress, err := h.GetAddressStr() + hostAddress, err := h.BaseAccountAddressString() if err != nil { if errors.Is(err, types.ErrKeyNotSet) { return nil diff --git a/executor/host/deposit.go b/executor/host/deposit.go index 3e3be4c..5d4e742 100644 --- a/executor/host/deposit.go +++ b/executor/host/deposit.go @@ -25,7 +25,7 @@ func (h *Host) initiateDepositHandler(_ context.Context, args nodetypes.EventHan return nil } - msg, err := h.handleInitiateDeposit( + msg, sender, err := h.handleInitiateDeposit( l1Sequence, args.BlockHeight, from, @@ -38,7 +38,7 @@ func (h *Host) initiateDepositHandler(_ context.Context, args nodetypes.EventHan if err != nil { return err } else if msg != nil { - h.AppendMsgQueue(msg) + h.AppendMsgQueue(msg, sender) } return nil } @@ -52,10 +52,10 @@ func (h *Host) handleInitiateDeposit( l2Denom string, amount string, data []byte, -) (sdk.Msg, error) { +) (sdk.Msg, string, error) { coinAmount, ok := math.NewIntFromString(amount) if !ok { - return nil, errors.New("invalid amount") + return nil, "", errors.New("invalid amount") } coin := sdk.NewCoin(l2Denom, coinAmount) diff --git a/executor/host/handler.go b/executor/host/handler.go index 620a699..9f7fb75 100644 --- a/executor/host/handler.go +++ b/executor/host/handler.go @@ -22,23 +22,27 @@ func (h *Host) beginBlockHandler(_ context.Context, args nodetypes.BeginBlockArg func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) error { // collect more msgs if block height is not latest blockHeight := args.Block.Header.Height - msgQueue := h.GetMsgQueue() + msgQueues := h.GetMsgQueue() batchKVs := []types.RawKV{ h.Node().SyncInfoToRawKV(blockHeight), } - if h.Node().HasBroadcaster() { - for i := 0; i < len(msgQueue); i += 5 { - end := i + 5 - if end > len(msgQueue) { - end = len(msgQueue) - } + if h.child.HasKey() { + for sender := range msgQueues { + msgQueue := msgQueues[sender] + for i := 0; i < len(msgQueue); i += 5 { + end := i + 5 + if end > len(msgQueue) { + end = len(msgQueue) + } - h.AppendProcessedMsgs(btypes.ProcessedMsgs{ - Msgs: slices.Clone(msgQueue[i:end]), - Timestamp: time.Now().UnixNano(), - Save: true, - }) + h.AppendProcessedMsgs(btypes.ProcessedMsgs{ + Sender: sender, + Msgs: slices.Clone(msgQueue[i:end]), + Timestamp: time.Now().UnixNano(), + Save: true, + }) + } } msgkvs, err := h.child.ProcessedMsgsToRawKV(h.GetProcessedMsgs(), false) @@ -61,10 +65,12 @@ func (h *Host) endBlockHandler(_ context.Context, args nodetypes.EndBlockArgs) e func (h *Host) txHandler(_ context.Context, args nodetypes.TxHandlerArgs) error { if args.BlockHeight == args.LatestHeight && args.TxIndex == 0 { - if msg, err := h.oracleTxHandler(args.BlockHeight, args.Tx); err != nil { + msg, sender, err := h.oracleTxHandler(args.BlockHeight, args.Tx) + if err != nil { return err } else if msg != nil { h.AppendProcessedMsgs(btypes.ProcessedMsgs{ + Sender: sender, Msgs: []sdk.Msg{msg}, Timestamp: time.Now().UnixNano(), Save: false, diff --git a/executor/host/host.go b/executor/host/host.go index b7b78a9..c957e2a 100644 --- a/executor/host/host.go +++ b/executor/host/host.go @@ -18,14 +18,15 @@ import ( ) type childNode interface { - GetAddressStr() (string, error) HasKey() bool BroadcastMsgs(btypes.ProcessedMsgs) ProcessedMsgsToRawKV([]btypes.ProcessedMsgs, bool) ([]types.RawKV, error) QueryNextL1Sequence(context.Context, int64) (uint64, error) + BaseAccountAddressString() (string, error) + OracleAccountAddressString() (string, error) - GetMsgFinalizeTokenDeposit(string, string, sdk.Coin, uint64, int64, string, []byte) (sdk.Msg, error) - GetMsgUpdateOracle(int64, []byte) (sdk.Msg, error) + GetMsgFinalizeTokenDeposit(string, string, sdk.Coin, uint64, int64, string, []byte) (sdk.Msg, string, error) + GetMsgUpdateOracle(int64, []byte) (sdk.Msg, string, error) } type batchNode interface { @@ -56,7 +57,14 @@ func NewHostV1( } } -func (h *Host) Initialize(ctx context.Context, processedHeight int64, child childNode, batch batchNode, bridgeInfo ophosttypes.QueryBridgeResponse, keyringConfig *btypes.KeyringConfig) error { +func (h *Host) Initialize( + ctx context.Context, + processedHeight int64, + child childNode, + batch batchNode, + bridgeInfo ophosttypes.QueryBridgeResponse, + keyringConfig *btypes.KeyringConfig, +) error { err := h.BaseHost.Initialize(ctx, processedHeight, bridgeInfo, keyringConfig) if err != nil { return err @@ -71,7 +79,11 @@ func (h *Host) Initialize(ctx context.Context, processedHeight int64, child chil return nil } -func (h *Host) InitializeDA(ctx context.Context, bridgeInfo ophosttypes.QueryBridgeResponse, keyringConfig *btypes.KeyringConfig) error { +func (h *Host) InitializeDA( + ctx context.Context, + bridgeInfo ophosttypes.QueryBridgeResponse, + keyringConfig *btypes.KeyringConfig, +) error { err := h.BaseHost.Initialize(ctx, 0, bridgeInfo, keyringConfig) if err != nil { return err diff --git a/executor/host/oracle.go b/executor/host/oracle.go index 41a95e5..7dc1029 100644 --- a/executor/host/oracle.go +++ b/executor/host/oracle.go @@ -8,9 +8,9 @@ import ( // If the relay oracle is enabled and the extended commit info contains votes, create a new MsgUpdateOracle message. // Else return nil. -func (h *Host) oracleTxHandler(blockHeight int64, extCommitBz comettypes.Tx) (sdk.Msg, error) { +func (h *Host) oracleTxHandler(blockHeight int64, extCommitBz comettypes.Tx) (sdk.Msg, string, error) { if !h.OracleEnabled() { - return nil, nil + return nil, "", nil } return h.child.GetMsgUpdateOracle( blockHeight, diff --git a/executor/types/batch.go b/executor/types/batch.go index 4a0a8d8..826d9ef 100644 --- a/executor/types/batch.go +++ b/executor/types/batch.go @@ -17,7 +17,7 @@ import ( type DANode interface { Start(context.Context) HasKey() bool - CreateBatchMsg([]byte) (sdk.Msg, error) + CreateBatchMsg([]byte) (sdk.Msg, string, error) BroadcastMsgs(btypes.ProcessedMsgs) ProcessedMsgsToRawKV(processedMsgs []btypes.ProcessedMsgs, delete bool) ([]types.RawKV, error) GetNodeStatus() (nodetypes.Status, error) diff --git a/executor/types/config.go b/executor/types/config.go index 1617a69..bc3a2ec 100644 --- a/executor/types/config.go +++ b/executor/types/config.go @@ -52,6 +52,12 @@ type Config struct { // If you don't want to use the bridge executor feature, you can leave it empty. BridgeExecutor string `json:"bridge_executor"` + // OracleBridgeExecutor is the key name in the keyring for the oracle bridge executor, + // which is used to relay oracle transaction from l1 to l2. + // + // If L2 is using oracle, you need to set this field. + OracleBridgeExecutor string `json:"oracle_bridge_executor"` + // DisableOutputSubmitter is the flag to disable the output submitter. // If it is true, the output submitter will not be started. DisableOutputSubmitter bool `json:"disable_output_submitter"` @@ -123,6 +129,7 @@ func DefaultConfig() *Config { }, BridgeExecutor: "", + OracleBridgeExecutor: "", DisableOutputSubmitter: false, DisableBatchSubmitter: false, @@ -216,7 +223,7 @@ func (cfg Config) L2NodeConfig(homePath string) nodetypes.NodeConfig { Bech32Prefix: cfg.L2Node.Bech32Prefix, } - if cfg.BridgeExecutor != "" { + if cfg.BridgeExecutor != "" || cfg.OracleBridgeExecutor != "" { nc.BroadcasterConfig = &btypes.BroadcasterConfig{ ChainID: cfg.L2Node.ChainID, GasPrice: cfg.L2Node.GasPrice, diff --git a/go.mod b/go.mod index aa9bdb8..5e8696b 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,14 @@ require ( cosmossdk.io/core v0.11.1 cosmossdk.io/errors v1.0.1 cosmossdk.io/math v1.3.0 - cosmossdk.io/x/tx v0.13.4 + cosmossdk.io/x/tx v0.13.5 github.com/celestiaorg/go-square/v2 v2.0.0 github.com/cometbft/cometbft v0.38.12 - github.com/cosmos/cosmos-sdk v0.50.9 + github.com/cosmos/cosmos-sdk v0.50.10 github.com/cosmos/go-bip39 v1.0.0 github.com/cosmos/gogoproto v1.7.0 github.com/gofiber/fiber/v2 v2.52.5 - github.com/initia-labs/OPinit v0.5.2 + github.com/initia-labs/OPinit v0.6.1 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 @@ -112,7 +112,7 @@ require ( github.com/iancoleman/strcase v0.3.0 // indirect github.com/improbable-eng/grpc-web v0.15.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/initia-labs/OPinit/api v0.5.1 // indirect + github.com/initia-labs/OPinit/api v0.6.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/kr/pretty v0.3.1 // indirect diff --git a/go.sum b/go.sum index ad491ba..91b3013 100644 --- a/go.sum +++ b/go.sum @@ -786,8 +786,8 @@ cosmossdk.io/x/evidence v0.1.1 h1:Ks+BLTa3uftFpElLTDp9L76t2b58htjVbSZ86aoK/E4= cosmossdk.io/x/evidence v0.1.1/go.mod h1:OoDsWlbtuyqS70LY51aX8FBTvguQqvFrt78qL7UzeNc= cosmossdk.io/x/feegrant v0.1.1 h1:EKFWOeo/pup0yF0svDisWWKAA9Zags6Zd0P3nRvVvw8= cosmossdk.io/x/feegrant v0.1.1/go.mod h1:2GjVVxX6G2fta8LWj7pC/ytHjryA6MHAJroBWHFNiEQ= -cosmossdk.io/x/tx v0.13.4 h1:Eg0PbJgeO0gM8p5wx6xa0fKR7hIV6+8lC56UrsvSo0Y= -cosmossdk.io/x/tx v0.13.4/go.mod h1:BkFqrnGGgW50Y6cwTy+JvgAhiffbGEKW6KF9ufcDpvk= +cosmossdk.io/x/tx v0.13.5 h1:FdnU+MdmFWn1pTsbfU0OCf2u6mJ8cqc1H4OMG418MLw= +cosmossdk.io/x/tx v0.13.5/go.mod h1:V6DImnwJMTq5qFjeGWpXNiT/fjgE4HtmclRmTqRVM3w= cosmossdk.io/x/upgrade v0.1.4 h1:/BWJim24QHoXde8Bc64/2BSEB6W4eTydq0X/2f8+g38= cosmossdk.io/x/upgrade v0.1.4/go.mod h1:9v0Aj+fs97O+Ztw+tG3/tp5JSlrmT7IcFhAebQHmOPo= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= @@ -940,8 +940,8 @@ github.com/cosmos/cosmos-db v1.0.2 h1:hwMjozuY1OlJs/uh6vddqnk9j7VamLv+0DBlbEXbAK github.com/cosmos/cosmos-db v1.0.2/go.mod h1:Z8IXcFJ9PqKK6BIsVOB3QXtkKoqUOp1vRvPT39kOXEA= github.com/cosmos/cosmos-proto v1.0.0-beta.5 h1:eNcayDLpip+zVLRLYafhzLvQlSmyab+RC5W7ZfmxJLA= github.com/cosmos/cosmos-proto v1.0.0-beta.5/go.mod h1:hQGLpiIUloJBMdQMMWb/4wRApmI9hjHH05nefC0Ojec= -github.com/cosmos/cosmos-sdk v0.50.9 h1:gt2usjz0H0qW6KwAxWw7ZJ3XU8uDwmhN+hYG3nTLeSg= -github.com/cosmos/cosmos-sdk v0.50.9/go.mod h1:TMH6wpoYBcg7Cp5BEg8fneLr+8XloNQkf2MRNF9V6JE= +github.com/cosmos/cosmos-sdk v0.50.10 h1:zXfeu/z653tWZARr/jESzAEiCUYjgJwwG4ytnYWMoDM= +github.com/cosmos/cosmos-sdk v0.50.10/go.mod h1:6Eesrx3ZE7vxBZWpK++30H+Uc7Q4ahQWCL7JKU/LEdU= github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY= github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw= github.com/cosmos/gogogateway v1.2.0 h1:Ae/OivNhp8DqBi/sh2A8a1D0y638GpL3tkmLQAiKxTE= @@ -1327,10 +1327,10 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/initia-labs/OPinit v0.5.2 h1:KEiC72KGBa+X5DDMRrzVJCqNEGi7KX5cOYH/zAoDaQk= -github.com/initia-labs/OPinit v0.5.2/go.mod h1:lx1amLMszculwPu8ln+btJno38UV28fd2nP7XC88ZeE= -github.com/initia-labs/OPinit/api v0.5.1 h1:zwyJf7HtKJCKvLJ1R9PjVfJO1L+d/jKoeFyT7WTLHFI= -github.com/initia-labs/OPinit/api v0.5.1/go.mod h1:gHK6DEWb3/DqQD5LjKirUx9jilAh2UioXanoQdgqVfU= +github.com/initia-labs/OPinit v0.6.1 h1:G9ebeYeqPlV9Z2s3JdSWfwQAUgIM+nhkcA8xSJUMR7M= +github.com/initia-labs/OPinit v0.6.1/go.mod h1:gDpCh4Zx94mihwgzP/PLav8eVHLroZBu3dFyzCy8iIs= +github.com/initia-labs/OPinit/api v0.6.0 h1:Q3hDHpTd9EqlDfY/OryCKIwuXYWJxGJdGfJicV1RjL4= +github.com/initia-labs/OPinit/api v0.6.0/go.mod h1:gHK6DEWb3/DqQD5LjKirUx9jilAh2UioXanoQdgqVfU= github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls= github.com/jhump/protoreflect v1.15.3/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= diff --git a/merkle/merkle.go b/merkle/merkle.go index 66e368f..14a0048 100644 --- a/merkle/merkle.go +++ b/merkle/merkle.go @@ -300,7 +300,9 @@ func (m *Merkle) InsertLeaf(data []byte) error { // GetProofs returns the proofs for the leaf with the given index. func (m *Merkle) GetProofs(leafIndex uint64) (proofs [][]byte, treeIndex uint64, rootData []byte, extraData []byte, err error) { _, value, err := m.db.SeekPrevInclusiveKey(merkletypes.FinalizedTreeKey, merkletypes.PrefixedFinalizedTreeKey(leafIndex)) - if err != nil { + if errors.Is(err, dbtypes.ErrNotFound) { + return nil, 0, nil, nil, merkletypes.ErrUnfinalizedTree + } else if err != nil { return nil, 0, nil, nil, err } @@ -310,8 +312,10 @@ func (m *Merkle) GetProofs(leafIndex uint64) (proofs [][]byte, treeIndex uint64, } // Check if the leaf index is in the tree - if leafIndex < treeInfo.StartLeafIndex || leafIndex-treeInfo.StartLeafIndex >= treeInfo.LeafCount { + if leafIndex < treeInfo.StartLeafIndex { return nil, 0, nil, nil, fmt.Errorf("leaf (`%d`) is not found in tree (`%d`)", leafIndex, treeInfo.TreeIndex) + } else if leafIndex-treeInfo.StartLeafIndex >= treeInfo.LeafCount { + return nil, 0, nil, nil, merkletypes.ErrUnfinalizedTree } height := uint8(0) diff --git a/merkle/types/errors.go b/merkle/types/errors.go new file mode 100644 index 0000000..59e0585 --- /dev/null +++ b/merkle/types/errors.go @@ -0,0 +1,5 @@ +package types + +import "errors" + +var ErrUnfinalizedTree = errors.New("unfinalized tree") diff --git a/node/broadcaster/account.go b/node/broadcaster/account.go index f0866f6..c1908d9 100644 --- a/node/broadcaster/account.go +++ b/node/broadcaster/account.go @@ -3,23 +3,115 @@ package broadcaster import ( "context" "fmt" - "strconv" + "math" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" + abci "github.com/cometbft/cometbft/abci/types" + "github.com/initia-labs/opinit-bots/keys" + btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" + "github.com/initia-labs/opinit-bots/node/rpcclient" + "github.com/initia-labs/opinit-bots/txutils" + + ctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/crypto/keyring" sdk "github.com/cosmos/cosmos-sdk/types" - grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" - authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" - - "github.com/initia-labs/opinit-bots/keys" + txtypes "github.com/cosmos/cosmos-sdk/types/tx" + "github.com/cosmos/cosmos-sdk/types/tx/signing" + authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" ) -var _ client.AccountRetriever = &Broadcaster{} +type BroadcasterAccount struct { + cfg btypes.BroadcasterConfig + txf tx.Factory + cdc codec.Codec + txConfig client.TxConfig + rpcClient *rpcclient.RPCClient + + keyName string + keyBase keyring.Keyring + keyringRecord *keyring.Record + address sdk.AccAddress + addressString string + + BuildTxWithMessages btypes.BuildTxWithMessagesFn + PendingTxToProcessedMsgs btypes.PendingTxToProcessedMsgsFn +} + +func NewBroadcasterAccount(cfg btypes.BroadcasterConfig, cdc codec.Codec, txConfig client.TxConfig, rpcClient *rpcclient.RPCClient, keyringConfig btypes.KeyringConfig) (*BroadcasterAccount, error) { + err := keyringConfig.Validate() + if err != nil { + return nil, err + } + + // setup keyring + keyBase, keyringRecord, err := cfg.GetKeyringRecord(cdc, &keyringConfig) + if err != nil { + return nil, err + } + + addr, err := keyringRecord.GetAddress() + if err != nil { + return nil, err + } -func (b *Broadcaster) loadAccount(ctx context.Context) error { - account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress) + addrStr, err := keys.EncodeBech32AccAddr(addr, cfg.Bech32Prefix) + if err != nil { + return nil, err + } + b := &BroadcasterAccount{ + cfg: cfg, + + cdc: cdc, + txConfig: txConfig, + rpcClient: rpcClient, + + keyName: keyringRecord.Name, + keyBase: keyBase, + keyringRecord: keyringRecord, + address: addr, + addressString: addrStr, + + BuildTxWithMessages: keyringConfig.BuildTxWithMessages, + PendingTxToProcessedMsgs: keyringConfig.PendingTxToProcessedMsgs, + } + + if b.BuildTxWithMessages == nil { + b.BuildTxWithMessages = b.DefaultBuildTxWithMessages + } + + if b.PendingTxToProcessedMsgs == nil { + b.PendingTxToProcessedMsgs = b.DefaultPendingTxToProcessedMsgs + } + + b.txf = tx.Factory{}. + WithAccountRetriever(b). + WithChainID(cfg.ChainID). + WithTxConfig(txConfig). + WithGasAdjustment(cfg.GasAdjustment). + WithGasPrices(cfg.GasPrice). + WithKeybase(keyBase). + WithSignMode(signing.SignMode_SIGN_MODE_DIRECT) + + return b, nil +} + +func (b BroadcasterAccount) GetAddress() sdk.AccAddress { + return b.address +} + +func (b BroadcasterAccount) GetAddressString() string { + return b.addressString +} + +func (b BroadcasterAccount) Bech32Prefix() string { + return b.cfg.Bech32Prefix +} + +func (b *BroadcasterAccount) Load(ctx context.Context) error { + account, err := b.GetAccount(b.getClientCtx(ctx), b.address) if err != nil { return err } @@ -27,69 +119,162 @@ func (b *Broadcaster) loadAccount(ctx context.Context) error { return nil } -func (b Broadcaster) GetAddress() sdk.AccAddress { - return b.keyAddress +func (b BroadcasterAccount) GetLatestSequence(ctx context.Context) (uint64, error) { + account, err := b.GetAccount(b.getClientCtx(ctx), b.address) + if err != nil { + return 0, err + } + return account.GetAccountNumber(), nil } -func (b Broadcaster) GetAddressString() (string, error) { - return keys.EncodeBech32AccAddr(b.keyAddress, b.cfg.Bech32Prefix) +func (b BroadcasterAccount) getClientCtx(ctx context.Context) client.Context { + return client.Context{}.WithClient(b.rpcClient). + WithInterfaceRegistry(b.cdc.InterfaceRegistry()). + WithChainID(b.cfg.ChainID). + WithCodec(b.cdc). + WithFromAddress(b.address). + WithCmdContext(ctx) } -// GetAccount queries for an account given an address and a block height. An -// error is returned if the query or decoding fails. -func (b *Broadcaster) GetAccount(clientCtx client.Context, addr sdk.AccAddress) (client.Account, error) { - account, _, err := b.GetAccountWithHeight(clientCtx, addr) - return account, err +func (b BroadcasterAccount) Sequence() uint64 { + return b.txf.Sequence() } -// GetAccountWithHeight queries for an account given an address. Returns the -// height of the query with the account. An error is returned if the query -// or decoding fails. -func (b *Broadcaster) GetAccountWithHeight(clienCtx client.Context, addr sdk.AccAddress) (client.Account, int64, error) { - var header metadata.MD - address, err := keys.EncodeBech32AccAddr(addr, b.cfg.Bech32Prefix) +func (b *BroadcasterAccount) IncreaseSequence() { + b.txf = b.txf.WithSequence(b.txf.Sequence() + 1) +} + +func (b *BroadcasterAccount) UpdateSequence(sequence uint64) { + b.txf = b.txf.WithSequence(sequence) +} + +func (b BroadcasterAccount) BroadcastTxSync(ctx context.Context, txBytes []byte) (*ctypes.ResultBroadcastTx, error) { + return b.rpcClient.BroadcastTxSync(ctx, txBytes) +} + +// BuildSimTx creates an unsigned tx with an empty single signature and returns +// the encoded transaction or an error if the unsigned transaction cannot be built. +func (b BroadcasterAccount) buildSimTx(msgs ...sdk.Msg) ([]byte, error) { + txb, err := b.txf.BuildUnsignedTx(msgs...) if err != nil { - return nil, 0, err + return nil, err } - queryClient := authtypes.NewQueryClient(b.rpcClient) - res, err := queryClient.Account(clienCtx.CmdContext, &authtypes.QueryAccountRequest{Address: address}, grpc.Header(&header)) + pk, err := b.keyringRecord.GetPubKey() if err != nil { - return nil, 0, err + return nil, err } - blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader) - if l := len(blockHeight); l != 1 { - return nil, 0, fmt.Errorf("unexpected '%s' header length; got %d, expected: %d", grpctypes.GRPCBlockHeightHeader, l, 1) + // Create an empty signature literal as the ante handler will populate with a + // sentinel pubkey. + sig := signing.SignatureV2{ + PubKey: pk, + Data: &signing.SingleSignatureData{ + SignMode: b.txf.SignMode(), + }, + Sequence: b.txf.Sequence(), } + if err := txb.SetSignatures(sig); err != nil { + return nil, err + } + + return txutils.EncodeTx(b.txConfig, txb.GetTx()) +} - nBlockHeight, err := strconv.Atoi(blockHeight[0]) +// CalculateGas simulates a tx to generate the appropriate gas settings before broadcasting a tx. +func (b BroadcasterAccount) CalculateGas(ctx context.Context, msgs ...sdk.Msg) (txtypes.SimulateResponse, uint64, error) { + txBytes, err := b.buildSimTx(msgs...) if err != nil { - return nil, 0, fmt.Errorf("failed to parse block height: %w", err) + return txtypes.SimulateResponse{}, 0, err } - var acc sdk.AccountI - if err := b.cdc.UnpackAny(res.Account, &acc); err != nil { - return nil, 0, err + simReq := txtypes.SimulateRequest{TxBytes: txBytes} + reqBytes, err := simReq.Marshal() + if err != nil { + return txtypes.SimulateResponse{}, 0, err } - return acc, int64(nBlockHeight), nil + simQuery := abci.RequestQuery{ + Path: "/cosmos.tx.v1beta1.Service/Simulate", + Data: reqBytes, + } + + res, err := b.rpcClient.QueryABCI(ctx, simQuery) + if err != nil { + return txtypes.SimulateResponse{}, 0, err + } + + var simRes txtypes.SimulateResponse + if err := simRes.Unmarshal(res.Value); err != nil { + return txtypes.SimulateResponse{}, 0, err + } + + gas, err := b.adjustEstimatedGas(simRes.GasInfo.GasUsed) + return simRes, gas, err } -// EnsureExists returns an error if no account exists for the given address else nil. -func (b *Broadcaster) EnsureExists(clientCtx client.Context, addr sdk.AccAddress) error { - if _, err := b.GetAccount(clientCtx, addr); err != nil { - return err +// AdjustEstimatedGas adjusts the estimated gas usage by multiplying it by the gas adjustment factor +// and return estimated gas is higher than max gas error. If the gas usage is zero, the adjusted gas +// is also zero. +func (b BroadcasterAccount) adjustEstimatedGas(gasUsed uint64) (uint64, error) { + if gasUsed == 0 { + return gasUsed, nil } - return nil + + gas := b.cfg.GasAdjustment * float64(gasUsed) + if math.IsInf(gas, 1) { + return 0, fmt.Errorf("infinite gas used") + } + + return uint64(gas), nil } -// GetAccountNumberSequence returns sequence and account number for the given address. -// It returns an error if the account couldn't be retrieved from the state. -func (b *Broadcaster) GetAccountNumberSequence(clientCtx client.Context, addr sdk.AccAddress) (uint64, uint64, error) { - acc, err := b.GetAccount(clientCtx, addr) +func (b BroadcasterAccount) SimulateAndSignTx(ctx context.Context, msgs ...sdk.Msg) (authsigning.Tx, error) { + _, adjusted, err := b.CalculateGas(ctx, msgs...) + if err != nil { + return nil, err + } + + b.txf = b.txf.WithGas(adjusted) + txb, err := b.txf.BuildUnsignedTx(msgs...) if err != nil { - return 0, 0, err + return nil, err } - return acc.GetAccountNumber(), acc.GetSequence(), nil + + if err = tx.Sign(ctx, b.txf, b.keyName, txb, false); err != nil { + return nil, err + } + return txb.GetTx(), nil +} + +// buildTxWithMessages creates a transaction from the given messages. +func (b *BroadcasterAccount) DefaultBuildTxWithMessages( + ctx context.Context, + msgs []sdk.Msg, +) ( + txBytes []byte, + txHash string, + err error, +) { + tx, err := b.SimulateAndSignTx(ctx, msgs...) + if err != nil { + return nil, "", err + } + + txBytes, err = txutils.EncodeTx(b.txConfig, tx) + if err != nil { + return nil, "", err + } + return txBytes, btypes.TxHash(txBytes), nil +} + +func (b *BroadcasterAccount) DefaultPendingTxToProcessedMsgs( + txBytes []byte, +) ([]sdk.Msg, error) { + tx, err := txutils.DecodeTx(b.txConfig, txBytes) + if err != nil { + return nil, err + } + + return tx.GetMsgs(), nil } diff --git a/node/broadcaster/account_retriever.go b/node/broadcaster/account_retriever.go new file mode 100644 index 0000000..01ccd86 --- /dev/null +++ b/node/broadcaster/account_retriever.go @@ -0,0 +1,70 @@ +package broadcaster + +import ( + "fmt" + "strconv" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/cosmos/cosmos-sdk/client" + sdk "github.com/cosmos/cosmos-sdk/types" + grpctypes "github.com/cosmos/cosmos-sdk/types/grpc" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" +) + +var _ client.AccountRetriever = &BroadcasterAccount{} + +// GetAccount queries for an account given an address and a block height. An +// error is returned if the query or decoding fails. +func (b *BroadcasterAccount) GetAccount(clientCtx client.Context, addr sdk.AccAddress) (client.Account, error) { + account, _, err := b.GetAccountWithHeight(clientCtx, addr) + return account, err +} + +// GetAccountWithHeight queries for an account given an address. Returns the +// height of the query with the account. An error is returned if the query +// or decoding fails. +func (b *BroadcasterAccount) GetAccountWithHeight(clienCtx client.Context, addr sdk.AccAddress) (client.Account, int64, error) { + var header metadata.MD + queryClient := authtypes.NewQueryClient(b.rpcClient) + res, err := queryClient.Account(clienCtx.CmdContext, &authtypes.QueryAccountRequest{Address: b.addressString}, grpc.Header(&header)) + if err != nil { + return nil, 0, err + } + + blockHeight := header.Get(grpctypes.GRPCBlockHeightHeader) + if l := len(blockHeight); l != 1 { + return nil, 0, fmt.Errorf("unexpected '%s' header length; got %d, expected: %d", grpctypes.GRPCBlockHeightHeader, l, 1) + } + + nBlockHeight, err := strconv.Atoi(blockHeight[0]) + if err != nil { + return nil, 0, fmt.Errorf("failed to parse block height: %w", err) + } + + var acc sdk.AccountI + if err := b.cdc.UnpackAny(res.Account, &acc); err != nil { + return nil, 0, err + } + + return acc, int64(nBlockHeight), nil +} + +// EnsureExists returns an error if no account exists for the given address else nil. +func (b *BroadcasterAccount) EnsureExists(clientCtx client.Context, addr sdk.AccAddress) error { + if _, err := b.GetAccount(clientCtx, addr); err != nil { + return err + } + return nil +} + +// GetAccountNumberSequence returns sequence and account number for the given address. +// It returns an error if the account couldn't be retrieved from the state. +func (b *BroadcasterAccount) GetAccountNumberSequence(clientCtx client.Context, addr sdk.AccAddress) (uint64, uint64, error) { + acc, err := b.GetAccount(clientCtx, addr) + if err != nil { + return 0, 0, err + } + return acc.GetAccountNumber(), acc.GetSequence(), nil +} diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index b922120..e80b84d 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -2,6 +2,7 @@ package broadcaster import ( "context" + "fmt" "slices" "sync" "time" @@ -13,11 +14,7 @@ import ( "github.com/pkg/errors" "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/codec" - "github.com/cosmos/cosmos-sdk/crypto/keyring" - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/cosmos/cosmos-sdk/types/tx/signing" btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" "github.com/initia-labs/opinit-bots/node/rpcclient" @@ -32,12 +29,10 @@ type Broadcaster struct { logger *zap.Logger rpcClient *rpcclient.RPCClient - txf tx.Factory - txConfig client.TxConfig - - keyBase keyring.Keyring - keyName string - keyAddress sdk.AccAddress + txConfig client.TxConfig + accounts []*BroadcasterAccount + addressAccountMap map[string]int + accountMu *sync.Mutex txChannel chan btypes.ProcessedMsgs txChannelStopped chan struct{} @@ -65,8 +60,10 @@ func NewBroadcaster( db: db, rpcClient: rpcClient, - // txf will be initialized in prepareBroadcaster - txConfig: txConfig, + txConfig: txConfig, + accounts: make([]*BroadcasterAccount, 0), + addressAccountMap: make(map[string]int), + accountMu: &sync.Mutex{}, txChannel: make(chan btypes.ProcessedMsgs), txChannelStopped: make(chan struct{}), @@ -76,14 +73,6 @@ func NewBroadcaster( pendingProcessedMsgs: make([]btypes.ProcessedMsgs, 0), } - // fill cfg with default functions - if cfg.PendingTxToProcessedMsgs == nil { - cfg.WithPendingTxToProcessedMsgsFn(b.DefaultPendingTxToProcessedMsgs) - } - if cfg.BuildTxWithMessages == nil { - cfg.WithBuildTxWithMessagesFn(b.DefaultBuildTxWithMessages) - } - // validate broadcaster config if err := cfg.Validate(); err != nil { return nil, errors.Wrap(err, "failed to validate broadcaster config") @@ -99,64 +88,40 @@ func NewBroadcaster( return b, nil } -func (b *Broadcaster) Initialize(ctx context.Context, status *rpccoretypes.ResultStatus, keyringConfig *btypes.KeyringConfig) error { - err := keyringConfig.Validate() - if err != nil { - return err - } - - // setup keyring - keyBase, keyringRecord, err := b.cfg.GetKeyringRecord(b.cdc, keyringConfig) - if err != nil { - return err - } - b.keyBase = keyBase - addr, err := keyringRecord.GetAddress() - if err != nil { - return err +func (b *Broadcaster) Initialize(ctx context.Context, status *rpccoretypes.ResultStatus, keyringConfigs []btypes.KeyringConfig) error { + for _, keyringConfig := range keyringConfigs { + account, err := NewBroadcasterAccount(b.cfg, b.cdc, b.txConfig, b.rpcClient, keyringConfig) + if err != nil { + return err + } + err = account.Load(ctx) + if err != nil { + return err + } + b.accounts = append(b.accounts, account) + b.addressAccountMap[account.GetAddressString()] = len(b.accounts) - 1 } - b.keyAddress = addr - b.keyName = keyringRecord.Name // prepare broadcaster - return b.prepareBroadcaster(ctx, status.SyncInfo.LatestBlockTime) + err := b.prepareBroadcaster(ctx, status.SyncInfo.LatestBlockTime) + return errors.Wrap(err, "failed to prepare broadcaster") } -func (b Broadcaster) getClientCtx(ctx context.Context) client.Context { - return client.Context{}.WithClient(b.rpcClient). - WithInterfaceRegistry(b.cdc.InterfaceRegistry()). - WithChainID(b.cfg.ChainID). - WithCodec(b.cdc). - WithFromAddress(b.keyAddress). - WithCmdContext(ctx) +func (b Broadcaster) GetHeight() int64 { + return b.lastProcessedBlockHeight + 1 } -func (b Broadcaster) GetTxf() tx.Factory { - return b.txf +func (b *Broadcaster) SetSyncInfo(height int64) { + b.lastProcessedBlockHeight = height } func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time.Time) error { - b.txf = tx.Factory{}. - WithAccountRetriever(b). - WithChainID(b.cfg.ChainID). - WithTxConfig(b.txConfig). - WithGasAdjustment(b.cfg.GasAdjustment). - WithGasPrices(b.cfg.GasPrice). - WithKeybase(b.keyBase). - WithSignMode(signing.SignMode_SIGN_MODE_DIRECT) - - err := b.loadAccount(ctx) - if err != nil { - return err - } - dbBatchKVs := make([]types.RawKV, 0) loadedPendingTxs, err := b.loadPendingTxs() if err != nil { return err } - if len(loadedPendingTxs) > 0 { pendingTxTime := time.Unix(0, loadedPendingTxs[0].Timestamp) @@ -183,7 +148,11 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time // convert pending txs to pending msgs for i, txInfo := range loadedPendingTxs { - msgs, err := b.cfg.PendingTxToProcessedMsgs(txInfo.Tx) + account, err := b.AccountByAddress(txInfo.Sender) + if err != nil { + return err + } + msgs, err := account.PendingTxToProcessedMsgs(txInfo.Tx) if err != nil { return err } @@ -196,6 +165,7 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time } b.pendingProcessedMsgs = append(b.pendingProcessedMsgs, btypes.ProcessedMsgs{ + Sender: txInfo.Sender, Msgs: slices.Clone(msgs[i:end]), Timestamp: time.Now().UnixNano(), Save: true, @@ -239,14 +209,23 @@ func (b *Broadcaster) prepareBroadcaster(ctx context.Context, lastBlockTime time if err != nil { return err } - return nil } -func (b *Broadcaster) SetSyncInfo(height int64) { - b.lastProcessedBlockHeight = height +func (b Broadcaster) AccountByIndex(index int) (*BroadcasterAccount, error) { + b.accountMu.Lock() + defer b.accountMu.Unlock() + if len(b.accounts) <= index { + return nil, fmt.Errorf("broadcaster account not found; length: %d, index: %d", len(b.accounts), index) + } + return b.accounts[index], nil } -func (b Broadcaster) KeyName() string { - return b.keyName +func (b Broadcaster) AccountByAddress(address string) (*BroadcasterAccount, error) { + b.accountMu.Lock() + defer b.accountMu.Unlock() + if _, ok := b.addressAccountMap[address]; !ok { + return nil, fmt.Errorf("broadcaster account not found; address: %s", address) + } + return b.accounts[b.addressAccountMap[address]], nil } diff --git a/node/broadcaster/db.go b/node/broadcaster/db.go index 3c501de..9781b31 100644 --- a/node/broadcaster/db.go +++ b/node/broadcaster/db.go @@ -11,16 +11,16 @@ import ( // PendingTx // /////////////// -func (b Broadcaster) savePendingTx(sequence uint64, txInfo btypes.PendingTxInfo) error { - data, err := txInfo.Marshal() +func (b Broadcaster) savePendingTx(pendingTx btypes.PendingTxInfo) error { + data, err := pendingTx.Marshal() if err != nil { return err } - return b.db.Set(btypes.PrefixedPendingTx(sequence), data) + return b.db.Set(btypes.PrefixedPendingTx(types.MustInt64ToUint64(pendingTx.Timestamp)), data) } -func (b Broadcaster) deletePendingTx(sequence uint64) error { - return b.db.Delete(btypes.PrefixedPendingTx(sequence)) +func (b Broadcaster) deletePendingTx(pendingTx btypes.PendingTxInfo) error { + return b.db.Delete(btypes.PrefixedPendingTx(types.MustInt64ToUint64(pendingTx.Timestamp))) } func (b Broadcaster) loadPendingTxs() (txs []btypes.PendingTxInfo, err error) { @@ -49,14 +49,17 @@ func (b Broadcaster) PendingTxsToRawKV(txInfos []btypes.PendingTxInfo, delete bo var data []byte var err error - if !delete && txInfo.Save { + if !delete { + if !txInfo.Save { + continue + } data, err = txInfo.Marshal() if err != nil { return nil, err } } kvs = append(kvs, types.RawKV{ - Key: b.db.PrefixedKey(btypes.PrefixedPendingTx(txInfo.Sequence)), + Key: b.db.PrefixedKey(btypes.PrefixedPendingTx(types.MustInt64ToUint64(txInfo.Timestamp))), Value: data, }) } @@ -75,7 +78,11 @@ func (b Broadcaster) ProcessedMsgsToRawKV(ProcessedMsgs []btypes.ProcessedMsgs, var data []byte var err error - if !delete && processedMsgs.Save { + if !delete { + if !processedMsgs.Save { + continue + } + data, err = processedMsgs.MarshalInterfaceJSON(b.cdc) if err != nil { return nil, err diff --git a/node/broadcaster/process.go b/node/broadcaster/process.go index 16c10da..2bf5803 100644 --- a/node/broadcaster/process.go +++ b/node/broadcaster/process.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "math" + "strings" "time" "github.com/pkg/errors" @@ -16,8 +17,8 @@ import ( "github.com/initia-labs/opinit-bots/types" ) -func (b Broadcaster) GetHeight() int64 { - return b.lastProcessedBlockHeight + 1 +func IsTxNotFoundErr(err error, txHash string) bool { + return strings.Contains(err.Error(), fmt.Sprintf("tx (%s) not found", txHash)) } // CheckPendingTx query tx info to check if pending tx is processed. @@ -26,8 +27,9 @@ func (b *Broadcaster) CheckPendingTx(ctx context.Context, pendingTx btypes.Pendi if err != nil { return nil, time.Time{}, err } + res, txerr := b.rpcClient.QueryTx(ctx, txHash) - if txerr != nil { + if txerr != nil && IsTxNotFoundErr(txerr, pendingTx.TxHash) { // if the tx is not found, it means the tx is not processed yet // or the tx is not indexed by the node in rare cases. lastHeader, err := b.rpcClient.Header(ctx, nil) @@ -42,18 +44,24 @@ func (b *Broadcaster) CheckPendingTx(ctx context.Context, pendingTx btypes.Pendi return nil, time.Time{}, types.ErrTxNotFound } else { // timeout case - account, err := b.GetAccount(b.getClientCtx(ctx), b.keyAddress) + account, err := b.AccountByAddress(pendingTx.Sender) + if err != nil { + return nil, time.Time{}, err + } + accountSequence, err := account.GetLatestSequence(ctx) if err != nil { return nil, time.Time{}, err } // if sequence is larger than the sequence of the pending tx, // handle it as the tx has already been processed - if pendingTx.Sequence < account.GetSequence() { + if pendingTx.Sequence < accountSequence { return nil, time.Time{}, nil } panic(fmt.Errorf("something wrong, pending txs are not processed for a long time; current block time: %s, pending tx processing time: %s", time.Now().UTC().String(), pendingTxTime.UTC().String())) } + } else if txerr != nil { + return nil, time.Time{}, txerr } else if res.TxResult.Code != 0 { panic(fmt.Errorf("tx failed, tx hash: %s, code: %d, log: %s; you might need to check gas adjustment config or balance", pendingTx.TxHash, res.TxResult.Code, res.TxResult.Log)) } @@ -67,8 +75,8 @@ func (b *Broadcaster) CheckPendingTx(ctx context.Context, pendingTx btypes.Pendi // RemovePendingTx remove pending tx from local pending txs. // It is called when the pending tx is included in the block. -func (b *Broadcaster) RemovePendingTx(sequence uint64) error { - err := b.deletePendingTx(sequence) +func (b *Broadcaster) RemovePendingTx(pendingTx btypes.PendingTxInfo) error { + err := b.deletePendingTx(pendingTx) if err != nil { return err } @@ -87,13 +95,23 @@ func (b *Broadcaster) Start(ctx context.Context) error { return nil case data := <-b.txChannel: var err error + broadcasterAccount, err := b.AccountByAddress(data.Sender) + if err != nil { + return err + } for retry := 1; retry <= types.MaxRetryCount; retry++ { - err = b.handleProcessedMsgs(ctx, data) + err = b.handleProcessedMsgs(ctx, data, broadcasterAccount) if err == nil { break - } else if err = b.handleMsgError(err); err == nil { + } else if err = b.handleMsgError(err, broadcasterAccount); err == nil { + // if the error is handled, we can delete the processed msgs + err = b.deleteProcessedMsgs(data.Timestamp) + if err != nil { + return err + } break } else if !data.Save { + b.logger.Warn("discard msgs: failed to handle processed msgs", zap.String("error", err.Error())) // if the message does not need to be saved, we can skip retry err = nil break @@ -116,3 +134,15 @@ func (b Broadcaster) BroadcastPendingProcessedMsgs() { b.BroadcastMsgs(processedMsg) } } + +// BroadcastTxSync broadcasts transaction bytes to txBroadcastLooper. +func (b Broadcaster) BroadcastMsgs(msgs btypes.ProcessedMsgs) { + if b.txChannel == nil { + return + } + + select { + case <-b.txChannelStopped: + case b.txChannel <- msgs: + } +} diff --git a/node/broadcaster/status.go b/node/broadcaster/status.go new file mode 100644 index 0000000..4e4d473 --- /dev/null +++ b/node/broadcaster/status.go @@ -0,0 +1,23 @@ +package broadcaster + +import ( + btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" +) + +func (b Broadcaster) GetStatus() btypes.BroadcasterStatus { + return btypes.BroadcasterStatus{ + PendingTxs: b.LenLocalPendingTx(), + AccountsStatus: b.getAccountsStatus(), + } +} + +func (b Broadcaster) getAccountsStatus() []btypes.BroadcasterAccountStatus { + accountsStatus := make([]btypes.BroadcasterAccountStatus, 0, len(b.accounts)) + for _, account := range b.accounts { + accountsStatus = append(accountsStatus, btypes.BroadcasterAccountStatus{ + Address: account.addressString, + Sequence: account.Sequence(), + }) + } + return accountsStatus +} diff --git a/node/broadcaster/tx.go b/node/broadcaster/tx.go index e16f7dc..1b761c3 100644 --- a/node/broadcaster/tx.go +++ b/node/broadcaster/tx.go @@ -3,7 +3,6 @@ package broadcaster import ( "context" "fmt" - "math" "regexp" "strconv" "strings" @@ -12,18 +11,7 @@ import ( "github.com/pkg/errors" - sdkerrors "cosmossdk.io/errors" - abci "github.com/cometbft/cometbft/abci/types" - - "github.com/cosmos/cosmos-sdk/client/tx" - "github.com/cosmos/cosmos-sdk/crypto/keyring" - cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" - sdk "github.com/cosmos/cosmos-sdk/types" - txtypes "github.com/cosmos/cosmos-sdk/types/tx" - "github.com/cosmos/cosmos-sdk/types/tx/signing" - btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" - "github.com/initia-labs/opinit-bots/txutils" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" ) @@ -37,7 +25,7 @@ var ignoringErrors = []error{ var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)") var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index") -func (b *Broadcaster) handleMsgError(err error) error { +func (b *Broadcaster) handleMsgError(err error, broadcasterAccount *BroadcasterAccount) error { if strs := accountSeqRegex.FindStringSubmatch(err.Error()); strs != nil { expected, parseErr := strconv.ParseUint(strs[1], 10, 64) if parseErr != nil { @@ -49,9 +37,8 @@ func (b *Broadcaster) handleMsgError(err error) error { } if expected > got { - b.txf = b.txf.WithSequence(expected) + broadcasterAccount.UpdateSequence(expected) } - return err } @@ -86,11 +73,12 @@ func (b *Broadcaster) handleMsgError(err error) error { // HandleProcessedMsgs handles processed messages by broadcasting them to the network. // It stores the transaction in the database and local memory and keep track of the successful broadcast. -func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.ProcessedMsgs) error { - sequence := b.txf.Sequence() - txBytes, txHash, err := b.cfg.BuildTxWithMessages(ctx, data.Msgs) +func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.ProcessedMsgs, broadcasterAccount *BroadcasterAccount) error { + sequence := broadcasterAccount.Sequence() + + txBytes, txHash, err := broadcasterAccount.BuildTxWithMessages(ctx, data.Msgs) if err != nil { - return sdkerrors.Wrapf(err, "simulation failed") + return errors.Wrapf(err, "simulation failed") } res, err := b.rpcClient.BroadcastTxSync(ctx, txBytes) @@ -104,16 +92,14 @@ func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.Proce b.logger.Debug("broadcast tx", zap.String("tx_hash", txHash), zap.Uint64("sequence", sequence)) - // @sh-cha: maybe we should use data.Save? - if data.Timestamp != 0 { - err = b.deleteProcessedMsgs(data.Timestamp) - if err != nil { - return err - } + err = b.deleteProcessedMsgs(data.Timestamp) + if err != nil { + return err } - b.txf = b.txf.WithSequence(b.txf.Sequence() + 1) + broadcasterAccount.IncreaseSequence() pendingTx := btypes.PendingTxInfo{ + Sender: data.Sender, ProcessedHeight: b.GetHeight(), Sequence: sequence, Tx: txBytes, @@ -123,113 +109,19 @@ func (b *Broadcaster) handleProcessedMsgs(ctx context.Context, data btypes.Proce Save: data.Save, } - // save pending transaction to the database for handling after restart - err = b.savePendingTx(sequence, pendingTx) - if err != nil { - return err + if pendingTx.Save { + // save pending transaction to the database for handling after restart + err = b.savePendingTx(pendingTx) + if err != nil { + return err + } } // save pending tx to local memory to handle this tx in this session b.enqueueLocalPendingTx(pendingTx) - return nil } -// BroadcastTxSync broadcasts transaction bytes to txBroadcastLooper. -func (b Broadcaster) BroadcastMsgs(msgs btypes.ProcessedMsgs) { - if b.txChannel == nil { - return - } - - select { - case <-b.txChannelStopped: - case b.txChannel <- msgs: - } -} - -// CalculateGas simulates a tx to generate the appropriate gas settings before broadcasting a tx. -func (b Broadcaster) CalculateGas(ctx context.Context, txf tx.Factory, msgs ...sdk.Msg) (txtypes.SimulateResponse, uint64, error) { - keyInfo, err := b.keyBase.Key(b.keyName) - if err != nil { - return txtypes.SimulateResponse{}, 0, err - } - - txBytes, err := b.buildSimTx(keyInfo, txf, msgs...) - if err != nil { - return txtypes.SimulateResponse{}, 0, err - } - - simReq := txtypes.SimulateRequest{TxBytes: txBytes} - reqBytes, err := simReq.Marshal() - if err != nil { - return txtypes.SimulateResponse{}, 0, err - } - - simQuery := abci.RequestQuery{ - Path: "/cosmos.tx.v1beta1.Service/Simulate", - Data: reqBytes, - } - - res, err := b.rpcClient.QueryABCI(ctx, simQuery) - if err != nil { - return txtypes.SimulateResponse{}, 0, err - } - - var simRes txtypes.SimulateResponse - if err := simRes.Unmarshal(res.Value); err != nil { - return txtypes.SimulateResponse{}, 0, err - } - - gas, err := b.adjustEstimatedGas(simRes.GasInfo.GasUsed) - return simRes, gas, err -} - -// AdjustEstimatedGas adjusts the estimated gas usage by multiplying it by the gas adjustment factor -// and return estimated gas is higher than max gas error. If the gas usage is zero, the adjusted gas -// is also zero. -func (b Broadcaster) adjustEstimatedGas(gasUsed uint64) (uint64, error) { - if gasUsed == 0 { - return gasUsed, nil - } - - gas := b.cfg.GasAdjustment * float64(gasUsed) - if math.IsInf(gas, 1) { - return 0, fmt.Errorf("infinite gas used") - } - - return uint64(gas), nil -} - -// BuildSimTx creates an unsigned tx with an empty single signature and returns -// the encoded transaction or an error if the unsigned transaction cannot be built. -func (b Broadcaster) buildSimTx(info *keyring.Record, txf tx.Factory, msgs ...sdk.Msg) ([]byte, error) { - txb, err := txf.BuildUnsignedTx(msgs...) - if err != nil { - return nil, err - } - - var pk cryptotypes.PubKey - pk, err = info.GetPubKey() - if err != nil { - return nil, err - } - - // Create an empty signature literal as the ante handler will populate with a - // sentinel pubkey. - sig := signing.SignatureV2{ - PubKey: pk, - Data: &signing.SingleSignatureData{ - SignMode: txf.SignMode(), - }, - Sequence: txf.Sequence(), - } - if err := txb.SetSignatures(sig); err != nil { - return nil, err - } - - return txutils.EncodeTx(b.txConfig, txb.GetTx()) -} - func (b *Broadcaster) enqueueLocalPendingTx(tx btypes.PendingTxInfo) { b.pendingTxMu.Lock() defer b.pendingTxMu.Unlock() @@ -260,47 +152,3 @@ func (b *Broadcaster) dequeueLocalPendingTx() { b.pendingTxs = b.pendingTxs[1:] } - -// buildTxWithMessages creates a transaction from the given messages. -func (b *Broadcaster) DefaultBuildTxWithMessages( - ctx context.Context, - msgs []sdk.Msg, -) ( - txBytes []byte, - txHash string, - err error, -) { - txf := b.txf - _, adjusted, err := b.CalculateGas(ctx, txf, msgs...) - if err != nil { - return nil, "", err - } - - txf = txf.WithGas(adjusted) - txb, err := txf.BuildUnsignedTx(msgs...) - if err != nil { - return nil, "", err - } - - if err = tx.Sign(ctx, txf, b.keyName, txb, false); err != nil { - return nil, "", err - } - - tx := txb.GetTx() - txBytes, err = txutils.EncodeTx(b.txConfig, tx) - if err != nil { - return nil, "", err - } - return txBytes, btypes.TxHash(txBytes), nil -} - -func (b *Broadcaster) DefaultPendingTxToProcessedMsgs( - txBytes []byte, -) ([]sdk.Msg, error) { - tx, err := txutils.DecodeTx(b.txConfig, txBytes) - if err != nil { - return nil, err - } - - return tx.GetMsgs(), nil -} diff --git a/node/broadcaster/types/config.go b/node/broadcaster/types/config.go index 45ec3e9..899810e 100644 --- a/node/broadcaster/types/config.go +++ b/node/broadcaster/types/config.go @@ -31,24 +31,10 @@ type BroadcasterConfig struct { // Bech32Prefix is the Bech32 prefix. Bech32Prefix string - // BuildTxWithMessages is the function to build a transaction with messages. - BuildTxWithMessages BuildTxWithMessagesFn - - // PendingTxToProcessedMsgs is the function to convert pending tx to processed messages. - PendingTxToProcessedMsgs PendingTxToProcessedMsgsFn - // HomePath is the path to the keyring. HomePath string } -func (bc *BroadcasterConfig) WithPendingTxToProcessedMsgsFn(fn PendingTxToProcessedMsgsFn) { - bc.PendingTxToProcessedMsgs = fn -} - -func (bc *BroadcasterConfig) WithBuildTxWithMessagesFn(fn BuildTxWithMessagesFn) { - bc.BuildTxWithMessages = fn -} - func (bc BroadcasterConfig) Validate() error { if bc.ChainID == "" { return fmt.Errorf("chain id is empty") @@ -63,14 +49,6 @@ func (bc BroadcasterConfig) Validate() error { return fmt.Errorf("bech32 prefix is empty") } - if bc.BuildTxWithMessages == nil { - return fmt.Errorf("build tx with messages is nil") - } - - if bc.PendingTxToProcessedMsgs == nil { - return fmt.Errorf("pending tx to processed msgs is nil") - } - if bc.GasAdjustment == 0 { return fmt.Errorf("gas adjustment is zero") } @@ -110,6 +88,12 @@ type KeyringConfig struct { // Address of key in keyring Address string `json:"address"` + + // BuildTxWithMessages is the function to build a transaction with messages. + BuildTxWithMessages BuildTxWithMessagesFn + + // PendingTxToProcessedMsgs is the function to convert pending tx to processed messages. + PendingTxToProcessedMsgs PendingTxToProcessedMsgsFn } func (kc KeyringConfig) GetKeyRecord(keyBase keyring.Keyring, bech32Prefix string) (*keyring.Record, error) { @@ -136,10 +120,17 @@ func (kc KeyringConfig) GetKeyRecord(keyBase keyring.Keyring, bech32Prefix strin return nil, fmt.Errorf("keyring config is invalid") } +func (kc *KeyringConfig) WithPendingTxToProcessedMsgsFn(fn PendingTxToProcessedMsgsFn) { + kc.PendingTxToProcessedMsgs = fn +} + +func (kc *KeyringConfig) WithBuildTxWithMessagesFn(fn BuildTxWithMessagesFn) { + kc.BuildTxWithMessages = fn +} + func (kc KeyringConfig) Validate() error { if kc.Name == "" && kc.Address == "" { return fmt.Errorf("keyring config is invalid") } - return nil } diff --git a/node/broadcaster/types/db.go b/node/broadcaster/types/db.go index e6805cc..ddeac8b 100644 --- a/node/broadcaster/types/db.go +++ b/node/broadcaster/types/db.go @@ -11,6 +11,7 @@ import ( ) type PendingTxInfo struct { + Sender string `json:"sender"` ProcessedHeight int64 `json:"height"` Sequence uint64 `json:"sequence"` Tx []byte `json:"tx"` @@ -34,10 +35,11 @@ func (p *PendingTxInfo) Unmarshal(data []byte) error { func (p PendingTxInfo) String() string { tsStr := time.Unix(0, p.Timestamp).UTC().String() - return fmt.Sprintf("Pending tx: %s, msgs: %s, sequence: %d at height: %d, %s", p.TxHash, strings.Join(p.MsgTypes, ","), p.Sequence, p.ProcessedHeight, tsStr) + return fmt.Sprintf("Pending tx: %s, sender: %s, msgs: %s, sequence: %d at height: %d, %s", p.TxHash, p.Sender, strings.Join(p.MsgTypes, ","), p.Sequence, p.ProcessedHeight, tsStr) } type ProcessedMsgs struct { + Sender string `json:"sender"` Msgs []sdk.Msg `json:"msgs"` Timestamp int64 `json:"timestamp"` @@ -49,6 +51,7 @@ type ProcessedMsgs struct { // processedMsgsJSON is a helper struct to JSON encode ProcessedMsgs type processedMsgsJSON struct { + Sender string `json:"sender"` Msgs []string `json:"msgs"` Timestamp int64 `json:"timestamp"` Save bool `json:"save"` @@ -56,6 +59,7 @@ type processedMsgsJSON struct { func (p ProcessedMsgs) MarshalInterfaceJSON(cdc codec.Codec) ([]byte, error) { pms := processedMsgsJSON{ + Sender: p.Sender, Msgs: make([]string, len(p.Msgs)), Timestamp: p.Timestamp, Save: p.Save, @@ -79,6 +83,7 @@ func (p *ProcessedMsgs) UnmarshalInterfaceJSON(cdc codec.Codec, data []byte) err return err } + p.Sender = pms.Sender p.Timestamp = pms.Timestamp p.Save = pms.Save @@ -94,7 +99,7 @@ func (p *ProcessedMsgs) UnmarshalInterfaceJSON(cdc codec.Codec, data []byte) err func (p ProcessedMsgs) String() string { tsStr := time.Unix(0, p.Timestamp).UTC().String() - return fmt.Sprintf("Pending msgs: %s at %s", strings.Join(p.GetMsgStrings(), ","), tsStr) + return fmt.Sprintf("Pending msgs: sender: %s, %s at %s", p.Sender, strings.Join(p.GetMsgTypes(), ","), tsStr) } func (p ProcessedMsgs) GetMsgStrings() []string { diff --git a/node/broadcaster/types/keys.go b/node/broadcaster/types/keys.go index c004e60..7f77ba3 100644 --- a/node/broadcaster/types/keys.go +++ b/node/broadcaster/types/keys.go @@ -10,8 +10,8 @@ var ( ProcessedMsgsKey = []byte("processed_msgs") ) -func PrefixedPendingTx(sequence uint64) []byte { - return append(append(PendingTxsKey, dbtypes.Splitter), dbtypes.FromUint64Key(sequence)...) +func PrefixedPendingTx(timestamp uint64) []byte { + return append(append(PendingTxsKey, dbtypes.Splitter), dbtypes.FromUint64Key(timestamp)...) } func PrefixedProcessedMsgs(timestamp uint64) []byte { diff --git a/node/broadcaster/types/status.go b/node/broadcaster/types/status.go new file mode 100644 index 0000000..aed1fca --- /dev/null +++ b/node/broadcaster/types/status.go @@ -0,0 +1,11 @@ +package types + +type BroadcasterStatus struct { + PendingTxs int `json:"pending_txs"` + AccountsStatus []BroadcasterAccountStatus `json:"accounts_status"` +} + +type BroadcasterAccountStatus struct { + Address string `json:"address"` + Sequence uint64 `json:"sequence"` +} diff --git a/node/node.go b/node/node.go index 8b877d0..9d5a67c 100644 --- a/node/node.go +++ b/node/node.go @@ -86,7 +86,7 @@ func NewNode(cfg nodetypes.NodeConfig, db types.DB, logger *zap.Logger, cdc code // StartHeight is the height to start processing. // If it is 0, the latest height is used. // If the latest height exists in the database, this is ignored. -func (n *Node) Initialize(ctx context.Context, processedHeight int64, keyringConfig *btypes.KeyringConfig) (err error) { +func (n *Node) Initialize(ctx context.Context, processedHeight int64, keyringConfig []btypes.KeyringConfig) (err error) { // check if node is catching up status, err := n.rpcClient.Status(ctx) if err != nil { @@ -195,21 +195,6 @@ func (n Node) MustGetBroadcaster() *broadcaster.Broadcaster { return n.broadcaster } -func (n Node) GetStatus() nodetypes.Status { - s := nodetypes.Status{} - if n.cfg.ProcessType != nodetypes.PROCESS_TYPE_ONLY_BROADCAST { - s.LastBlockHeight = n.GetHeight() - } - - if n.broadcaster != nil { - s.Broadcaster = &nodetypes.BroadcasterStatus{ - PendingTxs: n.broadcaster.LenLocalPendingTx(), - Sequence: n.broadcaster.GetTxf().Sequence(), - } - } - return s -} - func (n Node) GetRPCClient() *rpcclient.RPCClient { return n.rpcClient } diff --git a/node/process.go b/node/process.go index 9613d46..1d33af2 100644 --- a/node/process.go +++ b/node/process.go @@ -246,7 +246,7 @@ func (n *Node) txChecker(ctx context.Context, enableEventHandler bool) error { // tx not found continue } else if err != nil { - return err + return errors.Wrap(err, "failed to check pending tx") } else if res != nil { // tx found height = res.Height @@ -269,9 +269,9 @@ func (n *Node) txChecker(ctx context.Context, enableEventHandler bool) error { } } - err = n.broadcaster.RemovePendingTx(pendingTx.Sequence) + err = n.broadcaster.RemovePendingTx(pendingTx) if err != nil { - return err + return errors.Wrap(err, "failed to remove pending tx") } n.logger.Info("tx inserted", zap.Int64("height", height), diff --git a/node/status.go b/node/status.go new file mode 100644 index 0000000..f145128 --- /dev/null +++ b/node/status.go @@ -0,0 +1,18 @@ +package node + +import ( + nodetypes "github.com/initia-labs/opinit-bots/node/types" +) + +func (n Node) GetStatus() nodetypes.Status { + s := nodetypes.Status{} + if n.cfg.ProcessType != nodetypes.PROCESS_TYPE_ONLY_BROADCAST { + s.LastBlockHeight = n.GetHeight() + } + + if n.broadcaster != nil { + broadcasterStatus := n.broadcaster.GetStatus() + s.Broadcaster = &broadcasterStatus + } + return s +} diff --git a/node/types/status.go b/node/types/status.go index 41b1425..01b0dbd 100644 --- a/node/types/status.go +++ b/node/types/status.go @@ -1,11 +1,10 @@ package types -type BroadcasterStatus struct { - PendingTxs int `json:"pending_txs"` - Sequence uint64 `json:"sequence"` -} +import ( + btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" +) type Status struct { - LastBlockHeight int64 `json:"last_block_height,omitempty"` - Broadcaster *BroadcasterStatus `json:"broadcaster,omitempty"` + LastBlockHeight int64 `json:"last_block_height,omitempty"` + Broadcaster *btypes.BroadcasterStatus `json:"broadcaster,omitempty"` } diff --git a/provider/child/child.go b/provider/child/child.go index 3002c61..8e99bb2 100644 --- a/provider/child/child.go +++ b/provider/child/child.go @@ -1,6 +1,7 @@ package child import ( + "bytes" "context" "errors" @@ -10,6 +11,7 @@ import ( "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/x/auth" + "github.com/cosmos/cosmos-sdk/x/authz" "github.com/initia-labs/OPinit/x/opchild" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" @@ -40,7 +42,11 @@ type BaseChild struct { opchildQueryClient opchildtypes.QueryClient processedMsgs []btypes.ProcessedMsgs - msgQueue []sdk.Msg + msgQueue map[string][]sdk.Msg + + baseAccountIndex int + oracleAccountIndex int + oracleAccountGranter string } func NewBaseChildV1( @@ -75,7 +81,10 @@ func NewBaseChildV1( opchildQueryClient: opchildtypes.NewQueryClient(node.GetRPCClient()), processedMsgs: make([]btypes.ProcessedMsgs, 0), - msgQueue: make([]sdk.Msg, 0), + msgQueue: make(map[string][]sdk.Msg), + + baseAccountIndex: -1, + oracleAccountIndex: -1, } return ch } @@ -86,12 +95,22 @@ func GetCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) { return keys.CreateCodec([]keys.RegisterInterfaces{ auth.AppModuleBasic{}.RegisterInterfaces, + authz.RegisterInterfaces, opchild.AppModuleBasic{}.RegisterInterfaces, }) } -func (b *BaseChild) Initialize(ctx context.Context, processedHeight int64, startOutputIndex uint64, bridgeInfo ophosttypes.QueryBridgeResponse, keyringConfig *btypes.KeyringConfig) (uint64, error) { - err := b.node.Initialize(ctx, processedHeight, keyringConfig) +func (b *BaseChild) Initialize( + ctx context.Context, + processedHeight int64, + startOutputIndex uint64, + bridgeInfo ophosttypes.QueryBridgeResponse, + keyringConfig *btypes.KeyringConfig, + oracleKeyringConfig *btypes.KeyringConfig, +) (uint64, error) { + b.SetBridgeInfo(bridgeInfo) + + err := b.node.Initialize(ctx, processedHeight, b.keyringConfigs(keyringConfig, oracleKeyringConfig)) if err != nil { return 0, err } @@ -126,7 +145,41 @@ func (b *BaseChild) Initialize(ctx context.Context, processedHeight int64, start return false, nil } } - b.SetBridgeInfo(bridgeInfo) + + if b.OracleEnabled() && oracleKeyringConfig != nil { + executors, err := b.QueryExecutors(ctx) + if err != nil { + return 0, err + } + + oracleAddr, err := b.OracleAccountAddressString() + if err != nil { + return 0, err + } + + grants, err := b.QueryGranteeGrants(ctx, oracleAddr) + if err != nil { + return 0, err + } + GRANTLOOP: + for _, grant := range grants { + if grant.Authorization.TypeUrl != "/cosmos.authz.v1beta1.GenericAuthorization" || + !bytes.Contains(grant.Authorization.Value, []byte(types.MsgUpdateOracleTypeUrl)) { + continue + } + + for _, executor := range executors { + if grant.Granter == executor && grant.Grantee == oracleAddr { + b.oracleAccountGranter = grant.Granter + break GRANTLOOP + } + } + } + + if b.oracleAccountGranter == "" { + return 0, errors.New("oracle account has no grant") + } + } return l2Sequence, nil } @@ -193,16 +246,18 @@ func (b BaseChild) DB() types.DB { /// MsgQueue -func (b BaseChild) GetMsgQueue() []sdk.Msg { +func (b BaseChild) GetMsgQueue() map[string][]sdk.Msg { return b.msgQueue } -func (b *BaseChild) AppendMsgQueue(msg sdk.Msg) { - b.msgQueue = append(b.msgQueue, msg) +func (b *BaseChild) AppendMsgQueue(msg sdk.Msg, sender string) { + b.msgQueue[sender] = append(b.msgQueue[sender], msg) } func (b *BaseChild) EmptyMsgQueue() { - b.msgQueue = b.msgQueue[:0] + for sender := range b.msgQueue { + b.msgQueue[sender] = b.msgQueue[sender][:0] + } } /// ProcessedMsgs @@ -245,3 +300,64 @@ func (b BaseChild) GetWorkingTreeLeafCount() (uint64, error) { } return b.mk.GetWorkingTreeLeafCount() } + +func (b *BaseChild) keyringConfigs(baseConfig *btypes.KeyringConfig, oracleConfig *btypes.KeyringConfig) []btypes.KeyringConfig { + var configs []btypes.KeyringConfig + if baseConfig != nil { + configs = append(configs, *baseConfig) + b.baseAccountIndex = len(configs) - 1 + } + if oracleConfig != nil { + configs = append(configs, *oracleConfig) + b.oracleAccountIndex = len(configs) - 1 + } + return configs +} + +func (b BaseChild) BaseAccountAddressString() (string, error) { + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return "", err + } + if b.baseAccountIndex == -1 { + return "", types.ErrKeyNotSet + } + account, err := broadcaster.AccountByIndex(b.baseAccountIndex) + if err != nil { + return "", err + } + sender := account.GetAddressString() + return sender, nil +} + +func (b BaseChild) OracleAccountAddressString() (string, error) { + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return "", err + } + if b.oracleAccountIndex == -1 { + return "", types.ErrKeyNotSet + } + account, err := broadcaster.AccountByIndex(b.oracleAccountIndex) + if err != nil { + return "", err + } + sender := account.GetAddressString() + return sender, nil +} + +func (b BaseChild) OracleAccountAddress() (sdk.AccAddress, error) { + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return nil, err + } + if b.oracleAccountIndex == -1 { + return nil, types.ErrKeyNotSet + } + account, err := broadcaster.AccountByIndex(b.oracleAccountIndex) + if err != nil { + return nil, err + } + sender := account.GetAddress() + return sender, nil +} diff --git a/provider/child/msgs.go b/provider/child/msgs.go index 85f9ad1..54201de 100644 --- a/provider/child/msgs.go +++ b/provider/child/msgs.go @@ -7,6 +7,7 @@ import ( "github.com/initia-labs/opinit-bots/types" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/authz" ) func (b BaseChild) GetMsgFinalizeTokenDeposit( @@ -17,13 +18,13 @@ func (b BaseChild) GetMsgFinalizeTokenDeposit( blockHeight int64, l1Denom string, data []byte, -) (sdk.Msg, error) { - sender, err := b.GetAddressStr() +) (sdk.Msg, string, error) { + sender, err := b.BaseAccountAddressString() if err != nil { if errors.Is(err, types.ErrKeyNotSet) { - return nil, nil + return nil, "", nil } - return nil, err + return nil, "", err } msg := opchildtypes.NewMsgFinalizeTokenDeposit( @@ -38,31 +39,41 @@ func (b BaseChild) GetMsgFinalizeTokenDeposit( ) err = msg.Validate(b.node.AccountCodec()) if err != nil { - return nil, err + return nil, "", err } - return msg, nil + return msg, sender, nil } func (b BaseChild) GetMsgUpdateOracle( height int64, data []byte, -) (sdk.Msg, error) { - sender, err := b.GetAddressStr() +) (sdk.Msg, string, error) { + oracleAddress, err := b.OracleAccountAddress() if err != nil { if errors.Is(err, types.ErrKeyNotSet) { - return nil, nil + return nil, "", nil } - return nil, err + return nil, "", err + } + oracleAddressString, err := b.OracleAccountAddressString() + if err != nil { + return nil, "", err + } + + if b.oracleAccountGranter == "" { + return nil, "", errors.New("oracle account granter is not set") } msg := opchildtypes.NewMsgUpdateOracle( - sender, + b.oracleAccountGranter, types.MustInt64ToUint64(height), data, ) err = msg.Validate(b.node.AccountCodec()) if err != nil { - return nil, err + return nil, "", err } - return msg, nil + + authzMsgExec := authz.NewMsgExec(oracleAddress, []sdk.Msg{msg}) + return &authzMsgExec, oracleAddressString, nil } diff --git a/provider/child/query.go b/provider/child/query.go index e829de7..cdeea9e 100644 --- a/provider/child/query.go +++ b/provider/child/query.go @@ -2,30 +2,17 @@ package child import ( "context" + "time" opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" - sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/query" + "github.com/cosmos/cosmos-sdk/x/authz" "github.com/initia-labs/opinit-bots/node/rpcclient" + "github.com/initia-labs/opinit-bots/types" ) -func (b BaseChild) GetAddress() (sdk.AccAddress, error) { - broadcaster, err := b.node.GetBroadcaster() - if err != nil { - return nil, err - } - return broadcaster.GetAddress(), nil -} - -func (b BaseChild) GetAddressStr() (string, error) { - broadcaster, err := b.node.GetBroadcaster() - if err != nil { - return "", err - } - return broadcaster.GetAddressString() -} - func (b BaseChild) QueryBridgeInfo(ctx context.Context) (opchildtypes.BridgeInfo, error) { req := &opchildtypes.QueryBridgeInfoRequest{} ctx, cancel := rpcclient.GetQueryContext(ctx, 0) @@ -61,3 +48,71 @@ func (b BaseChild) QueryNextL2Sequence(ctx context.Context, height int64) (uint6 } return res.NextL2Sequence, nil } + +func (b BaseChild) QueryExecutors(ctx context.Context) ([]string, error) { + req := &opchildtypes.QueryParamsRequest{} + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) + defer cancel() + + res, err := b.opchildQueryClient.Params(ctx, req) + if err != nil { + return nil, err + } + return res.Params.BridgeExecutors, nil +} + +func (b BaseChild) QueryGrantsRequest(ctx context.Context, granter, grantee, msgTypeUrl string) (*authz.QueryGrantsResponse, error) { + req := &authz.QueryGrantsRequest{ + Granter: granter, + Grantee: grantee, + MsgTypeUrl: msgTypeUrl, + } + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) + defer cancel() + + authzClient := authz.NewQueryClient(b.node.GetRPCClient()) + res, err := authzClient.Grants(ctx, req) + if err != nil { + return nil, err + } + return res, nil +} + +func (b BaseChild) QueryGranteeGrants(ctx context.Context, grantee string) ([]*authz.GrantAuthorization, error) { + req := &authz.QueryGranteeGrantsRequest{ + Grantee: grantee, + Pagination: &query.PageRequest{ + Limit: 100, + }, + } + ctx, cancel := rpcclient.GetQueryContext(ctx, 0) + defer cancel() + + authzClient := authz.NewQueryClient(b.node.GetRPCClient()) + + ticker := time.NewTicker(types.PollingInterval(ctx)) + defer ticker.Stop() + + result := make([]*authz.GrantAuthorization, 0) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + } + + res, err := authzClient.GranteeGrants(ctx, req) + if err != nil { + return nil, err + } + + result = append(result, res.Grants...) + if res.Pagination.NextKey == nil { + break + } + req.Pagination.Key = res.Pagination.NextKey + } + + return result, nil +} diff --git a/provider/host/host.go b/provider/host/host.go index 4bf2277..e06614f 100644 --- a/provider/host/host.go +++ b/provider/host/host.go @@ -2,6 +2,7 @@ package host import ( "context" + "errors" "go.uber.org/zap" @@ -34,7 +35,7 @@ type BaseHost struct { ophostQueryClient ophosttypes.QueryClient processedMsgs []btypes.ProcessedMsgs - msgQueue []sdk.Msg + msgQueue map[string][]sdk.Msg } func NewBaseHostV1(cfg nodetypes.NodeConfig, @@ -62,7 +63,7 @@ func NewBaseHostV1(cfg nodetypes.NodeConfig, ophostQueryClient: ophosttypes.NewQueryClient(node.GetRPCClient()), processedMsgs: make([]btypes.ProcessedMsgs, 0), - msgQueue: make([]sdk.Msg, 0), + msgQueue: make(map[string][]sdk.Msg), } return h @@ -79,7 +80,7 @@ func GetCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) { } func (b *BaseHost) Initialize(ctx context.Context, processedHeight int64, bridgeInfo ophosttypes.QueryBridgeResponse, keyringConfig *btypes.KeyringConfig) error { - err := b.node.Initialize(ctx, processedHeight, keyringConfig) + err := b.node.Initialize(ctx, processedHeight, b.keyringConfigs(keyringConfig)) if err != nil { return err } @@ -154,16 +155,18 @@ func (b BaseHost) DB() types.DB { /// MsgQueue -func (b BaseHost) GetMsgQueue() []sdk.Msg { +func (b BaseHost) GetMsgQueue() map[string][]sdk.Msg { return b.msgQueue } -func (b *BaseHost) AppendMsgQueue(msg sdk.Msg) { - b.msgQueue = append(b.msgQueue, msg) +func (b *BaseHost) AppendMsgQueue(msg sdk.Msg, sender string) { + b.msgQueue[sender] = append(b.msgQueue[sender], msg) } func (b *BaseHost) EmptyMsgQueue() { - b.msgQueue = b.msgQueue[:0] + for sender := range b.msgQueue { + b.msgQueue[sender] = b.msgQueue[sender][:0] + } } /// ProcessedMsgs @@ -179,3 +182,27 @@ func (b *BaseHost) AppendProcessedMsgs(msgs btypes.ProcessedMsgs) { func (b *BaseHost) EmptyProcessedMsgs() { b.processedMsgs = b.processedMsgs[:0] } + +func (b BaseHost) BaseAccountAddressString() (string, error) { + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return "", nil + } + return "", err + } + account, err := broadcaster.AccountByIndex(0) + if err != nil { + return "", err + } + sender := account.GetAddressString() + return sender, nil +} + +func (b BaseHost) keyringConfigs(baseConfig *btypes.KeyringConfig) []btypes.KeyringConfig { + var configs []btypes.KeyringConfig + if baseConfig != nil { + configs = append(configs, *baseConfig) + } + return configs +} diff --git a/provider/host/msgs.go b/provider/host/msgs.go index 8604a93..62ff290 100644 --- a/provider/host/msgs.go +++ b/provider/host/msgs.go @@ -14,13 +14,13 @@ func (b BaseHost) GetMsgProposeOutput( outputIndex uint64, l2BlockNumber int64, outputRoot []byte, -) (sdk.Msg, error) { - sender, err := b.GetAddressStr() +) (sdk.Msg, string, error) { + sender, err := b.BaseAccountAddressString() if err != nil { if errors.Is(err, types.ErrKeyNotSet) { - return nil, nil + return nil, "", nil } - return nil, err + return nil, "", err } msg := ophosttypes.NewMsgProposeOutput( @@ -32,18 +32,18 @@ func (b BaseHost) GetMsgProposeOutput( ) err = msg.Validate(b.node.AccountCodec()) if err != nil { - return nil, err + return nil, "", err } - return msg, nil + return msg, sender, nil } -func (b BaseHost) CreateBatchMsg(batchBytes []byte) (sdk.Msg, error) { - submitter, err := b.GetAddressStr() +func (b BaseHost) CreateBatchMsg(batchBytes []byte) (sdk.Msg, string, error) { + submitter, err := b.BaseAccountAddressString() if err != nil { if errors.Is(err, types.ErrKeyNotSet) { - return nil, nil + return nil, "", nil } - return nil, err + return nil, "", err } msg := ophosttypes.NewMsgRecordBatch( @@ -53,7 +53,7 @@ func (b BaseHost) CreateBatchMsg(batchBytes []byte) (sdk.Msg, error) { ) err = msg.Validate(b.node.AccountCodec()) if err != nil { - return nil, err + return nil, "", err } - return msg, nil + return msg, submitter, nil } diff --git a/provider/host/query.go b/provider/host/query.go index 39bb83b..1391eae 100644 --- a/provider/host/query.go +++ b/provider/host/query.go @@ -8,7 +8,6 @@ import ( "strings" "time" - sdk "github.com/cosmos/cosmos-sdk/types" query "github.com/cosmos/cosmos-sdk/types/query" ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" @@ -17,22 +16,6 @@ import ( "github.com/initia-labs/opinit-bots/types" ) -func (b BaseHost) GetAddress() (sdk.AccAddress, error) { - broadcaster, err := b.node.GetBroadcaster() - if err != nil { - return nil, err - } - return broadcaster.GetAddress(), nil -} - -func (b BaseHost) GetAddressStr() (string, error) { - broadcaster, err := b.node.GetBroadcaster() - if err != nil { - return "", err - } - return broadcaster.GetAddressString() -} - func (b BaseHost) QueryBridgeConfig(ctx context.Context, bridgeId uint64) (*ophosttypes.QueryBridgeResponse, error) { req := &ophosttypes.QueryBridgeRequest{ BridgeId: bridgeId, diff --git a/types/const.go b/types/const.go index beac621..eacbcc8 100644 --- a/types/const.go +++ b/types/const.go @@ -8,4 +8,6 @@ const ( DAHostName = "da_host" DACelestiaName = "da_celestia" + + MsgUpdateOracleTypeUrl = "/opinit.opchild.v1.MsgUpdateOracle" )