Skip to content

Commit

Permalink
no concurrent dial to same node (#186)
Browse files Browse the repository at this point in the history
* don't allow concurrent dials to same node

* remove timer, use timeout context only

* tidy

* feedback

* add timer for min amount of time between dial attempts
  • Loading branch information
agouin authored Jul 31, 2023
1 parent f1c9264 commit 1944d3d
Showing 1 changed file with 62 additions and 53 deletions.
115 changes: 62 additions & 53 deletions signer/remote_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
cometproto "github.com/cometbft/cometbft/proto/tendermint/types"
)

const connRetrySec = 2

// PrivValidator is a wrapper for tendermint PrivValidator,
// with additional Stop method for safe shutdown.
type PrivValidator interface {
Expand Down Expand Up @@ -73,98 +75,93 @@ func (rs *ReconnRemoteSigner) OnStop() {
}

func (rs *ReconnRemoteSigner) establishConnection(ctx context.Context) (net.Conn, error) {
ctx, cancel := context.WithTimeout(ctx, connRetrySec*time.Second)
defer cancel()

proto, address := cometnet.ProtocolAndAddress(rs.address)
netConn, err := rs.dialer.DialContext(ctx, proto, address)
if err != nil {

return nil, fmt.Errorf("dial error: %w", err)
}

conn, err := cometp2pconn.MakeSecretConnection(netConn, rs.privKey)
if err != nil {
netConn.Close()
return nil, fmt.Errorf("secret connection error: %w", err)
}

return conn, nil
}

type conWrapper struct {
conn net.Conn
}

func (rs *ReconnRemoteSigner) attemptConnection(ctx context.Context, cw *conWrapper, connected chan<- bool) {
conn, err := rs.establishConnection(ctx)
if err != nil {
sentryConnectTries.Add(float64(1))
totalSentryConnectTries.Inc()

rs.Logger.Error("Error establishing connection", "err", err)
return
}

cw.conn = conn

sentryConnectTries.Set(0)

rs.Logger.Info("Connected to Sentry", "address", rs.address)
connected <- true

}

// main loop for ReconnRemoteSigner
func (rs *ReconnRemoteSigner) loop(ctx context.Context) {
var cw conWrapper
var conn net.Conn
for {
if !rs.IsRunning() {
if cw.conn != nil {
if err := cw.conn.Close(); err != nil {
rs.Logger.Error("Close", "err", err.Error()+"closing listener failed")
}
}
rs.closeConn(conn)
return
}

ticker := time.NewTicker(1 * time.Second)
connected := make(chan bool)
retries := 0
for conn == nil {
var err error
timer := time.NewTimer(connRetrySec * time.Second)
conn, err = rs.establishConnection(ctx)
if err == nil {
sentryConnectTries.Set(0)
timer.Stop()
rs.Logger.Info("Connected to Sentry", "address", rs.address)
break
}

ConnLoop:
for cw.conn == nil {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go rs.attemptConnection(ctx, &cw, connected)
sentryConnectTries.Add(1)
totalSentryConnectTries.Inc()
retries++
rs.Logger.Error(
"Error establishing connection, will retry",
"sleep (s)", connRetrySec,
"address", rs.address,
"attempt", retries,
"err", err,
)
select {
case <-connected:
break ConnLoop
case <-ticker.C:
cancel()
rs.Logger.Info("Retrying", "sleep (s)", 1, "address", rs.address)
case <-ctx.Done():
return
case <-timer.C:
continue
}
}

// since dialing can take time, we check running again
if !rs.IsRunning() {
if err := cw.conn.Close(); err != nil {
rs.Logger.Error("Close", "err", err.Error()+"closing listener failed")
}
rs.closeConn(conn)
return
}

req, err := ReadMsg(cw.conn)
req, err := ReadMsg(conn)
if err != nil {
rs.Logger.Error("readMsg", "err", err)
cw.conn.Close()
cw.conn = nil
rs.Logger.Error(
"Failed to read message from connection",
"address", rs.address,
"err", err,
)
rs.closeConn(conn)
conn = nil
continue
}

// handleRequest handles request errors. We always send back a response
res := rs.handleRequest(req)

err = WriteMsg(cw.conn, res)
err = WriteMsg(conn, res)
if err != nil {
rs.Logger.Error("writeMsg", "err", err)
cw.conn.Close()
cw.conn = nil
rs.Logger.Error(
"Failed to write message to connection",
"address", rs.address,
"err", err,
)
rs.closeConn(conn)
conn = nil
}
}
}
Expand Down Expand Up @@ -410,3 +407,15 @@ func StartRemoteSigners(
}
return services, err
}

func (rs *ReconnRemoteSigner) closeConn(conn net.Conn) {
if conn == nil {
return
}
if err := conn.Close(); err != nil {
rs.Logger.Error("Failed to close connection to chain node",
"address", rs.address,
"err", err,
)
}
}

0 comments on commit 1944d3d

Please sign in to comment.