Skip to content

Commit

Permalink
Add reconnection to accesspoint
Browse files Browse the repository at this point in the history
  • Loading branch information
devgianlu committed Sep 25, 2023
1 parent 75da255 commit bdb3a0a
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions ap/ap.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"
)

const pongAckInterval = 120 * time.Second

type Accesspoint struct {
addr librespot.GetAddressFunc

Expand All @@ -35,11 +37,13 @@ type Accesspoint struct {
conn net.Conn
encConn *shannonConn

stop bool
recvLoopStop chan struct{}
recvLoopOnce sync.Once
recvChans map[PacketType][]chan Packet
recvChansLock sync.RWMutex
stop bool
pongAckTickerStop chan struct{}
recvLoopStop chan struct{}
recvLoopOnce sync.Once
recvChans map[PacketType][]chan Packet
recvChansLock sync.RWMutex
lastPongAck time.Time

// reconnectLock is held for writing when performing reconnection and for reading mainly when accessing welcome
// or sending packets. If it's not held, a valid connection (and APWelcome) is available. Be careful not to deadlock
Expand All @@ -51,12 +55,16 @@ type Accesspoint struct {
func NewAccesspoint(addr librespot.GetAddressFunc, deviceId string) (ap *Accesspoint, err error) {
ap = &Accesspoint{addr: addr, deviceId: deviceId}
ap.recvLoopStop = make(chan struct{}, 1)
ap.pongAckTickerStop = make(chan struct{}, 1)
ap.recvChans = make(map[PacketType][]chan Packet)

if err = ap.init(); err != nil {
return nil, err
}

// start the ping ticker, this should stop only if we close the connection definitely
go ap.pongAckTicker()

return ap, nil
}

Expand All @@ -78,6 +86,9 @@ func (ap *Accesspoint) init() (err error) {
return fmt.Errorf("failed dialing accesspoint: %w", err)
}

// set last ping in the future
ap.lastPongAck = time.Now().Add(pongAckInterval)

return nil
}

Expand Down Expand Up @@ -187,6 +198,7 @@ func (ap *Accesspoint) Connect(creds *pb.LoginCredentials) error {
func (ap *Accesspoint) Close() {
ap.stop = true
ap.recvLoopStop <- struct{}{}
ap.pongAckTickerStop <- struct{}{}
_ = ap.conn.Close()
}

Expand Down Expand Up @@ -237,6 +249,7 @@ loop:
break loop
}
case PacketTypePongAck:
ap.lastPongAck = time.Now()
continue
default:
ap.recvChansLock.RLock()
Expand Down Expand Up @@ -280,6 +293,29 @@ loop:
}
}

func (ap *Accesspoint) pongAckTicker() {
ticker := time.NewTicker(pongAckInterval)

loop:
for {
select {
case <-ap.pongAckTickerStop:
break loop
case <-ticker.C:
if time.Since(ap.lastPongAck) > pongAckInterval {
log.Errorf("did not receive last pong ack from accesspoint, %.0fs passed", time.Since(ap.lastPongAck).Seconds())

// closing the connection should make the read on the "recvLoop" fail,
// continue hoping for a new connection
_ = ap.conn.Close()
continue
}
}
}

ticker.Stop()
}

func (ap *Accesspoint) reconnect() (err error) {
if ap.welcome == nil {
return backoff.Permanent(fmt.Errorf("cannot reconnect without APWelcome"))
Expand All @@ -295,6 +331,9 @@ func (ap *Accesspoint) reconnect() (err error) {
return err
}

// if we are here the "recvLoop" has already died, restart it
go ap.recvLoop()

log.Debugf("re-established accesspoint connection")
return nil
}
Expand Down

0 comments on commit bdb3a0a

Please sign in to comment.