Skip to content

Commit

Permalink
copy directly into bytes.Buffer without a tmp buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Apr 8, 2019
1 parent 427ad5f commit 8a71918
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 39 deletions.
59 changes: 24 additions & 35 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,48 +211,29 @@ func (s *Session) returnTokens(n int) {
}
}

// session read a frame from underlying connection
// it's data is pointed to the input buffer
func (s *Session) readFrame(buffer []byte) (f Frame, err error) {
var hdr rawHeader
if _, err := io.ReadFull(s.conn, hdr[:]); err != nil {
return f, errors.Wrap(err, "readFrame")
}

if hdr.Version() != version {
return f, errors.New(errInvalidProtocol)
}

f.ver = hdr.Version()
f.cmd = hdr.Cmd()
f.sid = hdr.StreamID()
if length := hdr.Length(); length > 0 {
f.data = buffer[:length]
if _, err := io.ReadFull(s.conn, f.data); err != nil {
return f, errors.Wrap(err, "readFrame")
}
}
return f, nil
}

// recvLoop keeps on reading from underlying connection if tokens are available
func (s *Session) recvLoop() {
buffer := make([]byte, 1<<16)
var hdr rawHeader

for {
for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
<-s.bucketNotify
}

if f, err := s.readFrame(buffer); err == nil {
// read header first
if _, err := io.ReadFull(s.conn, hdr[:]); err == nil {
if hdr.Version() != version { // just ignore
continue
}
atomic.StoreInt32(&s.dataReady, 1)

switch f.cmd {
sid := hdr.StreamID()
switch hdr.Cmd() {
case cmdNOP:
case cmdSYN:
s.streamLock.Lock()
if _, ok := s.streams[f.sid]; !ok {
stream := newStream(f.sid, s.config.MaxFrameSize, s)
s.streams[f.sid] = stream
if _, ok := s.streams[sid]; !ok {
stream := newStream(sid, s.config.MaxFrameSize, s)
s.streams[sid] = stream
select {
case s.chAccepts <- stream:
case <-s.die:
Expand All @@ -261,19 +242,27 @@ func (s *Session) recvLoop() {
s.streamLock.Unlock()
case cmdFIN:
s.streamLock.Lock()
if stream, ok := s.streams[f.sid]; ok {
if stream, ok := s.streams[sid]; ok {
stream.markRST()
stream.notifyReadEvent()
}
s.streamLock.Unlock()
case cmdPSH:
var written int64
var err error
s.streamLock.Lock()
if stream, ok := s.streams[f.sid]; ok {
atomic.AddInt32(&s.bucket, -int32(len(f.data)))
stream.pushBytes(f.data)
if stream, ok := s.streams[sid]; ok {
written, err = stream.receiveBytes(s.conn, int64(hdr.Length()))
atomic.AddInt32(&s.bucket, -int32(written))
stream.notifyReadEvent()
}
s.streamLock.Unlock()

// read data error
if err != nil {
s.Close()
return
}
default:
s.Close()
return
Expand Down
10 changes: 6 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Stream struct {
id uint32
rstflag int32
sess *Session
buffer bytes.Buffer
buffer *bytes.Buffer
bufferLock sync.Mutex
frameSize int
chReadEvent chan struct{} // notify a read event
Expand All @@ -34,6 +34,7 @@ func newStream(id uint32, frameSize int, sess *Session) *Stream {
s.frameSize = frameSize
s.sess = sess
s.die = make(chan struct{})
s.buffer = new(bytes.Buffer)
return s
}

Expand Down Expand Up @@ -203,11 +204,12 @@ func (s *Stream) RemoteAddr() net.Addr {
return nil
}

// pushBytes a slice into buffer
func (s *Stream) pushBytes(p []byte) {
// receiveBytes receive from the reader and write into the buffer
func (s *Stream) receiveBytes(r io.Reader, sz int64) (written int64, err error) {
s.bufferLock.Lock()
s.buffer.Write(p)
written, err = io.CopyN(s.buffer, r, sz)
s.bufferLock.Unlock()
return
}

// recycleTokens transform remaining bytes to tokens(will truncate buffer)
Expand Down

0 comments on commit 8a71918

Please sign in to comment.