From 3c32aabd3b0208d1d94b27a0108cbc55df42b5ac Mon Sep 17 00:00:00 2001 From: Brandon Weng <18161326+BrandonWeng@users.noreply.github.com> Date: Mon, 15 May 2023 19:12:32 -0400 Subject: [PATCH] Parallelize Sudo Deposit (#777) * More fixes * move deferred * fix nonterminism * debug * Fix unit test --- x/dex/contract/abci.go | 61 ++++++++++++++++++++++---- x/dex/keeper/abci/end_block_deposit.go | 22 ++++++---- x/dex/module_test.go | 6 +++ x/oracle/simulation/operations.go | 4 +- 4 files changed, 74 insertions(+), 19 deletions(-) diff --git a/x/dex/contract/abci.go b/x/dex/contract/abci.go index 6cde15104b..d0d15577e0 100644 --- a/x/dex/contract/abci.go +++ b/x/dex/contract/abci.go @@ -161,21 +161,64 @@ func decorateContextForContract(ctx sdk.Context, contractInfo types.ContractInfo ) } +func getSudoPlaceOrderMessages(sdkCtx sdk.Context, validContractsInfos []types.ContractInfoV2, keeperWrapper dexkeeperabci.KeeperWrapper) map[int]types.SudoOrderPlacementMsg { + resultChan := make(chan struct { + int + types.SudoOrderPlacementMsg + }, len(validContractsInfos)) + + var wg sync.WaitGroup + contractToProcess := dexutils.GetMemState(sdkCtx.Context()).GetContractToProcess() + for index, contract := range validContractsInfos { + wg.Add(1) + go func(index int, contract types.ContractInfoV2) { + defer wg.Done() + if !contract.NeedOrderMatching || !contractToProcess.Contains(contract.ContractAddr) { + return + } + typedContractAddr := types.ContractAddress(contract.ContractAddr) + msg := keeperWrapper.GetDepositSudoMsg(sdkCtx, typedContractAddr) + if msg.IsEmpty() { + return + } + resultChan <- struct { + int + types.SudoOrderPlacementMsg + }{index, msg} + + }(index, contract) + } + + go func() { + wg.Wait() + close(resultChan) + }() + + resultMapToIndex := map[int]types.SudoOrderPlacementMsg{} + for result := range resultChan { + resultMapToIndex[result.int] = result.SudoOrderPlacementMsg + } + + keeperWrapper.BankKeeper.WriteDeferredOperations(sdkCtx) + + return resultMapToIndex +} + func handleDeposits(spanCtx context.Context, ctx sdk.Context, env *environment, keeper *keeper.Keeper, tracer *otrace.Tracer) { // Handle deposit sequentially since they mutate `bank` state which is shared by all contracts _, span := (*tracer).Start(spanCtx, "handleDeposits") defer span.End() defer telemetry.MeasureSince(time.Now(), "dex", "handle_deposits") keeperWrapper := dexkeeperabci.KeeperWrapper{Keeper: keeper} - for _, contract := range env.validContractsInfo { - if !dexutils.GetMemState(ctx.Context()).GetContractToProcess().Contains(contract.ContractAddr) { - continue - } - if !contract.NeedOrderMatching { - continue - } - if err := keeperWrapper.HandleEBDeposit(spanCtx, ctx, tracer, contract.ContractAddr); err != nil { - env.addError(contract.ContractAddr, err) + + // Filter and create and pending sudo messages concurrently and batch write to bank keeper + sudoMessages := getSudoPlaceOrderMessages(ctx, env.validContractsInfo, keeperWrapper) + for index := 0; index < len(env.validContractsInfo); index++ { + if message, ok := sudoMessages[index]; ok { + contract := env.validContractsInfo[index] + if err := keeperWrapper.HandleEBDeposit(spanCtx, ctx, tracer, contract.ContractAddr, message); err != nil { + env.addError(contract.ContractAddr, err) + } } } } diff --git a/x/dex/keeper/abci/end_block_deposit.go b/x/dex/keeper/abci/end_block_deposit.go index 4bbda57cf5..affa7dfa01 100644 --- a/x/dex/keeper/abci/end_block_deposit.go +++ b/x/dex/keeper/abci/end_block_deposit.go @@ -13,17 +13,12 @@ import ( otrace "go.opentelemetry.io/otel/trace" ) -func (w KeeperWrapper) HandleEBDeposit(ctx context.Context, sdkCtx sdk.Context, tracer *otrace.Tracer, contractAddr string) error { +func (w KeeperWrapper) HandleEBDeposit(ctx context.Context, sdkCtx sdk.Context, tracer *otrace.Tracer, contractAddr string, message types.SudoOrderPlacementMsg) error { _, span := (*tracer).Start(ctx, "SudoDeposit") span.SetAttributes(attribute.String("contractAddr", contractAddr)) defer span.End() - typedContractAddr := types.ContractAddress(contractAddr) - msg := w.GetDepositSudoMsg(sdkCtx, typedContractAddr) - if msg.IsEmpty() { - return nil - } - _, err := utils.CallContractSudo(sdkCtx, w.Keeper, contractAddr, msg, dexutils.ZeroUserProvidedGas) // deposit + _, err := utils.CallContractSudo(sdkCtx, w.Keeper, contractAddr, message, dexutils.ZeroUserProvidedGas) // deposit if err != nil { sdkCtx.Logger().Error(fmt.Sprintf("Error during deposit: %s", err.Error())) return err @@ -34,6 +29,17 @@ func (w KeeperWrapper) HandleEBDeposit(ctx context.Context, sdkCtx sdk.Context, func (w KeeperWrapper) GetDepositSudoMsg(ctx sdk.Context, typedContractAddr types.ContractAddress) types.SudoOrderPlacementMsg { depositMemState := dexutils.GetMemState(ctx.Context()).GetDepositInfo(ctx, typedContractAddr).Get() + + // If there's no amount to send, exit early and avoid additional processing + if len(depositMemState) == 0 { + return types.SudoOrderPlacementMsg{ + OrderPlacements: types.OrderPlacementMsgDetails{ + Orders: []types.Order{}, + Deposits: []types.ContractDepositInfo{}, + }, + } + } + contractDepositInfo := seiutils.Map( depositMemState, func(d *types.DepositInfoEntry) types.ContractDepositInfo { return d.ToContractDepositInfo() }, @@ -46,7 +52,7 @@ func (w KeeperWrapper) GetDepositSudoMsg(ctx sdk.Context, typedContractAddr type if err != nil { panic(err) } - if err := w.BankKeeper.SendCoinsFromModuleToAccount(ctx, types.ModuleName, contractAddr, escrowed); err != nil { + if err := w.BankKeeper.DeferredSendCoinsFromModuleToAccount(ctx, types.ModuleName, contractAddr, escrowed); err != nil { panic(err) } return types.SudoOrderPlacementMsg{ diff --git a/x/dex/module_test.go b/x/dex/module_test.go index f68ddf8bc9..8b43e74383 100644 --- a/x/dex/module_test.go +++ b/x/dex/module_test.go @@ -695,6 +695,12 @@ func TestEndBlockRollbackWithRentCharge(t *testing.T) { params.MinProcessableRent = 0 dexkeeper.SetParams(ctx, params) + // rent should still be charged even if the contract failed + beforeC, err := dexkeeper.GetContract(ctx, contractAddr.String()) + require.Nil(t, err) + require.False(t, beforeC.Suspended) // good contract is not suspended + require.Equal(t, uint64(1), beforeC.RentBalance) // rent balance should not be empty + ctx = ctx.WithBlockHeight(1) creatorBalanceBefore := bankkeeper.GetBalance(ctx, testAccount, "usei") testApp.EndBlocker(ctx, abci.RequestEndBlock{}) diff --git a/x/oracle/simulation/operations.go b/x/oracle/simulation/operations.go index 3988c4b164..2028e73625 100644 --- a/x/oracle/simulation/operations.go +++ b/x/oracle/simulation/operations.go @@ -65,7 +65,7 @@ func WeightedOperations( } // SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values. -//nolint: funlen +// nolint: funlen func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string, @@ -122,7 +122,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK } // SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values. -//nolint: funlen +// nolint: funlen func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation { return func( r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,