Skip to content

Commit

Permalink
optimize async sendTransaction memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
bxq2011hust committed Jan 24, 2024
1 parent 1a0c897 commit f46e238
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 40 deletions.
105 changes: 66 additions & 39 deletions v3/client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync/atomic"
"time"

cache "github.com/patrickmn/go-cache"

"github.com/FISCO-BCOS/bcos-c-sdk/bindings/go/csdk"
"github.com/FISCO-BCOS/go-sdk/v3/types"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -41,11 +43,12 @@ var (

const (
// Timeouts
tcpKeepAliveInterval = 30 * time.Second
defaultDialTimeout = 10 * time.Second // used if context has no deadline
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
amopTimeout = 1000
jsonRPCVersion = "2.0"
tcpKeepAliveInterval = 30 * time.Second
subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls
amopTimeout = 1000
cleanupInterval = 15 * time.Minute
defaultTransactionTimeout = 10 * time.Minute
jsonRPCVersion = "2.0"
)

// Error wraps RPC errors, which contain an error code in addition to the message.
Expand All @@ -56,17 +59,19 @@ type Error interface {

// Connection represents a connection to an RPC server.
type Connection struct {
csdk *csdk.CSDK
idCounter int64
blockNumberNotify func(int64)
notifyLock sync.Mutex
csdk *csdk.CSDK
idCounter int64
blockNumberNotify func(int64)
notifyLock sync.Mutex
transactionHandlers *cache.Cache
closed bool
}

type requestOp struct {
ids []json.RawMessage
err error
resp chan *jsonrpcMessage // receives up to len(ids) responses
respChanData *csdk.CallbackChan
handler func(*types.Receipt, error)
}

type EventLogRespResult struct {
Expand All @@ -86,7 +91,7 @@ type eventLogResp struct {
Status int `json:"status"`
}

func (op *requestOp) waitRpcMessage(ctx context.Context) (*jsonrpcMessage, interface{}, error) {
func (op *requestOp) waitRpcMessage() (*jsonrpcMessage, interface{}, error) {
respBody := <-op.respChanData.Data
var respData jsonrpcMessage
if respBody.Err == nil {
Expand Down Expand Up @@ -138,7 +143,8 @@ func NewConnectionByFile(configFile, groupID string, privateKey []byte) (*Connec
if err != nil {
return nil, err
}
c := &Connection{csdk: sdk}
c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)}
go c.processTransactionResponses()

Check warning on line 147 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L146-L147

Added lines #L146 - L147 were not covered by tests
return c, nil
}

Expand Down Expand Up @@ -167,14 +173,53 @@ func NewConnection(config *Config) (*Connection, error) {
if err != nil {
return nil, err
}
c := &Connection{csdk: sdk}
c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)}
go c.processTransactionResponses()
return c, nil
}

func (c *Connection) GetCSDK() *csdk.CSDK {
return c.csdk
}

func (c *Connection) processTransactionResponses() {
for {
if !c.closed {
items := c.transactionHandlers.Items()
for key, item := range items {
op := item.Object.(*requestOp)
if len(op.respChanData.Data) > 0 {
go func() {
resp, _, err := op.waitRpcMessage()
if err != nil {
op.handler(nil, err)
return

Check warning on line 195 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L194-L195

Added lines #L194 - L195 were not covered by tests
}
if resp.Error != nil {
op.handler(nil, resp.Error)
return

Check warning on line 199 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L198-L199

Added lines #L198 - L199 were not covered by tests
}
if len(resp.Result) == 0 {
op.handler(nil, errors.New("result is null"))
return

Check warning on line 203 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L202-L203

Added lines #L202 - L203 were not covered by tests
}
var receipt types.Receipt
err = json.Unmarshal(resp.Result, &receipt)
if err != nil {
op.handler(nil, fmt.Errorf("unmarshal receipt error: %v", err))
return

Check warning on line 209 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L208-L209

Added lines #L208 - L209 were not covered by tests
}
op.handler(&receipt, nil)
}()
c.transactionHandlers.Delete(key)
}
}
} else {
return

Check warning on line 217 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L216-L217

Added lines #L216 - L217 were not covered by tests
}
}

}

func (c *Connection) nextID() int64 {
id := atomic.AddInt64(&c.idCounter, 1)
return id
Expand All @@ -193,6 +238,7 @@ func (c *Connection) NewMessage(method string, paramsIn ...interface{}) (*jsonrp

// Close closes the client, aborting any in-flight requests.
func (c *Connection) Close() {
c.closed = true

Check warning on line 241 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L241

Added line #L241 was not covered by tests
c.csdk.Close()
}

Expand Down Expand Up @@ -264,7 +310,7 @@ func (c *Connection) SubscribeAmopTopic(topic string, handler func(data []byte,
}

func (c *Connection) PublishAmopTopicMessage(ctx context.Context, topic string, data []byte, handler func([]byte, error)) error {
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 100)}}
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}}

Check warning on line 313 in v3/client/connection.go

View check run for this annotation

Codecov / codecov/patch

v3/client/connection.go#L313

Added line #L313 was not covered by tests
c.csdk.PublishAmopTopicMsg(op.respChanData, topic, data, amopTimeout)
go func() {
select {
Expand Down Expand Up @@ -294,7 +340,7 @@ func (c *Connection) Call(result interface{}, method string, args ...interface{}
// can also pass nil, in which case the result is ignored.
func (c *Connection) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
//logrus.Infof("CallContext method:%s\n", method)
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 100)}}
op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}}
switch method {
case "call":
arg := args[0].(map[string]interface{})
Expand Down Expand Up @@ -399,36 +445,17 @@ func (c *Connection) CallContext(ctx context.Context, result interface{}, method
}
// async send transaction
if handler != nil {
go func() {
resp, _, err := op.waitRpcMessage(ctx)
if err != nil {
handler(nil, err)
return
}
if resp.Error != nil {
handler(nil, resp.Error)
return
}
if len(resp.Result) == 0 {
handler(nil, errors.New("result is null"))
return
}
var receipt types.Receipt
err = json.Unmarshal(resp.Result, &receipt)
if err != nil {
handler(nil, fmt.Errorf("unmarshal receipt error: %v", err))
return
}
handler(&receipt, nil)
}()
op.handler = handler
pointer := fmt.Sprintf("%p", op.respChanData)
c.transactionHandlers.Set(pointer, op, defaultTransactionTimeout)
return nil
}
default:
return ErrNoRpcMethod
}

// dispatch has accepted the request and will close the channel when it quits.
switch resp, _, err := op.waitRpcMessage(ctx); {
switch resp, _, err := op.waitRpcMessage(); {
case err != nil:
return err
case resp.Error != nil:
Expand Down
2 changes: 1 addition & 1 deletion v3/examples/parallelok/manual/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func main() {
var wg2 sync.WaitGroup
sendBar := progressbar.Default(int64(*totalTx), "send")
receiveBar := progressbar.Default(int64(*totalTx), "receive")
limiter := rate.NewLimiter(rate.Limit(*totalTx), *qps)
limiter := rate.NewLimiter(rate.Limit(*qps), *qps)
for i := 0; i < *totalTx; i++ {
from := i % *userCount
to := (i + *userCount/2) % *userCount
Expand Down
7 changes: 7 additions & 0 deletions v3/examples/parallelok/wrapper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/FISCO-BCOS/go-sdk/v3/types"
"github.com/schollz/progressbar/v3"
flag "github.com/spf13/pflag"
"golang.org/x/time/rate"
)

func main() {
Expand Down Expand Up @@ -126,6 +127,8 @@ func main() {
var wg2 sync.WaitGroup
sendBar := progressbar.Default(int64(*totalTx), "send")
receiveBar := progressbar.Default(int64(*totalTx), "receive")
limiter := rate.NewLimiter(rate.Limit(*qps), *qps)

// routineCount := (qps + 4000) / 4000
// sended := int64(0)
// for i := 0; i < routineCount; i++ {
Expand Down Expand Up @@ -184,6 +187,10 @@ func main() {
from := i % *userCount
to := (i + *userCount/2) % *userCount
amount := int64(1)
err = limiter.Wait(context.Background())
if err != nil {
log.Fatalf("limiter Wait error: %v", err)
}
_, err = transfer.AsyncTransfer(func(receipt *types.Receipt, err error) {
receiveBar.Add(1)
if err != nil {
Expand Down

0 comments on commit f46e238

Please sign in to comment.