Skip to content

Commit

Permalink
Parallelize Sudo Deposit (sei-protocol#777)
Browse files Browse the repository at this point in the history
* More fixes

* move deferred

* fix nonterminism

* debug

* Fix unit test
  • Loading branch information
BrandonWeng authored May 15, 2023
1 parent a3576fd commit 3c32aab
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 19 deletions.
61 changes: 52 additions & 9 deletions x/dex/contract/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,64 @@ func decorateContextForContract(ctx sdk.Context, contractInfo types.ContractInfo
)
}

func getSudoPlaceOrderMessages(sdkCtx sdk.Context, validContractsInfos []types.ContractInfoV2, keeperWrapper dexkeeperabci.KeeperWrapper) map[int]types.SudoOrderPlacementMsg {
resultChan := make(chan struct {
int
types.SudoOrderPlacementMsg
}, len(validContractsInfos))

var wg sync.WaitGroup
contractToProcess := dexutils.GetMemState(sdkCtx.Context()).GetContractToProcess()
for index, contract := range validContractsInfos {
wg.Add(1)
go func(index int, contract types.ContractInfoV2) {
defer wg.Done()
if !contract.NeedOrderMatching || !contractToProcess.Contains(contract.ContractAddr) {
return
}
typedContractAddr := types.ContractAddress(contract.ContractAddr)
msg := keeperWrapper.GetDepositSudoMsg(sdkCtx, typedContractAddr)
if msg.IsEmpty() {
return
}
resultChan <- struct {
int
types.SudoOrderPlacementMsg
}{index, msg}

}(index, contract)
}

go func() {
wg.Wait()
close(resultChan)
}()

resultMapToIndex := map[int]types.SudoOrderPlacementMsg{}
for result := range resultChan {
resultMapToIndex[result.int] = result.SudoOrderPlacementMsg
}

keeperWrapper.BankKeeper.WriteDeferredOperations(sdkCtx)

return resultMapToIndex
}

func handleDeposits(spanCtx context.Context, ctx sdk.Context, env *environment, keeper *keeper.Keeper, tracer *otrace.Tracer) {
// Handle deposit sequentially since they mutate `bank` state which is shared by all contracts
_, span := (*tracer).Start(spanCtx, "handleDeposits")
defer span.End()
defer telemetry.MeasureSince(time.Now(), "dex", "handle_deposits")
keeperWrapper := dexkeeperabci.KeeperWrapper{Keeper: keeper}
for _, contract := range env.validContractsInfo {
if !dexutils.GetMemState(ctx.Context()).GetContractToProcess().Contains(contract.ContractAddr) {
continue
}
if !contract.NeedOrderMatching {
continue
}
if err := keeperWrapper.HandleEBDeposit(spanCtx, ctx, tracer, contract.ContractAddr); err != nil {
env.addError(contract.ContractAddr, err)

// Filter and create and pending sudo messages concurrently and batch write to bank keeper
sudoMessages := getSudoPlaceOrderMessages(ctx, env.validContractsInfo, keeperWrapper)
for index := 0; index < len(env.validContractsInfo); index++ {
if message, ok := sudoMessages[index]; ok {
contract := env.validContractsInfo[index]
if err := keeperWrapper.HandleEBDeposit(spanCtx, ctx, tracer, contract.ContractAddr, message); err != nil {
env.addError(contract.ContractAddr, err)
}
}
}
}
Expand Down
22 changes: 14 additions & 8 deletions x/dex/keeper/abci/end_block_deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,12 @@ import (
otrace "go.opentelemetry.io/otel/trace"
)

func (w KeeperWrapper) HandleEBDeposit(ctx context.Context, sdkCtx sdk.Context, tracer *otrace.Tracer, contractAddr string) error {
func (w KeeperWrapper) HandleEBDeposit(ctx context.Context, sdkCtx sdk.Context, tracer *otrace.Tracer, contractAddr string, message types.SudoOrderPlacementMsg) error {
_, span := (*tracer).Start(ctx, "SudoDeposit")
span.SetAttributes(attribute.String("contractAddr", contractAddr))
defer span.End()

typedContractAddr := types.ContractAddress(contractAddr)
msg := w.GetDepositSudoMsg(sdkCtx, typedContractAddr)
if msg.IsEmpty() {
return nil
}
_, err := utils.CallContractSudo(sdkCtx, w.Keeper, contractAddr, msg, dexutils.ZeroUserProvidedGas) // deposit
_, err := utils.CallContractSudo(sdkCtx, w.Keeper, contractAddr, message, dexutils.ZeroUserProvidedGas) // deposit
if err != nil {
sdkCtx.Logger().Error(fmt.Sprintf("Error during deposit: %s", err.Error()))
return err
Expand All @@ -34,6 +29,17 @@ func (w KeeperWrapper) HandleEBDeposit(ctx context.Context, sdkCtx sdk.Context,

func (w KeeperWrapper) GetDepositSudoMsg(ctx sdk.Context, typedContractAddr types.ContractAddress) types.SudoOrderPlacementMsg {
depositMemState := dexutils.GetMemState(ctx.Context()).GetDepositInfo(ctx, typedContractAddr).Get()

// If there's no amount to send, exit early and avoid additional processing
if len(depositMemState) == 0 {
return types.SudoOrderPlacementMsg{
OrderPlacements: types.OrderPlacementMsgDetails{
Orders: []types.Order{},
Deposits: []types.ContractDepositInfo{},
},
}
}

contractDepositInfo := seiutils.Map(
depositMemState,
func(d *types.DepositInfoEntry) types.ContractDepositInfo { return d.ToContractDepositInfo() },
Expand All @@ -46,7 +52,7 @@ func (w KeeperWrapper) GetDepositSudoMsg(ctx sdk.Context, typedContractAddr type
if err != nil {
panic(err)
}
if err := w.BankKeeper.SendCoinsFromModuleToAccount(ctx, types.ModuleName, contractAddr, escrowed); err != nil {
if err := w.BankKeeper.DeferredSendCoinsFromModuleToAccount(ctx, types.ModuleName, contractAddr, escrowed); err != nil {
panic(err)
}
return types.SudoOrderPlacementMsg{
Expand Down
6 changes: 6 additions & 0 deletions x/dex/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,12 @@ func TestEndBlockRollbackWithRentCharge(t *testing.T) {
params.MinProcessableRent = 0
dexkeeper.SetParams(ctx, params)

// rent should still be charged even if the contract failed
beforeC, err := dexkeeper.GetContract(ctx, contractAddr.String())
require.Nil(t, err)
require.False(t, beforeC.Suspended) // good contract is not suspended
require.Equal(t, uint64(1), beforeC.RentBalance) // rent balance should not be empty

ctx = ctx.WithBlockHeight(1)
creatorBalanceBefore := bankkeeper.GetBalance(ctx, testAccount, "usei")
testApp.EndBlocker(ctx, abci.RequestEndBlock{})
Expand Down
4 changes: 2 additions & 2 deletions x/oracle/simulation/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func WeightedOperations(
}

// SimulateMsgAggregateExchangeRateVote generates a MsgAggregateExchangeRateVote with random values.
//nolint: funlen
// nolint: funlen
func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down Expand Up @@ -122,7 +122,7 @@ func SimulateMsgAggregateExchangeRateVote(ak types.AccountKeeper, bk types.BankK
}

// SimulateMsgDelegateFeedConsent generates a MsgDelegateFeedConsent with random values.
//nolint: funlen
// nolint: funlen
func SimulateMsgDelegateFeedConsent(ak types.AccountKeeper, bk types.BankKeeper, k keeper.Keeper) simtypes.Operation {
return func(
r *rand.Rand, app *baseapp.BaseApp, ctx sdk.Context, accs []simtypes.Account, chainID string,
Expand Down

0 comments on commit 3c32aab

Please sign in to comment.