diff --git a/v3/client/connection.go b/v3/client/connection.go index cdf65a70..37884ead 100644 --- a/v3/client/connection.go +++ b/v3/client/connection.go @@ -23,6 +23,8 @@ import ( "sync/atomic" "time" + cache "github.com/patrickmn/go-cache" + "github.com/FISCO-BCOS/bcos-c-sdk/bindings/go/csdk" "github.com/FISCO-BCOS/go-sdk/v3/types" "github.com/ethereum/go-ethereum/common" @@ -41,11 +43,12 @@ var ( const ( // Timeouts - tcpKeepAliveInterval = 30 * time.Second - defaultDialTimeout = 10 * time.Second // used if context has no deadline - subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls - amopTimeout = 1000 - jsonRPCVersion = "2.0" + tcpKeepAliveInterval = 30 * time.Second + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls + amopTimeout = 1000 + cleanupInterval = 15 * time.Minute + defaultTransactionTimeout = 10 * time.Minute + jsonRPCVersion = "2.0" ) // Error wraps RPC errors, which contain an error code in addition to the message. @@ -56,17 +59,19 @@ type Error interface { // Connection represents a connection to an RPC server. type Connection struct { - csdk *csdk.CSDK - idCounter int64 - blockNumberNotify func(int64) - notifyLock sync.Mutex + csdk *csdk.CSDK + idCounter int64 + blockNumberNotify func(int64) + notifyLock sync.Mutex + transactionHandlers *cache.Cache + closed bool } type requestOp struct { ids []json.RawMessage err error - resp chan *jsonrpcMessage // receives up to len(ids) responses respChanData *csdk.CallbackChan + handler func(*types.Receipt, error) } type EventLogRespResult struct { @@ -86,7 +91,7 @@ type eventLogResp struct { Status int `json:"status"` } -func (op *requestOp) waitRpcMessage(ctx context.Context) (*jsonrpcMessage, interface{}, error) { +func (op *requestOp) waitRpcMessage() (*jsonrpcMessage, interface{}, error) { respBody := <-op.respChanData.Data var respData jsonrpcMessage if respBody.Err == nil { @@ -138,7 +143,8 @@ func NewConnectionByFile(configFile, groupID string, privateKey []byte) (*Connec if err != nil { return nil, err } - c := &Connection{csdk: sdk} + c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)} + go c.processTransactionResponses() return c, nil } @@ -167,14 +173,53 @@ func NewConnection(config *Config) (*Connection, error) { if err != nil { return nil, err } - c := &Connection{csdk: sdk} + c := &Connection{csdk: sdk, transactionHandlers: cache.New(defaultTransactionTimeout, cleanupInterval)} + go c.processTransactionResponses() return c, nil } - func (c *Connection) GetCSDK() *csdk.CSDK { return c.csdk } +func (c *Connection) processTransactionResponses() { + for { + if !c.closed { + items := c.transactionHandlers.Items() + for key, item := range items { + op := item.Object.(*requestOp) + if len(op.respChanData.Data) > 0 { + go func() { + resp, _, err := op.waitRpcMessage() + if err != nil { + op.handler(nil, err) + return + } + if resp.Error != nil { + op.handler(nil, resp.Error) + return + } + if len(resp.Result) == 0 { + op.handler(nil, errors.New("result is null")) + return + } + var receipt types.Receipt + err = json.Unmarshal(resp.Result, &receipt) + if err != nil { + op.handler(nil, fmt.Errorf("unmarshal receipt error: %v", err)) + return + } + op.handler(&receipt, nil) + }() + c.transactionHandlers.Delete(key) + } + } + } else { + return + } + } + +} + func (c *Connection) nextID() int64 { id := atomic.AddInt64(&c.idCounter, 1) return id @@ -193,6 +238,7 @@ func (c *Connection) NewMessage(method string, paramsIn ...interface{}) (*jsonrp // Close closes the client, aborting any in-flight requests. func (c *Connection) Close() { + c.closed = true c.csdk.Close() } @@ -264,7 +310,7 @@ func (c *Connection) SubscribeAmopTopic(topic string, handler func(data []byte, } func (c *Connection) PublishAmopTopicMessage(ctx context.Context, topic string, data []byte, handler func([]byte, error)) error { - op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 100)}} + op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}} c.csdk.PublishAmopTopicMsg(op.respChanData, topic, data, amopTimeout) go func() { select { @@ -294,7 +340,7 @@ func (c *Connection) Call(result interface{}, method string, args ...interface{} // can also pass nil, in which case the result is ignored. func (c *Connection) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { //logrus.Infof("CallContext method:%s\n", method) - op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 100)}} + op := &requestOp{respChanData: &csdk.CallbackChan{Data: make(chan csdk.Response, 1)}} switch method { case "call": arg := args[0].(map[string]interface{}) @@ -399,28 +445,9 @@ func (c *Connection) CallContext(ctx context.Context, result interface{}, method } // async send transaction if handler != nil { - go func() { - resp, _, err := op.waitRpcMessage(ctx) - if err != nil { - handler(nil, err) - return - } - if resp.Error != nil { - handler(nil, resp.Error) - return - } - if len(resp.Result) == 0 { - handler(nil, errors.New("result is null")) - return - } - var receipt types.Receipt - err = json.Unmarshal(resp.Result, &receipt) - if err != nil { - handler(nil, fmt.Errorf("unmarshal receipt error: %v", err)) - return - } - handler(&receipt, nil) - }() + op.handler = handler + pointer := fmt.Sprintf("%p", op.respChanData) + c.transactionHandlers.Set(pointer, op, defaultTransactionTimeout) return nil } default: @@ -428,7 +455,7 @@ func (c *Connection) CallContext(ctx context.Context, result interface{}, method } // dispatch has accepted the request and will close the channel when it quits. - switch resp, _, err := op.waitRpcMessage(ctx); { + switch resp, _, err := op.waitRpcMessage(); { case err != nil: return err case resp.Error != nil: diff --git a/v3/examples/parallelok/manual/main.go b/v3/examples/parallelok/manual/main.go index 7978a26d..e6675f81 100644 --- a/v3/examples/parallelok/manual/main.go +++ b/v3/examples/parallelok/manual/main.go @@ -179,7 +179,7 @@ func main() { var wg2 sync.WaitGroup sendBar := progressbar.Default(int64(*totalTx), "send") receiveBar := progressbar.Default(int64(*totalTx), "receive") - limiter := rate.NewLimiter(rate.Limit(*totalTx), *qps) + limiter := rate.NewLimiter(rate.Limit(*qps), *qps) for i := 0; i < *totalTx; i++ { from := i % *userCount to := (i + *userCount/2) % *userCount diff --git a/v3/examples/parallelok/wrapper/main.go b/v3/examples/parallelok/wrapper/main.go index 2b6d83d2..6ae29782 100644 --- a/v3/examples/parallelok/wrapper/main.go +++ b/v3/examples/parallelok/wrapper/main.go @@ -16,6 +16,7 @@ import ( "github.com/FISCO-BCOS/go-sdk/v3/types" "github.com/schollz/progressbar/v3" flag "github.com/spf13/pflag" + "golang.org/x/time/rate" ) func main() { @@ -126,6 +127,8 @@ func main() { var wg2 sync.WaitGroup sendBar := progressbar.Default(int64(*totalTx), "send") receiveBar := progressbar.Default(int64(*totalTx), "receive") + limiter := rate.NewLimiter(rate.Limit(*qps), *qps) + // routineCount := (qps + 4000) / 4000 // sended := int64(0) // for i := 0; i < routineCount; i++ { @@ -184,6 +187,10 @@ func main() { from := i % *userCount to := (i + *userCount/2) % *userCount amount := int64(1) + err = limiter.Wait(context.Background()) + if err != nil { + log.Fatalf("limiter Wait error: %v", err) + } _, err = transfer.AsyncTransfer(func(receipt *types.Receipt, err error) { receiveBar.Add(1) if err != nil {