Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support async send with buffer size limit #1180

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/vbft/node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (self *Server) sendToPeer(peerIdx uint32, data []byte) error {
cons := msgpack.NewConsensus(msg)
p2pid, present := self.peerPool.getP2pId(peerIdx)
if present {
go self.p2p.SendTo(p2pid, cons)
return self.p2p.TrySendToAsync(p2pid, cons)
} else {
log.Errorf("sendToPeer transmit failed index:%d", peerIdx)
}
Expand Down
2 changes: 1 addition & 1 deletion p2pserver/dht/kbucket/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func TestTableFindMultipleBuckets(t *testing.T) {
local := genpeerID()
localID := local

rt := NewRoutingTable(5, local.Id)
rt := NewRoutingTable(10, local.Id)

peers := make([]*common.PeerKeyId, 100)
for i := 0; i < 100; i++ {
Expand Down
234 changes: 174 additions & 60 deletions p2pserver/link/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import (
"bufio"
"errors"
"fmt"
"io"
"net"
"runtime"
"sync/atomic"
"time"

comm "github.com/ontio/ontology/common"
Expand All @@ -31,6 +34,10 @@ import (
"github.com/ontio/ontology/p2pserver/message/types"
)

const SEND_THROTTLE_SIZE = 512 * 1024

var ErrBufferFull = errors.New("send buffers full")

//Link used to establish
type Link struct {
id common.PeerId
Expand All @@ -39,67 +46,113 @@ type Link struct {
time time.Time // The latest time the node activity
recvChan chan *types.MsgPayload //msgpayload channel
reqRecord map[string]int64 //Map RequestId to Timestamp, using for rejecting duplicate request in specific time

sendBuffer *SendBuffer
}

func NewLink() *Link {
link := &Link{
reqRecord: make(map[string]int64),
type buffData struct {
data []byte
result chan error
}

type SendBuffer struct {
ThrottleSize uint64 // read only once set

bufferSize int64 // atomic read/write
buffers *LockFreeList
}

func (self *SendBuffer) Close() {
self.buffers.TakeAndSeal()
atomic.StoreInt64(&self.bufferSize, 0)
}

// return true if exceed throttle size
func (self *SendBuffer) IncrBuffSize(size int) bool {
newVal := atomic.AddInt64(&self.bufferSize, int64(size))
return newVal > int64(size)+SEND_THROTTLE_SIZE
}

func (self *SendBuffer) TryPush(packet []byte) error {
if self.IncrBuffSize(len(packet)) {
self.IncrBuffSize(-len(packet))
return ErrBufferFull
}
return link
if !self.buffers.Push(buffData{data: packet, result: nil}) {
self.IncrBuffSize(-len(packet))
return io.ErrClosedPipe
}

return nil
}

// blocking until data writen to io
func (self *SendBuffer) Push(packet []byte) error {
result := make(chan error)
self.IncrBuffSize(len(packet))
if !self.buffers.Push(buffData{data: packet, result: result}) {
self.IncrBuffSize(-len(packet))
return io.ErrClosedPipe
}

return <-result
}

//SetID set peer id to link
func (this *Link) SetID(id common.PeerId) {
this.id = id
func NewLink(id common.PeerId, conn net.Conn) *Link {
link := &Link{
id: id,
sendBuffer: &SendBuffer{ThrottleSize: SEND_THROTTLE_SIZE, buffers: &LockFreeList{}},
reqRecord: make(map[string]int64),
time: time.Now(),
conn: conn,
addr: conn.RemoteAddr().String(),
}

return link
}

//GetID return if from peer
func (this *Link) GetID() common.PeerId {
return this.id
func (self *Link) GetID() common.PeerId {
return self.id
}

//If there is connection return true
func (this *Link) Valid() bool {
return this.conn != nil
func (self *Link) Valid() bool {
return self.conn != nil
}

//set message channel for link layer
func (this *Link) SetChan(msgchan chan *types.MsgPayload) {
this.recvChan = msgchan
func (self *Link) SetChan(msgchan chan *types.MsgPayload) {
self.recvChan = msgchan
}

//get address
func (this *Link) GetAddr() string {
return this.addr
}

//set address
func (this *Link) SetAddr(addr string) {
this.addr = addr
func (self *Link) GetAddr() string {
return self.addr
}

//get connection
func (this *Link) GetConn() net.Conn {
return this.conn
func (self *Link) GetConn() net.Conn {
return self.conn
}

//set connection
func (this *Link) SetConn(conn net.Conn) {
this.conn = conn
func (self *Link) SetConn(conn net.Conn) {
self.conn = conn
}

//record latest message time
func (this *Link) UpdateRXTime(t time.Time) {
this.time = t
//GetRXTime return the latest message time
func (self *Link) GetRXTime() time.Time {
return self.time
}

//GetRXTime return the latest message time
func (this *Link) GetRXTime() time.Time {
return this.time
func (self *Link) StartReadWriteLoop() {
go self.readLoop()
go self.sendLoop()
}

func (this *Link) Rx() {
conn := this.conn
func (self *Link) readLoop() {
conn := self.conn
if conn == nil {
return
}
Expand All @@ -109,48 +162,63 @@ func (this *Link) Rx() {
for {
msg, payloadSize, err := types.ReadMessage(reader)
if err != nil {
log.Infof("[p2p]error read from %s :%s", this.GetAddr(), err.Error())
log.Infof("[p2p]error read from %s :%s", self.GetAddr(), err.Error())
break
}

t := time.Now()
this.UpdateRXTime(t)
self.time = time.Now()

if !this.needSendMsg(msg) {
log.Debugf("skip handle msgType:%s from:%d", msg.CmdType(), this.id)
if !self.needSendMsg(msg) {
log.Debugf("skip handle msgType:%s from:%d", msg.CmdType(), self.id)
continue
}

this.addReqRecord(msg)
this.recvChan <- &types.MsgPayload{
Id: this.id,
Addr: this.addr,
self.addReqRecord(msg)
self.recvChan <- &types.MsgPayload{
Id: self.id,
Addr: self.addr,
PayloadSize: payloadSize,
Payload: msg,
}

}

this.CloseConn()
self.CloseConn()
}

//close connection
func (this *Link) CloseConn() {
if this.conn != nil {
this.conn.Close()
this.conn = nil
func (self *Link) CloseConn() {
self.sendBuffer.Close()
if self.conn != nil {
_ = self.conn.Close()
self.conn = nil
}
}

func (this *Link) Send(msg types.Message) error {
func (self *Link) Send(msg types.Message) error {
sink := comm.NewZeroCopySink(nil)
types.WriteMessage(sink, msg)

return self.SendRaw(sink.Bytes())
}

func (self *Link) TrySendRaw(packet []byte) error {
return self.sendBuffer.TryPush(packet)
}

func (self *Link) TrySend(msg types.Message) error {
sink := comm.NewZeroCopySink(nil)
types.WriteMessage(sink, msg)
return self.TrySendRaw(sink.Bytes())
}

return this.SendRaw(sink.Bytes())
func (self *Link) SendRaw(rawPacket []byte) error {
return self.sendBuffer.Push(rawPacket)
}

func (this *Link) SendRaw(rawPacket []byte) error {
conn := this.conn
// only called by sendLoop
func (self *Link) writeToConn(rawPacket []byte) error {
conn := self.conn
if conn == nil {
return errors.New("[p2p]tx link invalid")
}
Expand All @@ -164,24 +232,24 @@ func (this *Link) SendRaw(rawPacket []byte) error {
_ = conn.SetWriteDeadline(time.Now().Add(time.Duration(nCount*common.WRITE_DEADLINE) * time.Second))
_, err := conn.Write(rawPacket)
if err != nil {
log.Infof("[p2p] error sending messge to %s :%s", this.GetAddr(), err.Error())
this.CloseConn()
log.Infof("[p2p] error sending messge to %s :%s", self.GetAddr(), err.Error())
self.CloseConn()
return err
}

return nil
}

//needSendMsg check whether the msg is needed to push to channel
func (this *Link) needSendMsg(msg types.Message) bool {
func (self *Link) needSendMsg(msg types.Message) bool {
if msg.CmdType() != common.GET_DATA_TYPE {
return true
}
var dataReq = msg.(*types.DataReq)
reqID := fmt.Sprintf("%x%s", dataReq.DataType, dataReq.Hash.ToHexString())
now := time.Now().Unix()

if t, ok := this.reqRecord[reqID]; ok {
if t, ok := self.reqRecord[reqID]; ok {
if int(now-t) < common.REQ_INTERVAL {
return false
}
Expand All @@ -190,20 +258,66 @@ func (this *Link) needSendMsg(msg types.Message) bool {
}

//addReqRecord add request record by removing outdated request records
func (this *Link) addReqRecord(msg types.Message) {
func (self *Link) addReqRecord(msg types.Message) {
if msg.CmdType() != common.GET_DATA_TYPE {
return
}
now := time.Now().Unix()
if len(this.reqRecord) >= common.MAX_REQ_RECORD_SIZE-1 {
for id := range this.reqRecord {
t := this.reqRecord[id]
if len(self.reqRecord) >= common.MAX_REQ_RECORD_SIZE-1 {
for id := range self.reqRecord {
t := self.reqRecord[id]
if int(now-t) > common.REQ_INTERVAL {
delete(this.reqRecord, id)
delete(self.reqRecord, id)
}
}
}
var dataReq = msg.(*types.DataReq)
reqID := fmt.Sprintf("%x%s", dataReq.DataType, dataReq.Hash.ToHexString())
this.reqRecord[reqID] = now
self.reqRecord[reqID] = now
}

const sendBufSize = 64 * 1024

func (self *Link) sendLoop() {
buffers := make([]byte, 0, sendBufSize)
var results []chan error
buffList := make([]buffData, 0, 64)
for {
owned, sealed := self.sendBuffer.buffers.Take()
if sealed {
return
}
buffList = getBufferData(buffList[:0], owned)
if len(buffList) > 0 {
for i := len(buffList) - 1; i >= 0; i -= 1 {
buffers = append(buffers, buffList[i].data...)
if buffList[i].result != nil {
results = append(results, buffList[i].result)
}
if len(buffers) >= sendBufSize/2 || i == 0 {
err := self.writeToConn(buffers)
self.sendBuffer.IncrBuffSize(-len(buffers))
for _, c := range results {
c <- err
}
if err != nil {
return
}
buffers = buffers[:0]
results = results[:0]
}
}
} else {
// no buffer has been taken, yield this goroutine to avoid busy loop
runtime.Gosched()
Copy link
Member

@Honglei-Cong Honglei-Cong May 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be implemented with channel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cause sending a notification for every msg to wake the writer, which will hurt performance in high contention scenario.

}
}
}

func getBufferData(buffList []buffData, owned *OwnedList) []buffData {
for buf := owned.Pop(); buf != nil; buf = owned.Pop() {
buffList = append(buffList, buf.(buffData))
}

return buffList
}
Loading