Skip to content

Commit

Permalink
misc(state/core_accessor): add retry logic for creating a txClient (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Jan 23, 2025
1 parent ef9d5a2 commit 72ef0ac
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 25 deletions.
74 changes: 49 additions & 25 deletions state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type CoreAccessor struct {
keyring keyring.Keyring
client *user.TxClient

// TODO: remove in scope of https://github.com/celestiaorg/celestia-node/issues/3515
defaultSignerAccount string
defaultSignerAddress AccAddress

Expand Down Expand Up @@ -90,9 +89,19 @@ func NewCoreAccessor(
prt.RegisterOpDecoder(storetypes.ProofOpIAVLCommitment, storetypes.CommitmentOpDecoder)
prt.RegisterOpDecoder(storetypes.ProofOpSimpleMerkleCommitment, storetypes.CommitmentOpDecoder)

rec, err := keyring.Key(keyname)
if err != nil {
return nil, err
}
addr, err := rec.GetAddress()
if err != nil {
return nil, err
}

ca := &CoreAccessor{
keyring: keyring,
defaultSignerAccount: keyname,
defaultSignerAddress: addr,
getter: getter,
prt: prt,
coreConn: conn,
Expand All @@ -118,17 +127,15 @@ func (ca *CoreAccessor) Start(ctx context.Context) error {
return fmt.Errorf("wrong network in core.ip endpoint, expected %s, got %s", ca.network, defaultNetwork)
}

// set up signer to handle tx submission
ca.client, err = ca.setupTxClient(ctx, ca.defaultSignerAccount)
err = ca.setupTxClient(ctx)
if err != nil {
log.Warnw("failed to set up signer, check if node's account is funded", "err", err)
log.Warn(err)
}

ca.minGasPrice, err = ca.queryMinimumGasPrice(ctx)
if err != nil {
return fmt.Errorf("querying minimum gas price: %w", err)
}

return nil
}

Expand All @@ -145,6 +152,11 @@ func (ca *CoreAccessor) SubmitPayForBlob(
libBlobs []*libshare.Blob,
cfg *TxConfig,
) (*TxResponse, error) {
client, err := ca.getTxClient(ctx)
if err != nil {
return nil, err
}

if len(libBlobs) == 0 {
return nil, errors.New("state: no blobs provided")
}
Expand Down Expand Up @@ -179,7 +191,7 @@ func (ca *CoreAccessor) SubmitPayForBlob(

accName := ca.defaultSignerAccount
if !signer.Equals(ca.defaultSignerAddress) {
account := ca.client.AccountByAddress(signer)
account := client.AccountByAddress(signer)
if account == nil {
return nil, fmt.Errorf("account for signer %s not found", signer)
}
Expand All @@ -193,7 +205,7 @@ func (ca *CoreAccessor) SubmitPayForBlob(
opts = append(opts, feeGrant)
}

response, err := ca.client.SubmitPayForBlobWithAccount(ctx, accName, libBlobs, opts...)
response, err := client.SubmitPayForBlobWithAccount(ctx, accName, libBlobs, opts...)
// Network min gas price can be updated through governance in app
// If that's the case, we parse the insufficient min gas price error message and update the gas price
if apperrors.IsInsufficientMinGasPrice(err) {
Expand Down Expand Up @@ -524,36 +536,48 @@ func (ca *CoreAccessor) queryMinimumGasPrice(
return coins.AmountOf(app.BondDenom).MustFloat64(), nil
}

func (ca *CoreAccessor) setupTxClient(ctx context.Context, keyName string) (*user.TxClient, error) {
func (ca *CoreAccessor) setupTxClient(ctx context.Context) error {
if ca.client != nil {
return nil
}

encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)
// explicitly set default address. Otherwise, there could be a mismatch between defaultKey and
// defaultAddress.
rec, err := ca.keyring.Key(keyName)
client, err := user.SetupTxClient(ctx, ca.keyring, ca.coreConn, encCfg,
user.WithDefaultAddress(ca.defaultSignerAddress),
)
if err != nil {
return nil, err
return fmt.Errorf("failed to setup a tx client: %w", err)
}
addr, err := rec.GetAddress()
if err != nil {
return nil, err

ca.client = client
return nil
}

func (ca *CoreAccessor) getTxClient(ctx context.Context) (*user.TxClient, error) {
if ca.client == nil {
err := ca.setupTxClient(ctx)
if err != nil {
return nil, err
}
}
ca.defaultSignerAddress = addr
return user.SetupTxClient(ctx, ca.keyring, ca.coreConn, encCfg,
user.WithDefaultAccount(keyName), user.WithDefaultAddress(addr),
)
return ca.client, nil
}

func (ca *CoreAccessor) submitMsg(
ctx context.Context,
msg sdktypes.Msg,
cfg *TxConfig,
) (*TxResponse, error) {
client, err := ca.getTxClient(ctx)
if err != nil {
return nil, err
}

txConfig := make([]user.TxOption, 0)
var (
gas = cfg.GasLimit()
err error
)
gas := cfg.GasLimit()

if gas == 0 {
gas, err = estimateGas(ctx, ca.client, msg)
gas, err = estimateGas(ctx, client, msg)
if err != nil {
return nil, fmt.Errorf("estimating gas: %w", err)
}
Expand All @@ -574,7 +598,7 @@ func (ca *CoreAccessor) submitMsg(
txConfig = append(txConfig, user.SetFeeGranter(granter))
}

resp, err := ca.client.SubmitTx(ctx, []sdktypes.Msg{msg}, txConfig...)
resp, err := client.SubmitTx(ctx, []sdktypes.Msg{msg}, txConfig...)
return convertToSdkTxResponse(resp), err
}

Expand Down
3 changes: 3 additions & 0 deletions state/core_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestSubmitPayForBlob(t *testing.T) {
t.Cleanup(func() {
_ = ca.Stop(ctx)
})
// explicitly reset client to nil to ensure
// that retry mechanism works.
ca.client = nil

ns, err := libshare.NewV0Namespace([]byte("namespace"))
require.NoError(t, err)
Expand Down

0 comments on commit 72ef0ac

Please sign in to comment.