diff --git a/clientconn.go b/clientconn.go index 3443345..5be50dd 100644 --- a/clientconn.go +++ b/clientconn.go @@ -71,7 +71,7 @@ type response struct { func (r *response) GetFrame() (*Frame, error) { frame := <-r.Frame if frame == nil { - return nil, ErrConnAlreadyClosed + return nil, ErrStreamClosed } return frame, nil } @@ -80,7 +80,7 @@ func (r *response) GetFrameWithContext(ctx context.Context) (*Frame, error) { select { case frame := <-r.Frame: if frame == nil { - return nil, ErrConnAlreadyClosed + return nil, ErrStreamClosed } return frame, nil case <-ctx.Done(): @@ -329,6 +329,9 @@ var ( ErrNoNewUUID = errors.New("no new uuid available temporary") // ErrConnAlreadyClosed when try to operate on an already closed conn ErrConnAlreadyClosed = errors.New("connection already closed") + // ErrStreamClosed used by Response + // reason may be: 1. reset by peer 2. connection closed + ErrStreamClosed = errors.New("stream already closed") ) func (conn *Connection) nextRequestID() uint64 { @@ -521,7 +524,11 @@ func (conn *Connection) readFrames() { delete(conn.respes, frame.RequestID) conn.mu.Unlock() - resp.SetResponse(frame) + if frame.Flags.IsRst() { + resp.Close() + } else { + resp.SetResponse(frame) + } } } diff --git a/framereader.go b/framereader.go index 3240649..1149274 100644 --- a/framereader.go +++ b/framereader.go @@ -57,7 +57,7 @@ start: s.ResetByPeer() } - goto start + return f, nil } s, loaded := cs.CreateOrGetStream(dfr.ctx, requestID, flags) if !loaded { diff --git a/serveconn.go b/serveconn.go index f2cb076..500dae3 100644 --- a/serveconn.go +++ b/serveconn.go @@ -390,7 +390,11 @@ func (sc *serveconn) readFrames() (err error) { } ci.l.Unlock() if ok { - resp.SetResponse(req) + if req.Flags.IsRst() { + resp.Close() + } else { + resp.SetResponse(req) + } continue } } else {