Skip to content

Commit

Permalink
core/services/synchronization: track go routines to block close (#11443)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Dec 1, 2023
1 parent 699088f commit abac315
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
22 changes: 12 additions & 10 deletions core/services/synchronization/telemetry_ingress_batch_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type telemetryIngressBatchClient struct {
lggr logger.Logger

wgDone sync.WaitGroup
chDone chan struct{}
chDone services.StopChan

telemBufferSize uint
telemMaxBatchSize uint
Expand All @@ -78,7 +78,7 @@ func NewTelemetryIngressBatchClient(url *url.URL, serverPubKeyHex string, ks key
globalLogger: lggr,
logging: logging,
lggr: lggr.Named("TelemetryIngressBatchClient").Named(network).Named(chainID),
chDone: make(chan struct{}),
chDone: make(services.StopChan),
workers: make(map[string]*telemetryIngressBatchWorker),
useUniConn: useUniconn,
}
Expand All @@ -103,22 +103,24 @@ func (tc *telemetryIngressBatchClient) Start(ctx context.Context) error {
// This is used to call RPC methods on the server
if tc.telemClient == nil { // only preset for tests
if tc.useUniConn {
tc.wgDone.Add(1)
go func() {
// Use a background context tied to chDone so we keep retrying to connect until the client is stopped
// Blocks until we connect or the chDone context is cancelled
conn, err := wsrpc.DialUniWithContext(ctx, tc.lggr, tc.url.String(), clientPrivKey, serverPubKey)
defer tc.wgDone.Done()
ctx2, cancel := tc.chDone.NewCtx()
defer cancel()
conn, err := wsrpc.DialUniWithContext(ctx2, tc.lggr, tc.url.String(), clientPrivKey, serverPubKey)
if err != nil {
if ctx.Err() != nil {
if ctx2.Err() != nil {
tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err)
} else {
tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err)
tc.SvcErrBuffer.Append(err)
}
} else {
tc.telemClient = telemPb.NewTelemClient(conn)
tc.close = conn.Close
tc.connected.Store(true)
return
}
tc.telemClient = telemPb.NewTelemClient(conn)
tc.close = conn.Close
tc.connected.Store(true)
}()
} else {
// Spawns a goroutine that will eventually connect
Expand Down
12 changes: 8 additions & 4 deletions core/services/synchronization/telemetry_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func NewTelemetryIngressClient(url *url.URL, serverPubKeyHex string, ks keystore
}

// Start connects the wsrpc client to the telemetry ingress server
func (tc *telemetryIngressClient) Start(ctx context.Context) error {
func (tc *telemetryIngressClient) Start(context.Context) error {
return tc.StartOnce("TelemetryIngressClient", func() error {
privkey, err := tc.getCSAPrivateKey()
if err != nil {
return err
}

tc.connect(ctx, privkey)
tc.connect(privkey)

return nil
})
Expand All @@ -95,14 +95,15 @@ func (tc *telemetryIngressClient) HealthReport() map[string]error {
return map[string]error{tc.Name(): tc.Healthy()}
}

func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []byte) {
func (tc *telemetryIngressClient) connect(clientPrivKey []byte) {
tc.wgDone.Add(1)

go func() {
defer tc.wgDone.Done()
ctx, cancel := tc.chDone.NewCtx()
defer cancel()

serverPubKey := keys.FromHex(tc.serverPubKeyHex)

conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey), wsrpc.WithLogger(tc.lggr))
if err != nil {
if ctx.Err() != nil {
Expand All @@ -111,6 +112,7 @@ func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []b
tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err)
tc.SvcErrBuffer.Append(err)
}
return
}
defer conn.Close()

Expand All @@ -130,7 +132,9 @@ func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []b
}

func (tc *telemetryIngressClient) handleTelemetry() {
tc.wgDone.Add(1)
go func() {
defer tc.wgDone.Done()
ctx, cancel := tc.chDone.NewCtx()
defer cancel()
for {
Expand Down

0 comments on commit abac315

Please sign in to comment.