Skip to content

Commit

Permalink
feat: split mutex for each account (#49)
Browse files Browse the repository at this point in the history
* split mutex for each account

* use lru cache for queued txs

* remove comment

* introduce mutex for accMut

* use reference count for accMut lifetime
  • Loading branch information
beer-1 authored Aug 21, 2024
1 parent efcca2e commit 7dd9388
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 54 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/go-metrics v0.5.3
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.3.0
github.com/initia-labs/OPinit v0.4.1
github.com/initia-labs/initia v0.4.1
Expand Down Expand Up @@ -163,7 +164,6 @@ require (
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/hdevalence/ed25519consensus v0.1.0 // indirect
Expand Down
70 changes: 51 additions & 19 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"sync"

bigcache "github.com/allegro/bigcache/v3"
lrucache "github.com/hashicorp/golang-lru/v2"

"cosmossdk.io/log"
"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -18,8 +18,10 @@ type JSONRPCBackend struct {
app *app.MinitiaApp
logger log.Logger

queuedTxs *bigcache.BigCache
sendTxMut sync.Mutex
queuedTxs *lrucache.Cache[string, []byte]

mut sync.Mutex // mutex for accMuts
accMuts map[string]*AccMut

ctx context.Context
svrCtx *server.Context
Expand All @@ -36,31 +38,61 @@ func NewJSONRPCBackend(
clientCtx client.Context,
cfg config.JSONRPCConfig,
) (*JSONRPCBackend, error) {

if cfg.QueuedTransactionCap == 0 {
cfg.QueuedTransactionCap = config.DefaultQueuedTransactionCap
}
if cfg.QueuedTransactionTTL == 0 {
cfg.QueuedTransactionTTL = config.DefaultQueuedTransactionTTL
}

cacheConfig := bigcache.DefaultConfig(cfg.QueuedTransactionTTL)
cacheConfig.HardMaxCacheSize = cfg.QueuedTransactionCap

ctx := context.Background()

queuedTxs, err := bigcache.New(ctx, cacheConfig)
queuedTxs, err := lrucache.New[string, []byte](cfg.QueuedTransactionCap)
if err != nil {
return nil, err
}

ctx := context.Background()
return &JSONRPCBackend{
app: app,
logger: logger,
app: app,
logger: logger,

queuedTxs: queuedTxs,
accMuts: make(map[string]*AccMut),

ctx: ctx,
svrCtx: svrCtx,
clientCtx: clientCtx,
cfg: cfg,
}, nil
}

type AccMut struct {
mut sync.Mutex
rc int // reference count
}

// acquireAccMut acquires the mutex for the account with the given senderHex
// and increments the reference count. If the mutex does not exist, it is created.
func (b *JSONRPCBackend) acquireAccMut(senderHex string) {
// critical section for rc and create
b.mut.Lock()
accMut, ok := b.accMuts[senderHex]
if !ok {
accMut = &AccMut{rc: 0}
b.accMuts[senderHex] = accMut
}
accMut.rc++
b.mut.Unlock()
// critical section end

accMut.mut.Lock()
}

// releaseAccMut releases the mutex for the account with the given senderHex
// and decrements the reference count. If the reference count reaches zero,
// the mutex is deleted.
func (b *JSONRPCBackend) releaseAccMut(senderHex string) {
accMut := b.accMuts[senderHex]
accMut.mut.Unlock()

// critical section for rc and delete
b.mut.Lock()
accMut.rc--
if accMut.rc == 0 {
delete(b.accMuts, senderHex)
}
b.mut.Unlock()
// critical section end
}
29 changes: 13 additions & 16 deletions jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ func (b *JSONRPCBackend) SendRawTransaction(input hexutil.Bytes) (common.Hash, e
}

func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
b.sendTxMut.Lock()
defer b.sendTxMut.Unlock()

queryCtx, err := b.getQueryCtx()
if err != nil {
return err
Expand Down Expand Up @@ -70,28 +67,28 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
accSeq := uint64(0)
sender := sdk.AccAddress(sig.PubKey.Address().Bytes())

senderHex := hexutil.Encode(sender.Bytes())

// hold mutex for each sender
b.acquireAccMut(senderHex)
defer b.releaseAccMut(senderHex)

checkCtx := b.app.GetContextForCheckTx(nil)
if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil {
accSeq = acc.GetSequence()
}

b.logger.Debug("enqueue tx", "sender", sender, "txSeq", txSeq, "accSeq", accSeq)
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), txSeq)
if err := b.queuedTxs.Set(cacheKey, txBytes); err != nil {
b.logger.Error("failed to enqueue tx", "key", cacheKey, "err", err)
return NewInternalError("failed to enqueue tx")
}
b.logger.Debug("enqueue tx", "sender", senderHex, "txSeq", txSeq, "accSeq", accSeq)
cacheKey := fmt.Sprintf("%s-%d", senderHex, txSeq)
_ = b.queuedTxs.Add(cacheKey, txBytes)

// check if there are queued txs which can be sent
for {
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), accSeq)
if txBytes, err := b.queuedTxs.Get(cacheKey); err == nil {
if err := b.queuedTxs.Delete(cacheKey); err != nil {
b.logger.Error("failed to delete queued tx", "key", cacheKey, "err", err)
return NewInternalError("failed to delete queued tx")
}
cacheKey := fmt.Sprintf("%s-%d", senderHex, accSeq)
if txBytes, ok := b.queuedTxs.Get(cacheKey); ok {
_ = b.queuedTxs.Remove(cacheKey)

b.logger.Debug("broadcast queued tx", "sender", sender, "txSeq", accSeq)
b.logger.Debug("broadcast queued tx", "sender", senderHex, "txSeq", accSeq)
res, err := b.clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return err
Expand Down
24 changes: 6 additions & 18 deletions jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,8 @@ const (
DefaultBatchRequestLimit = 1000
// DefaultBatchResponseMaxSize is the default maximum number of bytes returned from a batched call
DefaultBatchResponseMaxSize = 25 * 1000 * 1000
// DefaultQueuedTransactionCap is the default maximum size (MiB) of queued transactions that can be in the transaction pool.
DefaultQueuedTransactionCap = 100
// DefaultQueuedTransactionTTL is the default maximum time a queued transaction can
// remain in the transaction pool before being evicted.
DefaultQueuedTransactionTTL = 1 * time.Minute
// DefaultQueuedTransactionCap is the default maximum number of queued transactions that can be in the transaction pool.
DefaultQueuedTransactionCap = 1000
)

var (
Expand Down Expand Up @@ -98,11 +95,8 @@ type JSONRPCConfig struct {
BatchRequestLimit int `mapstructure:"batch-request-limit"`
// Maximum number of bytes returned from a batched call
BatchResponseMaxSize int `mapstructure:"batch-response-max-size"`
// QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool.
// QueuedTransactionCap is a maximum number of queued transactions that can be in the transaction pool.
QueuedTransactionCap int `mapstructure:"queued-transaction-cap"`
// QueuedTransactionTTL is the maximum time a queued transaction can
// remain in the transaction pool before being evicted.
QueuedTransactionTTL time.Duration `mapstructure:"queued-transaction-ttl"`
}

// DefaultJSONRPCConfig returns a default configuration for the EVM RPC server.
Expand All @@ -127,7 +121,6 @@ func DefaultJSONRPCConfig() JSONRPCConfig {
BatchResponseMaxSize: DefaultBatchResponseMaxSize,

QueuedTransactionCap: DefaultQueuedTransactionCap,
QueuedTransactionTTL: DefaultQueuedTransactionTTL,
}
}

Expand All @@ -146,8 +139,7 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Int(flagJSONRPCMaxOpenConnections, DefaultMaxOpenConnections, "Maximum number of simultaneous connections for the server listener")
startCmd.Flags().Int(flagJSONRPCBatchRequestLimit, DefaultBatchRequestLimit, "Maximum number of requests in a batch")
startCmd.Flags().Int(flagJSONRPCBatchResponseMaxSize, DefaultBatchResponseMaxSize, "Maximum number of bytes returned from a batched call")
startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum size (MiB) of queued transactions that can be in the transaction pool")
startCmd.Flags().Duration(flagJSONRPCQueuedTransactionTTL, DefaultQueuedTransactionTTL, "Maximum time a queued transaction can remain in the transaction pool before being evicted")
startCmd.Flags().Int(flagJSONRPCQueuedTransactionCap, DefaultQueuedTransactionCap, "Maximum number of queued transactions that can be in the transaction pool")
}

// GetConfig load config values from the app options
Expand All @@ -167,7 +159,6 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
BatchRequestLimit: cast.ToInt(appOpts.Get(flagJSONRPCBatchRequestLimit)),
BatchResponseMaxSize: cast.ToInt(appOpts.Get(flagJSONRPCBatchResponseMaxSize)),
QueuedTransactionCap: cast.ToInt(appOpts.Get(flagJSONRPCQueuedTransactionCap)),
QueuedTransactionTTL: cast.ToDuration(appOpts.Get(flagJSONRPCQueuedTransactionTTL)),
}
}

Expand Down Expand Up @@ -220,10 +211,7 @@ batch-request-limit = {{ .JSONRPCConfig.BatchRequestLimit }}
# Maximum number of bytes returned from a batched call
batch-response-max-size = {{ .JSONRPCConfig.BatchResponseMaxSize }}
# QueuedTransactionCap is a maximum size (MiB) of queued transactions that can be in the transaction pool.
# QueuedTransactionCap is the maximum number of queued transactions that
# can be in the transaction pool.
queued-transaction-cap = {{ .JSONRPCConfig.QueuedTransactionCap }}
# QueuedTransactionTTL is the maximum time a queued transaction can
# remain in the transaction pool before being evicted.
queued-transaction-ttl = "{{ .JSONRPCConfig.QueuedTransactionTTL }}"
`

0 comments on commit 7dd9388

Please sign in to comment.