Skip to content

Commit

Permalink
Merge pull request #8 from getlantern/master
Browse files Browse the repository at this point in the history
Fixed deadlock on concurrent closing
  • Loading branch information
xtaci authored Nov 29, 2016
2 parents b4a6fb8 + f28ad86 commit 427dd80
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
3 changes: 2 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,14 @@ func (s *Session) AcceptStream() (*Stream, error) {
// Close is used to close the session and all streams.
func (s *Session) Close() (err error) {
s.dieLock.Lock()
defer s.dieLock.Unlock()

select {
case <-s.die:
s.dieLock.Unlock()
return errors.New(errBrokenPipe)
default:
close(s.die)
s.dieLock.Unlock()
s.streamLock.Lock()
for k := range s.streams {
s.streams[k].sessionClose()
Expand Down
25 changes: 25 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,31 @@ func TestStreamDoubleClose(t *testing.T) {
session.Close()
}

func TestConcurrentClose(t *testing.T) {
cli, err := net.Dial("tcp", "127.0.0.1:19999")
if err != nil {
t.Fatal(err)
}
session, _ := Client(cli, nil)
numStreams := 100
streams := make([]*Stream, 0, numStreams)
var wg sync.WaitGroup
wg.Add(numStreams)
for i := 0; i < 100; i++ {
stream, _ := session.OpenStream()
streams = append(streams, stream)
}
for _, s := range streams {
stream := s
go func() {
stream.Close()
wg.Done()
}()
}
session.Close()
wg.Wait()
}

func TestTinyReadBuffer(t *testing.T) {
cli, err := net.Dial("tcp", "127.0.0.1:19999")
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,14 @@ func (s *Stream) Write(b []byte) (n int, err error) {
// Close implements io.ReadWriteCloser
func (s *Stream) Close() error {
s.dieLock.Lock()
defer s.dieLock.Unlock()

select {
case <-s.die:
s.dieLock.Unlock()
return errors.New(errBrokenPipe)
default:
close(s.die)
s.dieLock.Unlock()
s.sess.streamClosed(s.id)
_, err := s.sess.writeFrame(newFrame(cmdRST, s.id))
return err
Expand Down

0 comments on commit 427dd80

Please sign in to comment.