Skip to content

Commit

Permalink
Close session on peer going away error
Browse files Browse the repository at this point in the history
  • Loading branch information
wweir committed Dec 13, 2018
1 parent d677634 commit 2daaa8f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 10 deletions.
1 change: 1 addition & 0 deletions conf/sower.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ blocklist=[
"*.aws.amazon.com", # amazon
"m.media-amazon.com",
"*.awsstatic.com",
"*.s3.amazonaws.com",
"*.cloudfront.net", # atlassian
"synchrony-cdn.atlassian.com",
"avatar-cdn.atlassian.com",
Expand Down
4 changes: 2 additions & 2 deletions dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func manual(w dns.ResponseWriter, r *dns.Msg, domain, dnsServer string) {
return
}

msg, err := dns.Exchange(r, dnsServer+":53") // expose any response
if msg == nil {
msg, err := dns.Exchange(r, dnsServer+":53")
if msg == nil { // expose any response except nil
glog.V(1).Infof("get dns of %s fail: %s", domain, err)
return
}
Expand Down
12 changes: 9 additions & 3 deletions proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func StartClient(server string) {
connCh := listenLocal([]string{":80", ":443"})
reDialCh := make(chan net.Conn, 10)
var conn net.Conn
var count int

for {
sess, err := quic.DialAddr(server, &tls.Config{InsecureSkipVerify: true}, dialConf)
Expand All @@ -25,22 +26,24 @@ func StartClient(server string) {
}
glog.Infof("new session from (%s) to (%s)", sess.LocalAddr(), sess.RemoteAddr())

count = 0
for { // session rotate logic
select {
case conn = <-connCh:
case conn = <-reDialCh:
}
count++

// sync action to reuse sigle sess
if !openStream(conn, sess, reDialCh) {
if !openStream(conn, sess, count, reDialCh) {
sess.Close()
break
}
}
}
}

func openStream(conn net.Conn, sess quic.Session, reDialCh chan<- net.Conn) bool {
func openStream(conn net.Conn, sess quic.Session, count int, reDialCh chan<- net.Conn) bool {
glog.V(2).Infoln("new request from", conn.RemoteAddr())

okCh := make(chan struct{})
Expand All @@ -54,6 +57,9 @@ func openStream(conn net.Conn, sess quic.Session, reDialCh chan<- net.Conn) bool
}
defer stream.Close()

glog.V(2).Infof("START stream\t%d", count)
defer glog.V(2).Infof("CLOSE stream\t%d", count)

select {
case okCh <- struct{}{}:
default:
Expand All @@ -65,7 +71,7 @@ func openStream(conn net.Conn, sess quic.Session, reDialCh chan<- net.Conn) bool
if err := conn.(*net.TCPConn).SetKeepAlive(true); err != nil {
glog.Warningln(err)
}
relay(&streamConn{stream, sess}, conn)
relay(sess, &streamConn{stream, sess}, conn)
conn.Close()
}()

Expand Down
2 changes: 1 addition & 1 deletion proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ func acceptStream(stream quic.Stream, sess quic.Session) {
if err := rc.(*net.TCPConn).SetKeepAlive(true); err != nil {
glog.Warningln(err)
}
relay(rc, conn)
relay(sess, rc, conn)
}
13 changes: 9 additions & 4 deletions proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"math/big"
"net"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -36,18 +37,22 @@ func (s *streamConn) RemoteAddr() net.Addr {
return s.sess.RemoteAddr()
}

func relay(conn1, conn2 net.Conn) {
func relay(sess quic.Session, conn1, conn2 net.Conn) {
wg := &sync.WaitGroup{}
exitFlag := new(int32)
wg.Add(2)
go redirect(conn1, conn2, wg, exitFlag)
redirect(conn2, conn1, wg, exitFlag)
go redirect(sess, conn1, conn2, wg, exitFlag)
redirect(sess, conn2, conn1, wg, exitFlag)
wg.Wait()
}

func redirect(conn1, conn2 net.Conn, wg *sync.WaitGroup, exitFlag *int32) {
func redirect(sess quic.Session, conn1, conn2 net.Conn, wg *sync.WaitGroup, exitFlag *int32) {
if _, err := io.Copy(conn2, conn1); err != nil && (atomic.LoadInt32(exitFlag) == 0) {
glog.V(1).Infof("%s<>%s -> %s<>%s: %s", conn1.RemoteAddr(), conn1.LocalAddr(), conn2.LocalAddr(), conn2.RemoteAddr(), err)

if strings.Contains(err.Error(), "PeerGoingAway") { //for internal package, hard code here
sess.Close()
}
}

// wakeup all conn goroutine
Expand Down

0 comments on commit 2daaa8f

Please sign in to comment.