From e2fc0f1cb5ca8c08dac28e689422199d6b73233c Mon Sep 17 00:00:00 2001 From: laizy Date: Sat, 28 Mar 2020 11:08:14 +0800 Subject: [PATCH 1/4] support async send with lockfree list --- consensus/vbft/node_utils.go | 2 +- p2pserver/link/link.go | 168 +++++++++++++-------- p2pserver/link/link_test.go | 69 +-------- p2pserver/link/lockfree_list.go | 90 ++++++++++++ p2pserver/link/lockfree_list_test.go | 211 +++++++++++++++++++++++++++ p2pserver/net/netserver/nbr_peers.go | 2 +- p2pserver/net/netserver/netserver.go | 27 ++-- p2pserver/net/protocol/server.go | 1 + p2pserver/peer/peer.go | 41 +++--- 9 files changed, 443 insertions(+), 168 deletions(-) create mode 100644 p2pserver/link/lockfree_list.go create mode 100644 p2pserver/link/lockfree_list_test.go diff --git a/consensus/vbft/node_utils.go b/consensus/vbft/node_utils.go index de99361bf4..c72567da10 100644 --- a/consensus/vbft/node_utils.go +++ b/consensus/vbft/node_utils.go @@ -414,7 +414,7 @@ func (self *Server) sendToPeer(peerIdx uint32, data []byte) error { cons := msgpack.NewConsensus(msg) p2pid, present := self.peerPool.getP2pId(peerIdx) if present { - go self.p2p.SendTo(p2pid, cons) + self.p2p.SendToAsync(p2pid, cons) } else { log.Errorf("sendToPeer transmit failed index:%d", peerIdx) } diff --git a/p2pserver/link/link.go b/p2pserver/link/link.go index 8e67dedeb2..bdfeb588af 100644 --- a/p2pserver/link/link.go +++ b/p2pserver/link/link.go @@ -22,7 +22,9 @@ import ( "bufio" "errors" "fmt" + "io" "net" + "runtime" "time" comm "github.com/ontio/ontology/common" @@ -39,67 +41,65 @@ type Link struct { time time.Time // The latest time the node activity recvChan chan *types.MsgPayload //msgpayload channel reqRecord map[string]int64 //Map RequestId to Timestamp, using for rejecting duplicate request in specific time + + sendBuf *LockFreeList } -func NewLink() *Link { +func NewLink(id common.PeerId, conn net.Conn) *Link { link := &Link{ + id: id, + sendBuf: &LockFreeList{}, reqRecord: make(map[string]int64), + time: time.Now(), + conn: conn, + addr: conn.RemoteAddr().String(), } - return link -} -//SetID set peer id to link -func (this *Link) SetID(id common.PeerId) { - this.id = id + return link } //GetID return if from peer -func (this *Link) GetID() common.PeerId { - return this.id +func (self *Link) GetID() common.PeerId { + return self.id } //If there is connection return true -func (this *Link) Valid() bool { - return this.conn != nil +func (self *Link) Valid() bool { + return self.conn != nil } //set message channel for link layer -func (this *Link) SetChan(msgchan chan *types.MsgPayload) { - this.recvChan = msgchan +func (self *Link) SetChan(msgchan chan *types.MsgPayload) { + self.recvChan = msgchan } //get address -func (this *Link) GetAddr() string { - return this.addr -} - -//set address -func (this *Link) SetAddr(addr string) { - this.addr = addr +func (self *Link) GetAddr() string { + return self.addr } //get connection -func (this *Link) GetConn() net.Conn { - return this.conn +func (self *Link) GetConn() net.Conn { + return self.conn } //set connection -func (this *Link) SetConn(conn net.Conn) { - this.conn = conn +func (self *Link) SetConn(conn net.Conn) { + self.conn = conn } -//record latest message time -func (this *Link) UpdateRXTime(t time.Time) { - this.time = t +//GetRXTime return the latest message time +func (self *Link) GetRXTime() time.Time { + return self.time } -//GetRXTime return the latest message time -func (this *Link) GetRXTime() time.Time { - return this.time +func (self *Link) StartReadWriteLoop() { + go self.readLoop() + go self.sendLoop() } -func (this *Link) Rx() { - conn := this.conn +func (self *Link) readLoop() { + conn := self.conn if conn == nil { return } @@ -109,48 +109,62 @@ func (this *Link) Rx() { for { msg, payloadSize, err := types.ReadMessage(reader) if err != nil { - log.Infof("[p2p]error read from %s :%s", this.GetAddr(), err.Error()) + log.Infof("[p2p]error read from %s :%s", self.GetAddr(), err.Error()) break } - t := time.Now() - this.UpdateRXTime(t) + self.time = time.Now() - if !this.needSendMsg(msg) { - log.Debugf("skip handle msgType:%s from:%d", msg.CmdType(), this.id) + if !self.needSendMsg(msg) { + log.Debugf("skip handle msgType:%s from:%d", msg.CmdType(), self.id) continue } - this.addReqRecord(msg) - this.recvChan <- &types.MsgPayload{ - Id: this.id, - Addr: this.addr, + self.addReqRecord(msg) + self.recvChan <- &types.MsgPayload{ + Id: self.id, + Addr: self.addr, PayloadSize: payloadSize, Payload: msg, } } - this.CloseConn() + self.CloseConn() } //close connection -func (this *Link) CloseConn() { - if this.conn != nil { - this.conn.Close() - this.conn = nil +func (self *Link) CloseConn() { + self.sendBuf.TakeAndSeal() + if self.conn != nil { + _ = self.conn.Close() + self.conn = nil } } -func (this *Link) Send(msg types.Message) error { +func (self *Link) Send(msg types.Message) error { sink := comm.NewZeroCopySink(nil) types.WriteMessage(sink, msg) - return this.SendRaw(sink.Bytes()) + return self.SendRaw(sink.Bytes()) } -func (this *Link) SendRaw(rawPacket []byte) error { - conn := this.conn +func (self *Link) SendAsync(msg types.Message) error { + sink := comm.NewZeroCopySink(nil) + types.WriteMessage(sink, msg) + return self.SendRawAsync(sink.Bytes()) +} + +func (self *Link) SendRawAsync(packet []byte) error { + if !self.sendBuf.Push(packet) { + return io.ErrClosedPipe + } + + return nil +} + +func (self *Link) SendRaw(rawPacket []byte) error { + conn := self.conn if conn == nil { return errors.New("[p2p]tx link invalid") } @@ -164,8 +178,8 @@ func (this *Link) SendRaw(rawPacket []byte) error { _ = conn.SetWriteDeadline(time.Now().Add(time.Duration(nCount*common.WRITE_DEADLINE) * time.Second)) _, err := conn.Write(rawPacket) if err != nil { - log.Infof("[p2p] error sending messge to %s :%s", this.GetAddr(), err.Error()) - this.CloseConn() + log.Infof("[p2p] error sending messge to %s :%s", self.GetAddr(), err.Error()) + self.CloseConn() return err } @@ -173,7 +187,7 @@ func (this *Link) SendRaw(rawPacket []byte) error { } //needSendMsg check whether the msg is needed to push to channel -func (this *Link) needSendMsg(msg types.Message) bool { +func (self *Link) needSendMsg(msg types.Message) bool { if msg.CmdType() != common.GET_DATA_TYPE { return true } @@ -181,7 +195,7 @@ func (this *Link) needSendMsg(msg types.Message) bool { reqID := fmt.Sprintf("%x%s", dataReq.DataType, dataReq.Hash.ToHexString()) now := time.Now().Unix() - if t, ok := this.reqRecord[reqID]; ok { + if t, ok := self.reqRecord[reqID]; ok { if int(now-t) < common.REQ_INTERVAL { return false } @@ -190,20 +204,56 @@ func (this *Link) needSendMsg(msg types.Message) bool { } //addReqRecord add request record by removing outdated request records -func (this *Link) addReqRecord(msg types.Message) { +func (self *Link) addReqRecord(msg types.Message) { if msg.CmdType() != common.GET_DATA_TYPE { return } now := time.Now().Unix() - if len(this.reqRecord) >= common.MAX_REQ_RECORD_SIZE-1 { - for id := range this.reqRecord { - t := this.reqRecord[id] + if len(self.reqRecord) >= common.MAX_REQ_RECORD_SIZE-1 { + for id := range self.reqRecord { + t := self.reqRecord[id] if int(now-t) > common.REQ_INTERVAL { - delete(this.reqRecord, id) + delete(self.reqRecord, id) } } } var dataReq = msg.(*types.DataReq) reqID := fmt.Sprintf("%x%s", dataReq.DataType, dataReq.Hash.ToHexString()) - this.reqRecord[reqID] = now + self.reqRecord[reqID] = now +} + +const sendBufSize = 64 * 1024 + +func (self *Link) sendLoop() { + buffers := make([]byte, 0, sendBufSize) + buffList := make([][]byte, 0, 64) + for { + owned, sealed := self.sendBuf.Take() + if sealed { + return + } + buffList = getBuffers(buffList[:0], owned) + if len(buffList) > 0 { + for i := len(buffList) - 1; i >= 0; i -= 1 { + buffers = append(buffers, buffList[i]...) + if len(buffers) >= sendBufSize/2 { + if err := self.SendRaw(buffers); err != nil { + return + } + buffers = buffers[:0] + } + } + } else { + // no buffer has been taken, yield this goroutine to avoid busy loop + runtime.Gosched() + } + } +} + +func getBuffers(buffList [][]byte, owned *OwnedList) [][]byte { + for buf := owned.Pop(); buf != nil; buf = owned.Pop() { + buffList = append(buffList, buf) + } + + return buffList } diff --git a/p2pserver/link/link_test.go b/p2pserver/link/link_test.go index a8475e5a41..79dd40ae6e 100644 --- a/p2pserver/link/link_test.go +++ b/p2pserver/link/link_test.go @@ -26,79 +26,12 @@ import ( "github.com/ontio/ontology-crypto/keypair" "github.com/ontio/ontology/account" comm "github.com/ontio/ontology/common" - "github.com/ontio/ontology/common/log" ct "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/p2pserver/common" mt "github.com/ontio/ontology/p2pserver/message/types" ) -var ( - cliLink *Link - serverLink *Link - cliChan chan *mt.MsgPayload - serverChan chan *mt.MsgPayload - cliAddr string - serAddr string -) - -func init() { - log.InitLog(log.InfoLog, log.Stdout) - - cliLink = NewLink() - serverLink = NewLink() - id := common.PseudoPeerIdFromUint64(0x733936) - cliLink.SetID(id) - id2 := common.PseudoPeerIdFromUint64(0x8274950) - serverLink.SetID(id2) - - cliChan = make(chan *mt.MsgPayload, 100) - serverChan = make(chan *mt.MsgPayload, 100) - //listen ip addr - cliAddr = "127.0.0.1:50338" - serAddr = "127.0.0.1:50339" -} - -func TestNewLink(t *testing.T) { - id := 0x74936295 - - if cliLink.GetID().ToUint64() != 0x733936 { - t.Fatal("link GetID failed") - } - i := common.PseudoPeerIdFromUint64(uint64(id)) - cliLink.SetID(i) - if cliLink.GetID().ToUint64() != uint64(id) { - t.Fatal("link SetID failed") - } - - cliLink.SetChan(cliChan) - serverLink.SetChan(serverChan) - - cliLink.UpdateRXTime(time.Now()) - - msg := &mt.MsgPayload{ - Id: cliLink.GetID(), - Addr: cliLink.GetAddr(), - Payload: &mt.NotFound{comm.UINT256_EMPTY}, - } - go func() { - time.Sleep(5000000) - cliChan <- msg - }() - - timeout := time.NewTimer(time.Second) - select { - case <-cliChan: - t.Log("read data from channel") - case <-timeout.C: - timeout.Stop() - t.Fatal("can`t read data from link channel") - } - -} - func TestUnpackBufNode(t *testing.T) { - cliLink.SetChan(cliChan) - msgType := "block" var msg mt.Message @@ -127,7 +60,7 @@ func TestUnpackBufNode(t *testing.T) { payload.Data = append(payload.Data, byte(byteInt)) } - msg = &mt.Consensus{payload} + msg = &mt.Consensus{Cons: payload} case "consensus": acct := account.NewAccount("SHA256withECDSA") key := acct.PubKey() diff --git a/p2pserver/link/lockfree_list.go b/p2pserver/link/lockfree_list.go new file mode 100644 index 0000000000..8f0bbb2a7f --- /dev/null +++ b/p2pserver/link/lockfree_list.go @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package link + +import ( + "sync/atomic" + "unsafe" +) + +var sealed = unsafe.Pointer(&innerNode{}) // dummy node + +// LockFreeList is a mpmc List support take batch nodes at oneshot +type LockFreeList struct { + head unsafe.Pointer +} + +type OwnedList struct { + head unsafe.Pointer +} + +type innerNode struct { + next unsafe.Pointer + buf []byte +} + +func (self *LockFreeList) Push(data []byte) bool { + if len(data) == 0 { + return false + } + node := &innerNode{buf: data} + for { + head := atomic.LoadPointer(&self.head) + if head == sealed { + return false + } + node.next = head + if atomic.CompareAndSwapPointer(&self.head, head, unsafe.Pointer(node)) { + return true + } + } +} + +func (self *LockFreeList) Sealed() bool { + return atomic.LoadPointer(&self.head) == sealed +} + +// return list contains appended node and sealed state +func (self *LockFreeList) Take() (*OwnedList, bool) { + list := atomic.LoadPointer(&self.head) + for { + if list == sealed || atomic.CompareAndSwapPointer(&self.head, list, nil) { + break + } + list = atomic.LoadPointer(&self.head) + } + + return &OwnedList{head: list}, list == sealed +} + +func (self *OwnedList) Pop() []byte { + head := self.head + if head == nil || head == sealed { + return nil + } + + node := (*innerNode)(head) + self.head = node.next + + return node.buf +} + +func (self *LockFreeList) TakeAndSeal() *OwnedList { + return &OwnedList{head: atomic.SwapPointer(&self.head, sealed)} +} diff --git a/p2pserver/link/lockfree_list_test.go b/p2pserver/link/lockfree_list_test.go new file mode 100644 index 0000000000..d3d138e89b --- /dev/null +++ b/p2pserver/link/lockfree_list_test.go @@ -0,0 +1,211 @@ +/* + * Copyright (C) 2018 The ontology Authors + * This file is part of The ontology library. + * + * The ontology is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ontology is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with The ontology. If not, see . + */ + +package link + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +// go test -race + +func TestLockfreeList_Push(t *testing.T) { + list := &LockFreeList{} + N := 200 + wg := &sync.WaitGroup{} + wg.Add(N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + for j := 0; j < N; j++ { + list.Push([]byte{1}) + } + }() + } + + wg.Wait() + owned, _ := list.Take() + for i := 0; i < N*N; i++ { + buf := owned.Pop() + assert.Equal(t, buf, []byte{1}) + } +} + +func TestLockfreeList_Take(t *testing.T) { + list := &LockFreeList{} + N := 200 + wg := &sync.WaitGroup{} + wg.Add(N + 1) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + for j := 0; j < N; j++ { + list.Push([]byte{1}) + } + }() + } + + go func() { + defer wg.Done() + for i := 0; i < N*N; i++ { + owned, _ := list.Take() + for buf := owned.Pop(); buf != nil; buf = owned.Pop() { + assert.Equal(t, buf, []byte{1}) + } + } + }() + + wg.Wait() +} + +func TestLockfreeList_ConcurrentTake(t *testing.T) { + list := &LockFreeList{} + N := 200 + wg := &sync.WaitGroup{} + wg.Add(N * 2) + pushed := uint32(0) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + for j := 0; j < N; j++ { + if list.Push([]byte{1}) { + atomic.AddUint32(&pushed, 1) + } + } + }() + } + + poped := uint32(0) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + + owned, _ := list.Take() + for buf := owned.Pop(); buf != nil; buf = owned.Pop() { + assert.Equal(t, buf, []byte{1}) + atomic.AddUint32(&poped, 1) + } + }() + } + + wg.Wait() + owned, _ := list.Take() + for buf := owned.Pop(); buf != nil; buf = owned.Pop() { + assert.Equal(t, buf, []byte{1}) + atomic.AddUint32(&poped, 1) + } + + assert.Equal(t, pushed, poped) +} + +func TestLockfreeList_TakeAndSeal(t *testing.T) { + list := &LockFreeList{} + N := 200 + wg := &sync.WaitGroup{} + wg.Add(N + 1) + pushed := uint32(0) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + for j := 0; j < N; j++ { + if list.Push([]byte{1}) { + atomic.AddUint32(&pushed, 1) + } + } + }() + } + + poped := uint32(0) + go func() { + defer wg.Done() + + owned := list.TakeAndSeal() + for buf := owned.Pop(); buf != nil; buf = owned.Pop() { + assert.Equal(t, buf, []byte{1}) + poped += 1 + } + }() + + wg.Wait() + + assert.Equal(t, pushed, poped) +} + +func BenchmarkLockfreeList_Push(b *testing.B) { + list := &LockFreeList{} + G := 10 + wg := &sync.WaitGroup{} + wg.Add(G) + for g := 0; g < G; g++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + list.Push([]byte{1}) + } + }() + } + wg.Wait() +} + +type LockedList struct { + sync.Mutex + list [][]byte +} + +func BenchmarkLockedList_Push(b *testing.B) { + list := &LockedList{} + G := 10 + wg := &sync.WaitGroup{} + wg.Add(G) + for g := 0; g < G; g++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + list.Lock() + list.list = append(list.list, []byte{1}) + list.Unlock() + } + }() + } + wg.Wait() +} + +func BenchmarkArray_SinglePush(b *testing.B) { + var list [][]byte + for i := 0; i < b.N; i++ { + list = append(list, []byte{1}) + } +} + +func BenchmarkPreAllocArray_SinglePush(b *testing.B) { + list := make([][]byte, 0, b.N) + for i := 0; i < b.N; i++ { + list = append(list, []byte{1}) + } +} + +func BenchmarkLockfreeList_SinglePush(b *testing.B) { + list := &LockFreeList{} + for i := 0; i < b.N; i++ { + list.Push([]byte{1}) + } +} diff --git a/p2pserver/net/netserver/nbr_peers.go b/p2pserver/net/netserver/nbr_peers.go index 5e266c32df..37bb8ccde5 100644 --- a/p2pserver/net/netserver/nbr_peers.go +++ b/p2pserver/net/netserver/nbr_peers.go @@ -94,7 +94,7 @@ func (this *NbrPeers) Broadcast(msg types.Message) { defer this.RUnlock() for _, node := range this.List { if node.Peer.GetRelay() { - go node.Peer.SendRaw(msg.CmdType(), sink.Bytes()) + go node.Peer.SendRaw(sink.Bytes()) } } } diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go index bccb8e5ac9..1a9decd5c1 100644 --- a/p2pserver/net/netserver/netserver.go +++ b/p2pserver/net/netserver/netserver.go @@ -21,7 +21,6 @@ package netserver import ( "errors" "net" - "time" "github.com/ontio/ontology/common/config" "github.com/ontio/ontology/common/log" @@ -226,11 +225,11 @@ func (this *NetServer) connect(addr string) error { if err != nil { return err } - remotePeer := createPeer(peerInfo, conn) + remotePeer := peer.NewPeer(peerInfo, conn) remotePeer.AttachChan(this.NetChan) this.ReplacePeer(remotePeer) - go remotePeer.Link.Rx() + go remotePeer.Link.StartReadWriteLoop() this.protocol.HandleSystemMessage(this, p2p.PeerConnected{Info: remotePeer.Info}) return nil @@ -263,11 +262,11 @@ func (this *NetServer) handleClientConnection(conn net.Conn) error { if err != nil { return err } - remotePeer := createPeer(peerInfo, conn) + remotePeer := peer.NewPeer(peerInfo, conn) remotePeer.AttachChan(this.NetChan) this.ReplacePeer(remotePeer) - go remotePeer.Link.Rx() + go remotePeer.Link.StartReadWriteLoop() this.protocol.HandleSystemMessage(this, p2p.PeerConnected{Info: remotePeer.Info}) return nil } @@ -301,17 +300,6 @@ func (this *NetServer) IsOwnAddress(addr string) bool { return addr == this.connCtrl.OwnAddress() } -func createPeer(info *peer.PeerInfo, conn net.Conn) *peer.Peer { - remotePeer := peer.NewPeer() - remotePeer.SetInfo(info) - remotePeer.Link.UpdateRXTime(time.Now()) - remotePeer.Link.SetAddr(conn.RemoteAddr().String()) - remotePeer.Link.SetConn(conn) - remotePeer.Link.SetID(info.Id) - - return remotePeer -} - func (ns *NetServer) ConnectController() *connect_controller.ConnectController { return ns.connCtrl } @@ -326,3 +314,10 @@ func (this *NetServer) SendTo(p common.PeerId, msg types.Message) { this.Send(peer, msg) } } + +func (this *NetServer) SendToAsync(p common.PeerId, msg types.Message) { + peer := this.GetPeer(p) + if peer != nil { + peer.SendAsync(msg) + } +} diff --git a/p2pserver/net/protocol/server.go b/p2pserver/net/protocol/server.go index a0026e31a1..ccc8433cf7 100644 --- a/p2pserver/net/protocol/server.go +++ b/p2pserver/net/protocol/server.go @@ -38,6 +38,7 @@ type P2P interface { SetHeight(uint64) Send(p *peer.Peer, msg types.Message) error SendTo(p common.PeerId, msg types.Message) + SendToAsync(p common.PeerId, msg types.Message) GetOutConnRecordLen() uint Broadcast(msg types.Message) IsOwnAddress(addr string) bool diff --git a/p2pserver/peer/peer.go b/p2pserver/peer/peer.go index b5675a9142..fa26057f36 100644 --- a/p2pserver/peer/peer.go +++ b/p2pserver/peer/peer.go @@ -85,16 +85,13 @@ type Peer struct { } //NewPeer return new peer without publickey initial -func NewPeer() *Peer { +func NewPeer(info *PeerInfo, c net.Conn) *Peer { p := &Peer{ - Info: &PeerInfo{}, - Link: conn.NewLink(), + Info: info, + Link: conn.NewLink(info.Id, c), } - return p -} -func (self *Peer) SetInfo(info *PeerInfo) { - self.Info = info + return p } func (self *PeerInfo) String() string { @@ -135,13 +132,19 @@ func (this *Peer) GetPort() uint16 { } //SendTo call sync link to send buffer -func (this *Peer) SendRaw(msgType string, msgPayload []byte) error { +func (this *Peer) SendRaw(msgPayload []byte) error { if this.Link != nil && this.Link.Valid() { return this.Link.SendRaw(msgPayload) } return errors.New("[p2p]sync link invalid") } +func (this *Peer) SendRawAsync(msgPayload []byte) { + if this.Link != nil && this.Link.Valid() { + _ = this.Link.SendRawAsync(msgPayload) + } +} + //Close halt sync connection func (this *Peer) Close() { this.connLock.Lock() @@ -210,7 +213,13 @@ func (this *Peer) Send(msg types.Message) error { sink := comm.NewZeroCopySink(nil) types.WriteMessage(sink, msg) - return this.SendRaw(msg.CmdType(), sink.Bytes()) + return this.SendRaw(sink.Bytes()) +} + +func (this *Peer) SendAsync(msg types.Message) { + sink := comm.NewZeroCopySink(nil) + types.WriteMessage(sink, msg) + this.SendRawAsync(sink.Bytes()) } //GetHttpInfoPort return peer`s httpinfo port @@ -222,17 +231,3 @@ func (this *Peer) GetHttpInfoPort() uint16 { func (this *Peer) SetHttpInfoPort(port uint16) { this.Info.HttpInfoPort = port } - -//UpdateInfo update peer`s information -func (this *Peer) UpdateInfo(t time.Time, version uint32, services uint64, - syncPort uint16, kid common.PeerId, relay uint8, height uint64, softVer string) { - this.Info.Id = kid - this.Info.Version = version - this.Info.Services = services - this.Info.Port = syncPort - this.Info.SoftVersion = softVer - this.Info.Relay = relay != 0 - this.Info.Height = height - - this.Link.UpdateRXTime(t) -} From a0989611c1a502859bbcf2d1759365047c4ce2e7 Mon Sep 17 00:00:00 2001 From: laizy Date: Sat, 28 Mar 2020 12:50:47 +0800 Subject: [PATCH 2/4] fix test --- p2pserver/dht/kbucket/table_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2pserver/dht/kbucket/table_test.go b/p2pserver/dht/kbucket/table_test.go index e7aeb4c24a..29701109d7 100644 --- a/p2pserver/dht/kbucket/table_test.go +++ b/p2pserver/dht/kbucket/table_test.go @@ -311,7 +311,7 @@ func TestTableFindMultipleBuckets(t *testing.T) { local := genpeerID() localID := local - rt := NewRoutingTable(5, local.Id) + rt := NewRoutingTable(10, local.Id) peers := make([]*common.PeerKeyId, 100) for i := 0; i < 100; i++ { From cbccad0035a4c288e0855a714ac15760553d9738 Mon Sep 17 00:00:00 2001 From: laizy Date: Wed, 15 Apr 2020 19:32:25 +0800 Subject: [PATCH 3/4] add send buffer size limit and keep ordering of send and trySend --- consensus/vbft/node_utils.go | 2 +- p2pserver/link/link.go | 114 +++++++++++++++++++++------ p2pserver/link/link_test.go | 58 ++++++++++++++ p2pserver/link/lockfree_list.go | 13 ++- p2pserver/net/netserver/netserver.go | 6 +- p2pserver/net/protocol/server.go | 2 +- p2pserver/peer/peer.go | 10 ++- 7 files changed, 164 insertions(+), 41 deletions(-) diff --git a/consensus/vbft/node_utils.go b/consensus/vbft/node_utils.go index c72567da10..0e0a733363 100644 --- a/consensus/vbft/node_utils.go +++ b/consensus/vbft/node_utils.go @@ -414,7 +414,7 @@ func (self *Server) sendToPeer(peerIdx uint32, data []byte) error { cons := msgpack.NewConsensus(msg) p2pid, present := self.peerPool.getP2pId(peerIdx) if present { - self.p2p.SendToAsync(p2pid, cons) + return self.p2p.TrySendToAsync(p2pid, cons) } else { log.Errorf("sendToPeer transmit failed index:%d", peerIdx) } diff --git a/p2pserver/link/link.go b/p2pserver/link/link.go index bdfeb588af..3069c6781e 100644 --- a/p2pserver/link/link.go +++ b/p2pserver/link/link.go @@ -25,6 +25,7 @@ import ( "io" "net" "runtime" + "sync/atomic" "time" comm "github.com/ontio/ontology/common" @@ -33,6 +34,10 @@ import ( "github.com/ontio/ontology/p2pserver/message/types" ) +const SEND_THROTTLE_SIZE = 512 * 1024 + +var ErrBufferFull = errors.New("send buffers full") + //Link used to establish type Link struct { id common.PeerId @@ -42,17 +47,65 @@ type Link struct { recvChan chan *types.MsgPayload //msgpayload channel reqRecord map[string]int64 //Map RequestId to Timestamp, using for rejecting duplicate request in specific time - sendBuf *LockFreeList + sendBuffer *SendBuffer +} + +type buffData struct { + data []byte + result chan error +} + +type SendBuffer struct { + ThrottleSize uint64 // read only once set + + bufferSize int64 // atomic read/write + buffers *LockFreeList +} + +func (self *SendBuffer) Close() { + self.buffers.TakeAndSeal() + atomic.StoreInt64(&self.bufferSize, 0) +} + +// return true if exceed throttle size +func (self *SendBuffer) IncrBuffSize(size int) bool { + newVal := atomic.AddInt64(&self.bufferSize, int64(size)) + return newVal > int64(size)+SEND_THROTTLE_SIZE +} + +func (self *SendBuffer) TryPush(packet []byte) error { + if self.IncrBuffSize(len(packet)) { + self.IncrBuffSize(-len(packet)) + return ErrBufferFull + } + if !self.buffers.Push(buffData{data: packet, result: nil}) { + self.IncrBuffSize(-len(packet)) + return io.ErrClosedPipe + } + + return nil +} + +// blocking until data writen to io +func (self *SendBuffer) Push(packet []byte) error { + result := make(chan error) + self.IncrBuffSize(len(packet)) + if !self.buffers.Push(buffData{data: packet, result: result}) { + self.IncrBuffSize(-len(packet)) + return io.ErrClosedPipe + } + + return <-result } func NewLink(id common.PeerId, conn net.Conn) *Link { link := &Link{ - id: id, - sendBuf: &LockFreeList{}, - reqRecord: make(map[string]int64), - time: time.Now(), - conn: conn, - addr: conn.RemoteAddr().String(), + id: id, + sendBuffer: &SendBuffer{ThrottleSize: SEND_THROTTLE_SIZE, buffers: &LockFreeList{}}, + reqRecord: make(map[string]int64), + time: time.Now(), + conn: conn, + addr: conn.RemoteAddr().String(), } return link @@ -135,7 +188,7 @@ func (self *Link) readLoop() { //close connection func (self *Link) CloseConn() { - self.sendBuf.TakeAndSeal() + self.sendBuffer.Close() if self.conn != nil { _ = self.conn.Close() self.conn = nil @@ -149,21 +202,22 @@ func (self *Link) Send(msg types.Message) error { return self.SendRaw(sink.Bytes()) } -func (self *Link) SendAsync(msg types.Message) error { +func (self *Link) TrySendRaw(packet []byte) error { + return self.sendBuffer.TryPush(packet) +} + +func (self *Link) TrySend(msg types.Message) error { sink := comm.NewZeroCopySink(nil) types.WriteMessage(sink, msg) - return self.SendRawAsync(sink.Bytes()) + return self.TrySendRaw(sink.Bytes()) } -func (self *Link) SendRawAsync(packet []byte) error { - if !self.sendBuf.Push(packet) { - return io.ErrClosedPipe - } - - return nil +func (self *Link) SendRaw(rawPacket []byte) error { + return self.sendBuffer.Push(rawPacket) } -func (self *Link) SendRaw(rawPacket []byte) error { +// only called by sendLoop +func (self *Link) writeToConn(rawPacket []byte) error { conn := self.conn if conn == nil { return errors.New("[p2p]tx link invalid") @@ -226,21 +280,31 @@ const sendBufSize = 64 * 1024 func (self *Link) sendLoop() { buffers := make([]byte, 0, sendBufSize) - buffList := make([][]byte, 0, 64) + var results []chan error + buffList := make([]buffData, 0, 64) for { - owned, sealed := self.sendBuf.Take() + owned, sealed := self.sendBuffer.buffers.Take() if sealed { return } - buffList = getBuffers(buffList[:0], owned) + buffList = getBufferData(buffList[:0], owned) if len(buffList) > 0 { for i := len(buffList) - 1; i >= 0; i -= 1 { - buffers = append(buffers, buffList[i]...) - if len(buffers) >= sendBufSize/2 { - if err := self.SendRaw(buffers); err != nil { + buffers = append(buffers, buffList[i].data...) + if buffList[i].result != nil { + results = append(results, buffList[i].result) + } + if len(buffers) >= sendBufSize/2 || i == 0 { + err := self.writeToConn(buffers) + self.sendBuffer.IncrBuffSize(-len(buffers)) + for _, c := range results { + c <- err + } + if err != nil { return } buffers = buffers[:0] + results = results[:0] } } } else { @@ -250,9 +314,9 @@ func (self *Link) sendLoop() { } } -func getBuffers(buffList [][]byte, owned *OwnedList) [][]byte { +func getBufferData(buffList []buffData, owned *OwnedList) []buffData { for buf := owned.Pop(); buf != nil; buf = owned.Pop() { - buffList = append(buffList, buf) + buffList = append(buffList, buf.(buffData)) } return buffList diff --git a/p2pserver/link/link_test.go b/p2pserver/link/link_test.go index 79dd40ae6e..7944227ca2 100644 --- a/p2pserver/link/link_test.go +++ b/p2pserver/link/link_test.go @@ -20,6 +20,9 @@ package link import ( "math/rand" + "net" + "runtime" + "sync" "testing" "time" @@ -28,7 +31,9 @@ import ( comm "github.com/ontio/ontology/common" ct "github.com/ontio/ontology/core/types" "github.com/ontio/ontology/p2pserver/common" + msgpack "github.com/ontio/ontology/p2pserver/message/msg_pack" mt "github.com/ontio/ontology/p2pserver/message/types" + "github.com/stretchr/testify/assert" ) func TestUnpackBufNode(t *testing.T) { @@ -116,3 +121,56 @@ func TestUnpackBufNode(t *testing.T) { sink := comm.NewZeroCopySink(nil) mt.WriteMessage(sink, msg) } + +func TestLink_Send(t *testing.T) { + const N = 100 + const M = 1000 + reader, writer := net.Pipe() + out := NewLink(common.PeerId{}, writer) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + out.sendLoop() + }() + wg.Add(1) + go func() { + defer wg.Done() + wg.Add(2 * N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + for j := 0; j < M; j++ { + for { + err := out.TrySend(msgpack.NewPingMsg(1)) + if err == nil { + break + } + runtime.Gosched() + } + } + }() + go func() { + defer wg.Done() + for j := 0; j < M; j++ { + err := out.Send(msgpack.NewPingMsg(1)) + if err != nil { + panic(err) + } + } + }() + } + }() + + for m := 0; m < 2*N*M; m++ { + msg, _, err := mt.ReadMessage(reader) + if err != nil { + panic(err) + } + + assert.Equal(t, msg.(*mt.Ping), msgpack.NewPingMsg(1)) + } + out.CloseConn() + + wg.Wait() +} diff --git a/p2pserver/link/lockfree_list.go b/p2pserver/link/lockfree_list.go index 8f0bbb2a7f..6f51241bcb 100644 --- a/p2pserver/link/lockfree_list.go +++ b/p2pserver/link/lockfree_list.go @@ -36,14 +36,11 @@ type OwnedList struct { type innerNode struct { next unsafe.Pointer - buf []byte + data interface{} } -func (self *LockFreeList) Push(data []byte) bool { - if len(data) == 0 { - return false - } - node := &innerNode{buf: data} +func (self *LockFreeList) Push(data interface{}) bool { + node := &innerNode{data: data} for { head := atomic.LoadPointer(&self.head) if head == sealed { @@ -73,7 +70,7 @@ func (self *LockFreeList) Take() (*OwnedList, bool) { return &OwnedList{head: list}, list == sealed } -func (self *OwnedList) Pop() []byte { +func (self *OwnedList) Pop() interface{} { head := self.head if head == nil || head == sealed { return nil @@ -82,7 +79,7 @@ func (self *OwnedList) Pop() []byte { node := (*innerNode)(head) self.head = node.next - return node.buf + return node.data } func (self *LockFreeList) TakeAndSeal() *OwnedList { diff --git a/p2pserver/net/netserver/netserver.go b/p2pserver/net/netserver/netserver.go index 1a9decd5c1..3e34082681 100644 --- a/p2pserver/net/netserver/netserver.go +++ b/p2pserver/net/netserver/netserver.go @@ -315,9 +315,11 @@ func (this *NetServer) SendTo(p common.PeerId, msg types.Message) { } } -func (this *NetServer) SendToAsync(p common.PeerId, msg types.Message) { +func (this *NetServer) TrySendToAsync(p common.PeerId, msg types.Message) error { peer := this.GetPeer(p) if peer != nil { - peer.SendAsync(msg) + return peer.TrySendAsync(msg) } + + return nil } diff --git a/p2pserver/net/protocol/server.go b/p2pserver/net/protocol/server.go index ccc8433cf7..1c5f4140fe 100644 --- a/p2pserver/net/protocol/server.go +++ b/p2pserver/net/protocol/server.go @@ -38,7 +38,7 @@ type P2P interface { SetHeight(uint64) Send(p *peer.Peer, msg types.Message) error SendTo(p common.PeerId, msg types.Message) - SendToAsync(p common.PeerId, msg types.Message) + TrySendToAsync(p common.PeerId, msg types.Message) error GetOutConnRecordLen() uint Broadcast(msg types.Message) IsOwnAddress(addr string) bool diff --git a/p2pserver/peer/peer.go b/p2pserver/peer/peer.go index fa26057f36..0c0d1197b0 100644 --- a/p2pserver/peer/peer.go +++ b/p2pserver/peer/peer.go @@ -21,6 +21,7 @@ package peer import ( "errors" "fmt" + "io" "net" "strconv" "strings" @@ -139,10 +140,11 @@ func (this *Peer) SendRaw(msgPayload []byte) error { return errors.New("[p2p]sync link invalid") } -func (this *Peer) SendRawAsync(msgPayload []byte) { +func (this *Peer) SendRawAsync(msgPayload []byte) error { if this.Link != nil && this.Link.Valid() { - _ = this.Link.SendRawAsync(msgPayload) + return this.Link.TrySendRaw(msgPayload) } + return io.ErrClosedPipe } //Close halt sync connection @@ -216,10 +218,10 @@ func (this *Peer) Send(msg types.Message) error { return this.SendRaw(sink.Bytes()) } -func (this *Peer) SendAsync(msg types.Message) { +func (this *Peer) TrySendAsync(msg types.Message) error { sink := comm.NewZeroCopySink(nil) types.WriteMessage(sink, msg) - this.SendRawAsync(sink.Bytes()) + return this.SendRawAsync(sink.Bytes()) } //GetHttpInfoPort return peer`s httpinfo port From 890bb186e2356d8fcef7cff77b55be3e0b334bed Mon Sep 17 00:00:00 2001 From: laizy Date: Tue, 28 Apr 2020 14:29:07 +0800 Subject: [PATCH 4/4] broadcast message using async send --- p2pserver/net/netserver/nbr_peers.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2pserver/net/netserver/nbr_peers.go b/p2pserver/net/netserver/nbr_peers.go index 37bb8ccde5..8f85ae2253 100644 --- a/p2pserver/net/netserver/nbr_peers.go +++ b/p2pserver/net/netserver/nbr_peers.go @@ -94,7 +94,9 @@ func (this *NbrPeers) Broadcast(msg types.Message) { defer this.RUnlock() for _, node := range this.List { if node.Peer.GetRelay() { - go node.Peer.SendRaw(sink.Bytes()) + // try send and drop message if link is full + _ = node.Peer.SendRawAsync(sink.Bytes()) + //go node.Peer.SendRaw(sink.Bytes()) } } }