From cded3bd97c3a1c01b1e359139a2ee67bc09bdad7 Mon Sep 17 00:00:00 2001 From: Makram Kamaleddine Date: Thu, 2 May 2024 16:28:58 +0300 Subject: [PATCH] syncer lifecycle --- core/services/capreg/syncer.go | 36 ++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/core/services/capreg/syncer.go b/core/services/capreg/syncer.go index b3588ac2a65..0449ab7efec 100644 --- a/core/services/capreg/syncer.go +++ b/core/services/capreg/syncer.go @@ -4,14 +4,18 @@ import ( "context" "time" + commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services" "go.uber.org/multierr" + + "github.com/smartcontractkit/chainlink/v2/core/services" ) var _ services.ServiceCtx = (*syncer)(nil) type syncer struct { + cancel context.CancelFunc + sm commonservices.StateMachine locals []Local lggr logger.Logger } @@ -40,23 +44,39 @@ func NewSyncer(locals []Local, lggr logger.Logger) *syncer { // Close implements services.Service. func (s *syncer) Close() error { - var errs error - for _, local := range s.locals { - if err := local.Close(); err != nil { - errs = multierr.Append(errs, err) + return s.sm.StopOnce("CapabilityRegistrySyncer", func() error { + // cancel the sync loop thats running in the background. + s.cancel() + + // close all the locals consuming the syncer's updates. + var errs error + for _, local := range s.locals { + if err := local.Close(); err != nil { + errs = multierr.Append(errs, err) + } } - } - return errs + + return errs + }) } // Start implements services.Service. func (s *syncer) Start(ctx context.Context) error { + return s.sm.StartOnce("CapabilityRegistrySyncer", func() error { + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + go s.syncLoop(ctx) + return nil + }) +} + +func (s *syncer) syncLoop(ctx context.Context) { tick := time.NewTicker(12 * time.Second) defer tick.Stop() for { select { case <-ctx.Done(): - return nil + return case <-tick.C: latestState := s.refreshOnchainState(ctx) for _, local := range s.locals {