Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: split mutex for each account #49

Merged
merged 5 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/go-metrics v0.5.3
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.3.0
github.com/initia-labs/OPinit v0.4.1
github.com/initia-labs/initia v0.4.1
Expand Down Expand Up @@ -163,7 +164,6 @@ require (
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/hdevalence/ed25519consensus v0.1.0 // indirect
Expand Down
16 changes: 13 additions & 3 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"sync"

bigcache "github.com/allegro/bigcache/v3"
lrucache "github.com/hashicorp/golang-lru/v2"

"cosmossdk.io/log"
"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -19,7 +20,7 @@
logger log.Logger

queuedTxs *bigcache.BigCache
sendTxMut sync.Mutex
accMuts *lrucache.Cache[string, *sync.Mutex]

ctx context.Context
svrCtx *server.Context
Expand Down Expand Up @@ -54,13 +55,22 @@
return nil, err
}

// support concurrent 100 accounts mutex
accMuts, err := lrucache.New[string, *sync.Mutex](100)
if err != nil {
return nil, err
}

Check warning on line 62 in jsonrpc/backend/backend.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/backend.go#L59-L62

Added lines #L59 - L62 were not covered by tests

return &JSONRPCBackend{
app: app,
logger: logger,
app: app,
logger: logger,

Check warning on line 67 in jsonrpc/backend/backend.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/backend.go#L65-L67

Added lines #L65 - L67 were not covered by tests
queuedTxs: queuedTxs,
accMuts: accMuts,

ctx: ctx,
svrCtx: svrCtx,
clientCtx: clientCtx,
cfg: cfg,
}, nil
}
23 changes: 16 additions & 7 deletions jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
"sync"

"cosmossdk.io/collections"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
Expand Down Expand Up @@ -36,9 +37,6 @@
}

func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {
b.sendTxMut.Lock()
defer b.sendTxMut.Unlock()

queryCtx, err := b.getQueryCtx()
if err != nil {
return err
Expand Down Expand Up @@ -70,28 +68,39 @@
accSeq := uint64(0)
sender := sdk.AccAddress(sig.PubKey.Address().Bytes())

// hold mutex for each sender
senderHex := hexutil.Encode(sender.Bytes())
accMut, ok := b.accMuts.Get(senderHex)
if !ok {
accMut = &sync.Mutex{}
_ = b.accMuts.Add(senderHex, accMut)
}
beer-1 marked this conversation as resolved.
Show resolved Hide resolved

accMut.Lock()
defer accMut.Unlock()

Check warning on line 81 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L79-L81

Added lines #L79 - L81 were not covered by tests
checkCtx := b.app.GetContextForCheckTx(nil)
if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil {
accSeq = acc.GetSequence()
}

b.logger.Debug("enqueue tx", "sender", sender, "txSeq", txSeq, "accSeq", accSeq)
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), txSeq)
b.logger.Debug("enqueue tx", "sender", senderHex, "txSeq", txSeq, "accSeq", accSeq)
cacheKey := fmt.Sprintf("%s-%d", senderHex, txSeq)

Check warning on line 88 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L87-L88

Added lines #L87 - L88 were not covered by tests
if err := b.queuedTxs.Set(cacheKey, txBytes); err != nil {
b.logger.Error("failed to enqueue tx", "key", cacheKey, "err", err)
return NewInternalError("failed to enqueue tx")
}

// check if there are queued txs which can be sent
for {
cacheKey := fmt.Sprintf("%X-%d", sender.Bytes(), accSeq)
cacheKey := fmt.Sprintf("%s-%d", senderHex, accSeq)

Check warning on line 96 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L96

Added line #L96 was not covered by tests
if txBytes, err := b.queuedTxs.Get(cacheKey); err == nil {
if err := b.queuedTxs.Delete(cacheKey); err != nil {
b.logger.Error("failed to delete queued tx", "key", cacheKey, "err", err)
return NewInternalError("failed to delete queued tx")
}

b.logger.Debug("broadcast queued tx", "sender", sender, "txSeq", accSeq)
b.logger.Debug("broadcast queued tx", "sender", senderHex, "txSeq", accSeq)

Check warning on line 103 in jsonrpc/backend/tx.go

View check run for this annotation

Codecov / codecov/patch

jsonrpc/backend/tx.go#L103

Added line #L103 was not covered by tests
res, err := b.clientCtx.BroadcastTxSync(txBytes)
if err != nil {
return err
Expand Down
Loading