diff --git a/consensus/vbft/node_utils.go b/consensus/vbft/node_utils.go
index de99361bf4..0e0a733363 100644
--- a/consensus/vbft/node_utils.go
+++ b/consensus/vbft/node_utils.go
@@ -414,7 +414,7 @@ func (self *Server) sendToPeer(peerIdx uint32, data []byte) error {
cons := msgpack.NewConsensus(msg)
p2pid, present := self.peerPool.getP2pId(peerIdx)
if present {
- go self.p2p.SendTo(p2pid, cons)
+ return self.p2p.TrySendToAsync(p2pid, cons)
} else {
log.Errorf("sendToPeer transmit failed index:%d", peerIdx)
}
diff --git a/p2pserver/dht/kbucket/table_test.go b/p2pserver/dht/kbucket/table_test.go
index e7aeb4c24a..29701109d7 100644
--- a/p2pserver/dht/kbucket/table_test.go
+++ b/p2pserver/dht/kbucket/table_test.go
@@ -311,7 +311,7 @@ func TestTableFindMultipleBuckets(t *testing.T) {
local := genpeerID()
localID := local
- rt := NewRoutingTable(5, local.Id)
+ rt := NewRoutingTable(10, local.Id)
peers := make([]*common.PeerKeyId, 100)
for i := 0; i < 100; i++ {
diff --git a/p2pserver/link/link.go b/p2pserver/link/link.go
index 8e67dedeb2..3069c6781e 100644
--- a/p2pserver/link/link.go
+++ b/p2pserver/link/link.go
@@ -22,7 +22,10 @@ import (
"bufio"
"errors"
"fmt"
+ "io"
"net"
+ "runtime"
+ "sync/atomic"
"time"
comm "github.com/ontio/ontology/common"
@@ -31,6 +34,10 @@ import (
"github.com/ontio/ontology/p2pserver/message/types"
)
+const SEND_THROTTLE_SIZE = 512 * 1024
+
+var ErrBufferFull = errors.New("send buffers full")
+
//Link used to establish
type Link struct {
id common.PeerId
@@ -39,67 +46,113 @@ type Link struct {
time time.Time // The latest time the node activity
recvChan chan *types.MsgPayload //msgpayload channel
reqRecord map[string]int64 //Map RequestId to Timestamp, using for rejecting duplicate request in specific time
+
+ sendBuffer *SendBuffer
}
-func NewLink() *Link {
- link := &Link{
- reqRecord: make(map[string]int64),
+type buffData struct {
+ data []byte
+ result chan error
+}
+
+type SendBuffer struct {
+ ThrottleSize uint64 // read only once set
+
+ bufferSize int64 // atomic read/write
+ buffers *LockFreeList
+}
+
+func (self *SendBuffer) Close() {
+ self.buffers.TakeAndSeal()
+ atomic.StoreInt64(&self.bufferSize, 0)
+}
+
+// return true if exceed throttle size
+func (self *SendBuffer) IncrBuffSize(size int) bool {
+ newVal := atomic.AddInt64(&self.bufferSize, int64(size))
+ return newVal > int64(size)+SEND_THROTTLE_SIZE
+}
+
+func (self *SendBuffer) TryPush(packet []byte) error {
+ if self.IncrBuffSize(len(packet)) {
+ self.IncrBuffSize(-len(packet))
+ return ErrBufferFull
}
- return link
+ if !self.buffers.Push(buffData{data: packet, result: nil}) {
+ self.IncrBuffSize(-len(packet))
+ return io.ErrClosedPipe
+ }
+
+ return nil
+}
+
+// blocking until data writen to io
+func (self *SendBuffer) Push(packet []byte) error {
+ result := make(chan error)
+ self.IncrBuffSize(len(packet))
+ if !self.buffers.Push(buffData{data: packet, result: result}) {
+ self.IncrBuffSize(-len(packet))
+ return io.ErrClosedPipe
+ }
+
+ return <-result
}
-//SetID set peer id to link
-func (this *Link) SetID(id common.PeerId) {
- this.id = id
+func NewLink(id common.PeerId, conn net.Conn) *Link {
+ link := &Link{
+ id: id,
+ sendBuffer: &SendBuffer{ThrottleSize: SEND_THROTTLE_SIZE, buffers: &LockFreeList{}},
+ reqRecord: make(map[string]int64),
+ time: time.Now(),
+ conn: conn,
+ addr: conn.RemoteAddr().String(),
+ }
+
+ return link
}
//GetID return if from peer
-func (this *Link) GetID() common.PeerId {
- return this.id
+func (self *Link) GetID() common.PeerId {
+ return self.id
}
//If there is connection return true
-func (this *Link) Valid() bool {
- return this.conn != nil
+func (self *Link) Valid() bool {
+ return self.conn != nil
}
//set message channel for link layer
-func (this *Link) SetChan(msgchan chan *types.MsgPayload) {
- this.recvChan = msgchan
+func (self *Link) SetChan(msgchan chan *types.MsgPayload) {
+ self.recvChan = msgchan
}
//get address
-func (this *Link) GetAddr() string {
- return this.addr
-}
-
-//set address
-func (this *Link) SetAddr(addr string) {
- this.addr = addr
+func (self *Link) GetAddr() string {
+ return self.addr
}
//get connection
-func (this *Link) GetConn() net.Conn {
- return this.conn
+func (self *Link) GetConn() net.Conn {
+ return self.conn
}
//set connection
-func (this *Link) SetConn(conn net.Conn) {
- this.conn = conn
+func (self *Link) SetConn(conn net.Conn) {
+ self.conn = conn
}
-//record latest message time
-func (this *Link) UpdateRXTime(t time.Time) {
- this.time = t
+//GetRXTime return the latest message time
+func (self *Link) GetRXTime() time.Time {
+ return self.time
}
-//GetRXTime return the latest message time
-func (this *Link) GetRXTime() time.Time {
- return this.time
+func (self *Link) StartReadWriteLoop() {
+ go self.readLoop()
+ go self.sendLoop()
}
-func (this *Link) Rx() {
- conn := this.conn
+func (self *Link) readLoop() {
+ conn := self.conn
if conn == nil {
return
}
@@ -109,48 +162,63 @@ func (this *Link) Rx() {
for {
msg, payloadSize, err := types.ReadMessage(reader)
if err != nil {
- log.Infof("[p2p]error read from %s :%s", this.GetAddr(), err.Error())
+ log.Infof("[p2p]error read from %s :%s", self.GetAddr(), err.Error())
break
}
- t := time.Now()
- this.UpdateRXTime(t)
+ self.time = time.Now()
- if !this.needSendMsg(msg) {
- log.Debugf("skip handle msgType:%s from:%d", msg.CmdType(), this.id)
+ if !self.needSendMsg(msg) {
+ log.Debugf("skip handle msgType:%s from:%d", msg.CmdType(), self.id)
continue
}
- this.addReqRecord(msg)
- this.recvChan <- &types.MsgPayload{
- Id: this.id,
- Addr: this.addr,
+ self.addReqRecord(msg)
+ self.recvChan <- &types.MsgPayload{
+ Id: self.id,
+ Addr: self.addr,
PayloadSize: payloadSize,
Payload: msg,
}
}
- this.CloseConn()
+ self.CloseConn()
}
//close connection
-func (this *Link) CloseConn() {
- if this.conn != nil {
- this.conn.Close()
- this.conn = nil
+func (self *Link) CloseConn() {
+ self.sendBuffer.Close()
+ if self.conn != nil {
+ _ = self.conn.Close()
+ self.conn = nil
}
}
-func (this *Link) Send(msg types.Message) error {
+func (self *Link) Send(msg types.Message) error {
+ sink := comm.NewZeroCopySink(nil)
+ types.WriteMessage(sink, msg)
+
+ return self.SendRaw(sink.Bytes())
+}
+
+func (self *Link) TrySendRaw(packet []byte) error {
+ return self.sendBuffer.TryPush(packet)
+}
+
+func (self *Link) TrySend(msg types.Message) error {
sink := comm.NewZeroCopySink(nil)
types.WriteMessage(sink, msg)
+ return self.TrySendRaw(sink.Bytes())
+}
- return this.SendRaw(sink.Bytes())
+func (self *Link) SendRaw(rawPacket []byte) error {
+ return self.sendBuffer.Push(rawPacket)
}
-func (this *Link) SendRaw(rawPacket []byte) error {
- conn := this.conn
+// only called by sendLoop
+func (self *Link) writeToConn(rawPacket []byte) error {
+ conn := self.conn
if conn == nil {
return errors.New("[p2p]tx link invalid")
}
@@ -164,8 +232,8 @@ func (this *Link) SendRaw(rawPacket []byte) error {
_ = conn.SetWriteDeadline(time.Now().Add(time.Duration(nCount*common.WRITE_DEADLINE) * time.Second))
_, err := conn.Write(rawPacket)
if err != nil {
- log.Infof("[p2p] error sending messge to %s :%s", this.GetAddr(), err.Error())
- this.CloseConn()
+ log.Infof("[p2p] error sending messge to %s :%s", self.GetAddr(), err.Error())
+ self.CloseConn()
return err
}
@@ -173,7 +241,7 @@ func (this *Link) SendRaw(rawPacket []byte) error {
}
//needSendMsg check whether the msg is needed to push to channel
-func (this *Link) needSendMsg(msg types.Message) bool {
+func (self *Link) needSendMsg(msg types.Message) bool {
if msg.CmdType() != common.GET_DATA_TYPE {
return true
}
@@ -181,7 +249,7 @@ func (this *Link) needSendMsg(msg types.Message) bool {
reqID := fmt.Sprintf("%x%s", dataReq.DataType, dataReq.Hash.ToHexString())
now := time.Now().Unix()
- if t, ok := this.reqRecord[reqID]; ok {
+ if t, ok := self.reqRecord[reqID]; ok {
if int(now-t) < common.REQ_INTERVAL {
return false
}
@@ -190,20 +258,66 @@ func (this *Link) needSendMsg(msg types.Message) bool {
}
//addReqRecord add request record by removing outdated request records
-func (this *Link) addReqRecord(msg types.Message) {
+func (self *Link) addReqRecord(msg types.Message) {
if msg.CmdType() != common.GET_DATA_TYPE {
return
}
now := time.Now().Unix()
- if len(this.reqRecord) >= common.MAX_REQ_RECORD_SIZE-1 {
- for id := range this.reqRecord {
- t := this.reqRecord[id]
+ if len(self.reqRecord) >= common.MAX_REQ_RECORD_SIZE-1 {
+ for id := range self.reqRecord {
+ t := self.reqRecord[id]
if int(now-t) > common.REQ_INTERVAL {
- delete(this.reqRecord, id)
+ delete(self.reqRecord, id)
}
}
}
var dataReq = msg.(*types.DataReq)
reqID := fmt.Sprintf("%x%s", dataReq.DataType, dataReq.Hash.ToHexString())
- this.reqRecord[reqID] = now
+ self.reqRecord[reqID] = now
+}
+
+const sendBufSize = 64 * 1024
+
+func (self *Link) sendLoop() {
+ buffers := make([]byte, 0, sendBufSize)
+ var results []chan error
+ buffList := make([]buffData, 0, 64)
+ for {
+ owned, sealed := self.sendBuffer.buffers.Take()
+ if sealed {
+ return
+ }
+ buffList = getBufferData(buffList[:0], owned)
+ if len(buffList) > 0 {
+ for i := len(buffList) - 1; i >= 0; i -= 1 {
+ buffers = append(buffers, buffList[i].data...)
+ if buffList[i].result != nil {
+ results = append(results, buffList[i].result)
+ }
+ if len(buffers) >= sendBufSize/2 || i == 0 {
+ err := self.writeToConn(buffers)
+ self.sendBuffer.IncrBuffSize(-len(buffers))
+ for _, c := range results {
+ c <- err
+ }
+ if err != nil {
+ return
+ }
+ buffers = buffers[:0]
+ results = results[:0]
+ }
+ }
+ } else {
+ // no buffer has been taken, yield this goroutine to avoid busy loop
+ runtime.Gosched()
+ }
+ }
+}
+
+func getBufferData(buffList []buffData, owned *OwnedList) []buffData {
+ for buf := owned.Pop(); buf != nil; buf = owned.Pop() {
+ buffList = append(buffList, buf.(buffData))
+ }
+
+ return buffList
}
diff --git a/p2pserver/link/link_test.go b/p2pserver/link/link_test.go
index a8475e5a41..7944227ca2 100644
--- a/p2pserver/link/link_test.go
+++ b/p2pserver/link/link_test.go
@@ -20,85 +20,23 @@ package link
import (
"math/rand"
+ "net"
+ "runtime"
+ "sync"
"testing"
"time"
"github.com/ontio/ontology-crypto/keypair"
"github.com/ontio/ontology/account"
comm "github.com/ontio/ontology/common"
- "github.com/ontio/ontology/common/log"
ct "github.com/ontio/ontology/core/types"
"github.com/ontio/ontology/p2pserver/common"
+ msgpack "github.com/ontio/ontology/p2pserver/message/msg_pack"
mt "github.com/ontio/ontology/p2pserver/message/types"
+ "github.com/stretchr/testify/assert"
)
-var (
- cliLink *Link
- serverLink *Link
- cliChan chan *mt.MsgPayload
- serverChan chan *mt.MsgPayload
- cliAddr string
- serAddr string
-)
-
-func init() {
- log.InitLog(log.InfoLog, log.Stdout)
-
- cliLink = NewLink()
- serverLink = NewLink()
- id := common.PseudoPeerIdFromUint64(0x733936)
- cliLink.SetID(id)
- id2 := common.PseudoPeerIdFromUint64(0x8274950)
- serverLink.SetID(id2)
-
- cliChan = make(chan *mt.MsgPayload, 100)
- serverChan = make(chan *mt.MsgPayload, 100)
- //listen ip addr
- cliAddr = "127.0.0.1:50338"
- serAddr = "127.0.0.1:50339"
-}
-
-func TestNewLink(t *testing.T) {
- id := 0x74936295
-
- if cliLink.GetID().ToUint64() != 0x733936 {
- t.Fatal("link GetID failed")
- }
- i := common.PseudoPeerIdFromUint64(uint64(id))
- cliLink.SetID(i)
- if cliLink.GetID().ToUint64() != uint64(id) {
- t.Fatal("link SetID failed")
- }
-
- cliLink.SetChan(cliChan)
- serverLink.SetChan(serverChan)
-
- cliLink.UpdateRXTime(time.Now())
-
- msg := &mt.MsgPayload{
- Id: cliLink.GetID(),
- Addr: cliLink.GetAddr(),
- Payload: &mt.NotFound{comm.UINT256_EMPTY},
- }
- go func() {
- time.Sleep(5000000)
- cliChan <- msg
- }()
-
- timeout := time.NewTimer(time.Second)
- select {
- case <-cliChan:
- t.Log("read data from channel")
- case <-timeout.C:
- timeout.Stop()
- t.Fatal("can`t read data from link channel")
- }
-
-}
-
func TestUnpackBufNode(t *testing.T) {
- cliLink.SetChan(cliChan)
-
msgType := "block"
var msg mt.Message
@@ -127,7 +65,7 @@ func TestUnpackBufNode(t *testing.T) {
payload.Data = append(payload.Data, byte(byteInt))
}
- msg = &mt.Consensus{payload}
+ msg = &mt.Consensus{Cons: payload}
case "consensus":
acct := account.NewAccount("SHA256withECDSA")
key := acct.PubKey()
@@ -183,3 +121,56 @@ func TestUnpackBufNode(t *testing.T) {
sink := comm.NewZeroCopySink(nil)
mt.WriteMessage(sink, msg)
}
+
+func TestLink_Send(t *testing.T) {
+ const N = 100
+ const M = 1000
+ reader, writer := net.Pipe()
+ out := NewLink(common.PeerId{}, writer)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ out.sendLoop()
+ }()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ wg.Add(2 * N)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+ for j := 0; j < M; j++ {
+ for {
+ err := out.TrySend(msgpack.NewPingMsg(1))
+ if err == nil {
+ break
+ }
+ runtime.Gosched()
+ }
+ }
+ }()
+ go func() {
+ defer wg.Done()
+ for j := 0; j < M; j++ {
+ err := out.Send(msgpack.NewPingMsg(1))
+ if err != nil {
+ panic(err)
+ }
+ }
+ }()
+ }
+ }()
+
+ for m := 0; m < 2*N*M; m++ {
+ msg, _, err := mt.ReadMessage(reader)
+ if err != nil {
+ panic(err)
+ }
+
+ assert.Equal(t, msg.(*mt.Ping), msgpack.NewPingMsg(1))
+ }
+ out.CloseConn()
+
+ wg.Wait()
+}
diff --git a/p2pserver/link/lockfree_list.go b/p2pserver/link/lockfree_list.go
new file mode 100644
index 0000000000..6f51241bcb
--- /dev/null
+++ b/p2pserver/link/lockfree_list.go
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package link
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+var sealed = unsafe.Pointer(&innerNode{}) // dummy node
+
+// LockFreeList is a mpmc List support take batch nodes at oneshot
+type LockFreeList struct {
+ head unsafe.Pointer
+}
+
+type OwnedList struct {
+ head unsafe.Pointer
+}
+
+type innerNode struct {
+ next unsafe.Pointer
+ data interface{}
+}
+
+func (self *LockFreeList) Push(data interface{}) bool {
+ node := &innerNode{data: data}
+ for {
+ head := atomic.LoadPointer(&self.head)
+ if head == sealed {
+ return false
+ }
+ node.next = head
+ if atomic.CompareAndSwapPointer(&self.head, head, unsafe.Pointer(node)) {
+ return true
+ }
+ }
+}
+
+func (self *LockFreeList) Sealed() bool {
+ return atomic.LoadPointer(&self.head) == sealed
+}
+
+// return list contains appended node and sealed state
+func (self *LockFreeList) Take() (*OwnedList, bool) {
+ list := atomic.LoadPointer(&self.head)
+ for {
+ if list == sealed || atomic.CompareAndSwapPointer(&self.head, list, nil) {
+ break
+ }
+ list = atomic.LoadPointer(&self.head)
+ }
+
+ return &OwnedList{head: list}, list == sealed
+}
+
+func (self *OwnedList) Pop() interface{} {
+ head := self.head
+ if head == nil || head == sealed {
+ return nil
+ }
+
+ node := (*innerNode)(head)
+ self.head = node.next
+
+ return node.data
+}
+
+func (self *LockFreeList) TakeAndSeal() *OwnedList {
+ return &OwnedList{head: atomic.SwapPointer(&self.head, sealed)}
+}
diff --git a/p2pserver/link/lockfree_list_test.go b/p2pserver/link/lockfree_list_test.go
new file mode 100644
index 0000000000..d3d138e89b
--- /dev/null
+++ b/p2pserver/link/lockfree_list_test.go
@@ -0,0 +1,211 @@
+/*
+ * Copyright (C) 2018 The ontology Authors
+ * This file is part of The ontology library.
+ *
+ * The ontology is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * The ontology is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with The ontology. If not, see .
+ */
+
+package link
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// go test -race
+
+func TestLockfreeList_Push(t *testing.T) {
+ list := &LockFreeList{}
+ N := 200
+ wg := &sync.WaitGroup{}
+ wg.Add(N)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+ for j := 0; j < N; j++ {
+ list.Push([]byte{1})
+ }
+ }()
+ }
+
+ wg.Wait()
+ owned, _ := list.Take()
+ for i := 0; i < N*N; i++ {
+ buf := owned.Pop()
+ assert.Equal(t, buf, []byte{1})
+ }
+}
+
+func TestLockfreeList_Take(t *testing.T) {
+ list := &LockFreeList{}
+ N := 200
+ wg := &sync.WaitGroup{}
+ wg.Add(N + 1)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+ for j := 0; j < N; j++ {
+ list.Push([]byte{1})
+ }
+ }()
+ }
+
+ go func() {
+ defer wg.Done()
+ for i := 0; i < N*N; i++ {
+ owned, _ := list.Take()
+ for buf := owned.Pop(); buf != nil; buf = owned.Pop() {
+ assert.Equal(t, buf, []byte{1})
+ }
+ }
+ }()
+
+ wg.Wait()
+}
+
+func TestLockfreeList_ConcurrentTake(t *testing.T) {
+ list := &LockFreeList{}
+ N := 200
+ wg := &sync.WaitGroup{}
+ wg.Add(N * 2)
+ pushed := uint32(0)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+ for j := 0; j < N; j++ {
+ if list.Push([]byte{1}) {
+ atomic.AddUint32(&pushed, 1)
+ }
+ }
+ }()
+ }
+
+ poped := uint32(0)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+
+ owned, _ := list.Take()
+ for buf := owned.Pop(); buf != nil; buf = owned.Pop() {
+ assert.Equal(t, buf, []byte{1})
+ atomic.AddUint32(&poped, 1)
+ }
+ }()
+ }
+
+ wg.Wait()
+ owned, _ := list.Take()
+ for buf := owned.Pop(); buf != nil; buf = owned.Pop() {
+ assert.Equal(t, buf, []byte{1})
+ atomic.AddUint32(&poped, 1)
+ }
+
+ assert.Equal(t, pushed, poped)
+}
+
+func TestLockfreeList_TakeAndSeal(t *testing.T) {
+ list := &LockFreeList{}
+ N := 200
+ wg := &sync.WaitGroup{}
+ wg.Add(N + 1)
+ pushed := uint32(0)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+ for j := 0; j < N; j++ {
+ if list.Push([]byte{1}) {
+ atomic.AddUint32(&pushed, 1)
+ }
+ }
+ }()
+ }
+
+ poped := uint32(0)
+ go func() {
+ defer wg.Done()
+
+ owned := list.TakeAndSeal()
+ for buf := owned.Pop(); buf != nil; buf = owned.Pop() {
+ assert.Equal(t, buf, []byte{1})
+ poped += 1
+ }
+ }()
+
+ wg.Wait()
+
+ assert.Equal(t, pushed, poped)
+}
+
+func BenchmarkLockfreeList_Push(b *testing.B) {
+ list := &LockFreeList{}
+ G := 10
+ wg := &sync.WaitGroup{}
+ wg.Add(G)
+ for g := 0; g < G; g++ {
+ go func() {
+ defer wg.Done()
+ for i := 0; i < b.N; i++ {
+ list.Push([]byte{1})
+ }
+ }()
+ }
+ wg.Wait()
+}
+
+type LockedList struct {
+ sync.Mutex
+ list [][]byte
+}
+
+func BenchmarkLockedList_Push(b *testing.B) {
+ list := &LockedList{}
+ G := 10
+ wg := &sync.WaitGroup{}
+ wg.Add(G)
+ for g := 0; g < G; g++ {
+ go func() {
+ defer wg.Done()
+ for i := 0; i < b.N; i++ {
+ list.Lock()
+ list.list = append(list.list, []byte{1})
+ list.Unlock()
+ }
+ }()
+ }
+ wg.Wait()
+}
+
+func BenchmarkArray_SinglePush(b *testing.B) {
+ var list [][]byte
+ for i := 0; i < b.N; i++ {
+ list = append(list, []byte{1})
+ }
+}
+
+func BenchmarkPreAllocArray_SinglePush(b *testing.B) {
+ list := make([][]byte, 0, b.N)
+ for i := 0; i < b.N; i++ {
+ list = append(list, []byte{1})
+ }
+}
+
+func BenchmarkLockfreeList_SinglePush(b *testing.B) {
+ list := &LockFreeList{}
+ for i := 0; i < b.N; i++ {
+ list.Push([]byte{1})
+ }
+}
diff --git a/p2pserver/net/netserver/nbr_peers.go b/p2pserver/net/netserver/nbr_peers.go
index 5e266c32df..8f85ae2253 100644
--- a/p2pserver/net/netserver/nbr_peers.go
+++ b/p2pserver/net/netserver/nbr_peers.go
@@ -94,7 +94,9 @@ func (this *NbrPeers) Broadcast(msg types.Message) {
defer this.RUnlock()
for _, node := range this.List {
if node.Peer.GetRelay() {
- go node.Peer.SendRaw(msg.CmdType(), sink.Bytes())
+ // try send and drop message if link is full
+ _ = node.Peer.SendRawAsync(sink.Bytes())
+ //go node.Peer.SendRaw(sink.Bytes())
}
}
}
diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go
index bccb8e5ac9..3e34082681 100644
--- a/p2pserver/net/netserver/netserver.go
+++ b/p2pserver/net/netserver/netserver.go
@@ -21,7 +21,6 @@ package netserver
import (
"errors"
"net"
- "time"
"github.com/ontio/ontology/common/config"
"github.com/ontio/ontology/common/log"
@@ -226,11 +225,11 @@ func (this *NetServer) connect(addr string) error {
if err != nil {
return err
}
- remotePeer := createPeer(peerInfo, conn)
+ remotePeer := peer.NewPeer(peerInfo, conn)
remotePeer.AttachChan(this.NetChan)
this.ReplacePeer(remotePeer)
- go remotePeer.Link.Rx()
+ go remotePeer.Link.StartReadWriteLoop()
this.protocol.HandleSystemMessage(this, p2p.PeerConnected{Info: remotePeer.Info})
return nil
@@ -263,11 +262,11 @@ func (this *NetServer) handleClientConnection(conn net.Conn) error {
if err != nil {
return err
}
- remotePeer := createPeer(peerInfo, conn)
+ remotePeer := peer.NewPeer(peerInfo, conn)
remotePeer.AttachChan(this.NetChan)
this.ReplacePeer(remotePeer)
- go remotePeer.Link.Rx()
+ go remotePeer.Link.StartReadWriteLoop()
this.protocol.HandleSystemMessage(this, p2p.PeerConnected{Info: remotePeer.Info})
return nil
}
@@ -301,17 +300,6 @@ func (this *NetServer) IsOwnAddress(addr string) bool {
return addr == this.connCtrl.OwnAddress()
}
-func createPeer(info *peer.PeerInfo, conn net.Conn) *peer.Peer {
- remotePeer := peer.NewPeer()
- remotePeer.SetInfo(info)
- remotePeer.Link.UpdateRXTime(time.Now())
- remotePeer.Link.SetAddr(conn.RemoteAddr().String())
- remotePeer.Link.SetConn(conn)
- remotePeer.Link.SetID(info.Id)
-
- return remotePeer
-}
-
func (ns *NetServer) ConnectController() *connect_controller.ConnectController {
return ns.connCtrl
}
@@ -326,3 +314,12 @@ func (this *NetServer) SendTo(p common.PeerId, msg types.Message) {
this.Send(peer, msg)
}
}
+
+func (this *NetServer) TrySendToAsync(p common.PeerId, msg types.Message) error {
+ peer := this.GetPeer(p)
+ if peer != nil {
+ return peer.TrySendAsync(msg)
+ }
+
+ return nil
+}
diff --git a/p2pserver/net/protocol/server.go b/p2pserver/net/protocol/server.go
index a0026e31a1..1c5f4140fe 100644
--- a/p2pserver/net/protocol/server.go
+++ b/p2pserver/net/protocol/server.go
@@ -38,6 +38,7 @@ type P2P interface {
SetHeight(uint64)
Send(p *peer.Peer, msg types.Message) error
SendTo(p common.PeerId, msg types.Message)
+ TrySendToAsync(p common.PeerId, msg types.Message) error
GetOutConnRecordLen() uint
Broadcast(msg types.Message)
IsOwnAddress(addr string) bool
diff --git a/p2pserver/peer/peer.go b/p2pserver/peer/peer.go
index b5675a9142..0c0d1197b0 100644
--- a/p2pserver/peer/peer.go
+++ b/p2pserver/peer/peer.go
@@ -21,6 +21,7 @@ package peer
import (
"errors"
"fmt"
+ "io"
"net"
"strconv"
"strings"
@@ -85,16 +86,13 @@ type Peer struct {
}
//NewPeer return new peer without publickey initial
-func NewPeer() *Peer {
+func NewPeer(info *PeerInfo, c net.Conn) *Peer {
p := &Peer{
- Info: &PeerInfo{},
- Link: conn.NewLink(),
+ Info: info,
+ Link: conn.NewLink(info.Id, c),
}
- return p
-}
-func (self *Peer) SetInfo(info *PeerInfo) {
- self.Info = info
+ return p
}
func (self *PeerInfo) String() string {
@@ -135,13 +133,20 @@ func (this *Peer) GetPort() uint16 {
}
//SendTo call sync link to send buffer
-func (this *Peer) SendRaw(msgType string, msgPayload []byte) error {
+func (this *Peer) SendRaw(msgPayload []byte) error {
if this.Link != nil && this.Link.Valid() {
return this.Link.SendRaw(msgPayload)
}
return errors.New("[p2p]sync link invalid")
}
+func (this *Peer) SendRawAsync(msgPayload []byte) error {
+ if this.Link != nil && this.Link.Valid() {
+ return this.Link.TrySendRaw(msgPayload)
+ }
+ return io.ErrClosedPipe
+}
+
//Close halt sync connection
func (this *Peer) Close() {
this.connLock.Lock()
@@ -210,7 +215,13 @@ func (this *Peer) Send(msg types.Message) error {
sink := comm.NewZeroCopySink(nil)
types.WriteMessage(sink, msg)
- return this.SendRaw(msg.CmdType(), sink.Bytes())
+ return this.SendRaw(sink.Bytes())
+}
+
+func (this *Peer) TrySendAsync(msg types.Message) error {
+ sink := comm.NewZeroCopySink(nil)
+ types.WriteMessage(sink, msg)
+ return this.SendRawAsync(sink.Bytes())
}
//GetHttpInfoPort return peer`s httpinfo port
@@ -222,17 +233,3 @@ func (this *Peer) GetHttpInfoPort() uint16 {
func (this *Peer) SetHttpInfoPort(port uint16) {
this.Info.HttpInfoPort = port
}
-
-//UpdateInfo update peer`s information
-func (this *Peer) UpdateInfo(t time.Time, version uint32, services uint64,
- syncPort uint16, kid common.PeerId, relay uint8, height uint64, softVer string) {
- this.Info.Id = kid
- this.Info.Version = version
- this.Info.Services = services
- this.Info.Port = syncPort
- this.Info.SoftVersion = softVer
- this.Info.Relay = relay != 0
- this.Info.Height = height
-
- this.Link.UpdateRXTime(t)
-}