Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize async sendTransaction memory usage #257

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 66 additions & 39 deletions v3/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
"sync/atomic"
"time"

cache "github.com/patrickmn/go-cache"

"github.com/FISCO-BCOS/bcos-c-sdk/bindings/go/csdk"
"github.com/FISCO-BCOS/go-sdk/v3/types"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -41,11 +43,12 @@

const (
// Timeouts
tcpKeepAliveInterval = 30 * time.Second
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
amopTimeout = 1000
jsonRPCVersion = "2.0"
tcpKeepAliveInterval = 30 * time.Second
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
amopTimeout = 1000
cleanupInterval = 15 * time.Minute
defaultTransactionTimeout = 10 * time.Minute
jsonRPCVersion = "2.0"
)

// Error wraps RPC errors, which contain an error code in addition to the message.
Expand All @@ -56,17 +59,19 @@

// Connection represents a connection to an RPC server.
type Connection struct {
csdk *csdk.CSDK
idCounter int64
blockNumberNotify func(int64)
notifyLock sync.Mutex
csdk *csdk.CSDK
idCounter int64
blockNumberNotify func(int64)
notifyLock sync.Mutex
transactionHandlers *cache.Cache
closed bool
}

type requestOp struct {
ids []json.RawMessage
err error
resp chan *jsonrpcMessage // receives up to len(ids) responses
respChanData *csdk.CallbackChan
handler func(*types.Receipt, error)
}

type EventLogRespResult struct {
Expand All @@ -86,7 +91,7 @@
Status int `json:"status"`
}

func (op *requestOp) waitRpcMessage(ctx context.Context) (*jsonrpcMessage, interface{}, error) {
func (op *requestOp) waitRpcMessage() (*jsonrpcMessage, interface{}, error) {
respBody := <-op.respChanData.Data
var respData jsonrpcMessage
if respBody.Err == nil {
Expand Down Expand Up @@ -138,7 +143,8 @@
if err != nil {
return nil, err
}
c := &Connection{csdk: sdk}
c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)}
go c.processTransactionResponses()

Check warning on line 147 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L146-L147

Added lines #L146 - L147 were not covered by tests
return c, nil
}

Expand Down Expand Up @@ -167,14 +173,53 @@
if err != nil {
return nil, err
}
c := &Connection{csdk: sdk}
c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)}
go c.processTransactionResponses()
return c, nil
}

func (c *Connection) GetCSDK() *csdk.CSDK {
return c.csdk
}

func (c *Connection) processTransactionResponses() {
for {
if !c.closed {
items := c.transactionHandlers.Items()
for key, item := range items {
op := item.Object.(*requestOp)
if len(op.respChanData.Data) > 0 {
go func() {
resp, _, err := op.waitRpcMessage()
if err != nil {
op.handler(nil, err)
return

Check warning on line 195 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L194-L195

Added lines #L194 - L195 were not covered by tests
}
if resp.Error != nil {
op.handler(nil, resp.Error)
return

Check warning on line 199 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L198-L199

Added lines #L198 - L199 were not covered by tests
}
if len(resp.Result) == 0 {
op.handler(nil, errors.New("result is null"))
return

Check warning on line 203 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L202-L203

Added lines #L202 - L203 were not covered by tests
}
var receipt types.Receipt
err = json.Unmarshal(resp.Result, &receipt)
if err != nil {
op.handler(nil, fmt.Errorf("unmarshal receipt error: %v", err))
return

Check warning on line 209 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L208-L209

Added lines #L208 - L209 were not covered by tests
}
op.handler(&receipt, nil)
}()
c.transactionHandlers.Delete(key)
}
}
} else {
return

Check warning on line 217 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L216-L217

Added lines #L216 - L217 were not covered by tests
}
}

}

func (c *Connection) nextID() int64 {
id := atomic.AddInt64(&c.idCounter, 1)
return id
Expand All @@ -193,6 +238,7 @@

// Close closes the client, aborting any in-flight requests.
func (c *Connection) Close() {
c.closed = true

Check warning on line 241 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L241

Added line #L241 was not covered by tests
c.csdk.Close()
}

Expand Down Expand Up @@ -264,7 +310,7 @@
}

func (c *Connection) PublishAmopTopicMessage(ctx context.Context, topic string, data []byte, handler func([]byte, error)) error {
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 100)}}
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}}

Check warning on line 313 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L313

Added line #L313 was not covered by tests
c.csdk.PublishAmopTopicMsg(op.respChanData, topic, data, amopTimeout)
go func() {
select {
Expand Down Expand Up @@ -294,7 +340,7 @@
// can also pass nil, in which case the result is ignored.
func (c *Connection) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
//logrus.Infof("CallContext method:%s\n", method)
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 100)}}
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}}
switch method {
case "call":
arg := args[0].(map[string]interface{})
Expand Down Expand Up @@ -399,36 +445,17 @@
}
// async send transaction
if handler != nil {
go func() {
resp, _, err := op.waitRpcMessage(ctx)
if err != nil {
handler(nil, err)
return
}
if resp.Error != nil {
handler(nil, resp.Error)
return
}
if len(resp.Result) == 0 {
handler(nil, errors.New("result is null"))
return
}
var receipt types.Receipt
err = json.Unmarshal(resp.Result, &receipt)
if err != nil {
handler(nil, fmt.Errorf("unmarshal receipt error: %v", err))
return
}
handler(&receipt, nil)
}()
op.handler = handler
pointer := fmt.Sprintf("%p", op.respChanData)
c.transactionHandlers.Set(pointer, op, defaultTransactionTimeout)
return nil
}
default:
return ErrNoRpcMethod
}

// dispatch has accepted the request and will close the channel when it quits.
switch resp, _, err := op.waitRpcMessage(ctx); {
switch resp, _, err := op.waitRpcMessage(); {
case err != nil:
return err
case resp.Error != nil:
Expand Down
2 changes: 1 addition & 1 deletion v3/examples/parallelok/manual/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func main() {
var wg2 sync.WaitGroup
sendBar := progressbar.Default(int64(*totalTx), "send")
receiveBar := progressbar.Default(int64(*totalTx), "receive")
limiter := rate.NewLimiter(rate.Limit(*totalTx), *qps)
limiter := rate.NewLimiter(rate.Limit(*qps), *qps)
for i := 0; i < *totalTx; i++ {
from := i % *userCount
to := (i + *userCount/2) % *userCount
Expand Down
7 changes: 7 additions & 0 deletions v3/examples/parallelok/wrapper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/FISCO-BCOS/go-sdk/v3/types"
"github.com/schollz/progressbar/v3"
flag "github.com/spf13/pflag"
"golang.org/x/time/rate"
)

func main() {
Expand Down Expand Up @@ -126,6 +127,8 @@ func main() {
var wg2 sync.WaitGroup
sendBar := progressbar.Default(int64(*totalTx), "send")
receiveBar := progressbar.Default(int64(*totalTx), "receive")
limiter := rate.NewLimiter(rate.Limit(*qps), *qps)

// routineCount := (qps + 4000) / 4000
// sended := int64(0)
// for i := 0; i < routineCount; i++ {
Expand Down Expand Up @@ -184,6 +187,10 @@ func main() {
from := i % *userCount
to := (i + *userCount/2) % *userCount
amount := int64(1)
err = limiter.Wait(context.Background())
if err != nil {
log.Fatalf("limiter Wait error: %v", err)
}
_, err = transfer.AsyncTransfer(func(receipt *types.Receipt, err error) {
receiveBar.Add(1)
if err != nil {
Expand Down
Loading