Skip to content

Commit

Permalink
Parallelize begin block for dex module (sei-protocol#771)
Browse files Browse the repository at this point in the history
* Parallelize begin block for dex module

* Fix import

* Fix concurrent modification error

---------

Co-authored-by: Yiming Zang <[email protected]>
Co-authored-by: Brandon Weng <[email protected]>
  • Loading branch information
3 people authored May 15, 2023
1 parent 593632c commit a3576fd
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 21 deletions.
4 changes: 2 additions & 2 deletions x/dex/keeper/price.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ func (k Keeper) SetPriceState(ctx sdk.Context, price types.Price, contractAddr s

func (k Keeper) DeletePriceStateBefore(ctx sdk.Context, contractAddr string, timestamp uint64, pair types.Pair) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.PricePrefix(contractAddr, pair.PriceDenom, pair.AssetDenom))
for _, key := range k.getPriceKeysToDelete(store, timestamp) {
for _, key := range k.GetPriceKeysToDelete(store, timestamp) {
store.Delete(key)
}
}

func (k Keeper) getPriceKeysToDelete(store sdk.KVStore, timestamp uint64) [][]byte {
func (k Keeper) GetPriceKeysToDelete(store sdk.KVStore, timestamp uint64) [][]byte {
keys := [][]byte{}
iterator := sdk.KVStorePrefixIterator(store, []byte{})
defer iterator.Close()
Expand Down
64 changes: 45 additions & 19 deletions x/dex/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/CosmWasm/wasmd/x/wasm"
"github.com/cosmos/cosmos-sdk/store/prefix"
"github.com/cosmos/cosmos-sdk/telemetry"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel/attribute"

abci "github.com/tendermint/tendermint/abci/types"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
cdctypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
seisync "github.com/sei-protocol/sei-chain/sync"
"github.com/sei-protocol/sei-chain/utils"
"github.com/sei-protocol/sei-chain/utils/datastructures"
"github.com/sei-protocol/sei-chain/utils/tracing"
Expand Down Expand Up @@ -256,30 +255,57 @@ func (am AppModule) BeginBlock(ctx sdk.Context, _ abci.RequestBeginBlock) {
am.keeper.SetEpoch(ctx, currentEpoch)
}
cachedCtx, cachedStore := store.GetCachedContext(ctx)
gasLimit := am.keeper.GetParams(ctx).BeginBlockGasLimit
for _, contract := range am.getAllContractInfo(ctx) {
am.beginBlockForContract(cachedCtx, contract, gasLimit)
priceRetention := am.keeper.GetParams(ctx).PriceSnapshotRetention
cutOffTime := uint64(ctx.BlockTime().Unix()) - priceRetention
wg := sync.WaitGroup{}
mutex := sync.Mutex{}
allContracts := am.getAllContractInfo(ctx)
allPricesToDelete := make(map[string][]*types.PriceStore, len(allContracts))

// Parallelize the logic to find all prices to delete
for _, contract := range allContracts {
wg.Add(1)
go func(contract types.ContractInfoV2) {
priceKeysToDelete := am.getPriceToDelete(cachedCtx, contract, cutOffTime)
mutex.Lock()
allPricesToDelete[contract.ContractAddr] = priceKeysToDelete
mutex.Unlock()
wg.Done()
}(contract)
}
wg.Wait()

// Execute the deletion in order
for _, contract := range allContracts {
if priceStores, found := allPricesToDelete[contract.ContractAddr]; found {
for _, priceStore := range priceStores {
for _, key := range priceStore.PriceKeys {
priceStore.Store.Delete(key)
}
}
}
}
// only write if all contracts have been processed
cachedStore.Write()
}

func (am AppModule) beginBlockForContract(ctx sdk.Context, contract types.ContractInfoV2, gasLimit uint64) {
_, span := am.tracingInfo.Start("DexBeginBlock")
contractAddr := contract.ContractAddr
span.SetAttributes(attribute.String("contract", contractAddr))
defer span.End()

ctx = ctx.WithGasMeter(seisync.NewGasWrapper(dexutils.GetGasMeterForLimit(gasLimit)))

func (am AppModule) getPriceToDelete(
ctx sdk.Context,
contract types.ContractInfoV2,
timestamp uint64,
) []*types.PriceStore {
var result []*types.PriceStore
if contract.NeedOrderMatching {
currentTimestamp := uint64(ctx.BlockTime().Unix())
ctx.Logger().Debug(fmt.Sprintf("Removing stale prices for ts %d", currentTimestamp))
priceRetention := am.keeper.GetParams(ctx).PriceSnapshotRetention
for _, pair := range am.keeper.GetAllRegisteredPairs(ctx, contractAddr) {
am.keeper.DeletePriceStateBefore(ctx, contractAddr, currentTimestamp-priceRetention, pair)
for _, pair := range am.keeper.GetAllRegisteredPairs(ctx, contract.ContractAddr) {
store := prefix.NewStore(ctx.KVStore(am.keeper.GetStoreKey()), types.PricePrefix(contract.ContractAddr, pair.PriceDenom, pair.AssetDenom))
keysToDelete := am.keeper.GetPriceKeysToDelete(store, timestamp)
result = append(result, &types.PriceStore{
Store: store,
PriceKeys: keysToDelete,
})
}
}
return result
}

// EndBlock executes all ABCI EndBlock logic respective to the capability module. It
Expand Down
6 changes: 6 additions & 0 deletions x/dex/types/orders.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/sei-protocol/sei-chain/utils"
)
Expand Down Expand Up @@ -151,6 +152,11 @@ func (m *LongBook) SetPrice(p sdk.Dec) {
m.Price = p
}

type PriceStore struct {
Store prefix.Store
PriceKeys [][]byte
}

func (m *LongBook) GetPrice() sdk.Dec {
return m.Price
}
Expand Down

0 comments on commit a3576fd

Please sign in to comment.