From 5f38ffbadfb487019f568b9b1bb7614735be67a0 Mon Sep 17 00:00:00 2001 From: krehermann Date: Tue, 9 Jan 2024 16:38:44 -0700 Subject: [PATCH] rm eventbroadcaster completely --- core/cmd/shell.go | 4 - core/internal/cltest/cltest.go | 5 - core/internal/cltest/simulated_backend.go | 8 +- core/services/chainlink/application.go | 10 +- core/services/pg/event_broadcaster.go | 346 -------------------- core/services/pg/event_broadcaster_test.go | 239 -------------- core/services/pg/mocks/event_broadcaster.go | 169 ---------- core/services/pg/mocks/subscription.go | 93 ------ 8 files changed, 3 insertions(+), 871 deletions(-) delete mode 100644 core/services/pg/event_broadcaster.go delete mode 100644 core/services/pg/event_broadcaster_test.go delete mode 100644 core/services/pg/mocks/event_broadcaster.go delete mode 100644 core/services/pg/mocks/subscription.go diff --git a/core/cmd/shell.go b/core/cmd/shell.go index f4386a4faab..547de67210f 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -43,7 +43,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/versioning" @@ -156,8 +155,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G keyStore := keystore.New(db, utils.GetScryptParams(cfg), appLggr, cfg.Database()) mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox")) - dbListener := cfg.Database().Listener() - eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), dbListener.MinReconnectInterval(), dbListener.MaxReconnectDuration(), appLggr, cfg.AppID()) loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing()) mercuryPool := wsrpc.NewPool(appLggr, cache.Config{ @@ -226,7 +223,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G SqlxDB: db, KeyStore: keyStore, RelayerChainInteroperators: relayChainInterops, - EventBroadcaster: eventBroadcaster, MailMon: mailMon, Logger: appLggr, AuditLogger: auditLogger, diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 7bb2a76bc7b..dcd16b8e59c 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -311,8 +311,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn auditLogger = audit.NoopLogger } - var eventBroadcaster pg.EventBroadcaster = pg.NewNullEventBroadcaster() - url := cfg.Database().URL() db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database()) require.NoError(t, err) @@ -329,8 +327,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn ethClient = dep case webhook.ExternalInitiatorManager: externalInitiatorManager = dep - case pg.EventBroadcaster: - eventBroadcaster = dep default: switch flag { case UseRealExternalInitiatorManager: @@ -415,7 +411,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn c := clhttptest.NewTestLocalOnlyHTTPClient() appInstance, err := chainlink.NewApplication(chainlink.ApplicationOpts{ Config: cfg, - EventBroadcaster: eventBroadcaster, MailMon: mailMon, SqlxDB: db, KeyStore: keyStore, diff --git a/core/internal/cltest/simulated_backend.go b/core/internal/cltest/simulated_backend.go index 0aecc7f8324..cde060d7f4a 100644 --- a/core/internal/cltest/simulated_backend.go +++ b/core/internal/cltest/simulated_backend.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" "github.com/ethereum/go-ethereum/core" - "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" @@ -16,7 +15,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" ) func NewSimulatedBackend(t *testing.T, alloc core.GenesisAlloc, gasLimit uint32) *backends.SimulatedBackend { @@ -41,9 +39,8 @@ func NewApplicationWithConfigV2OnSimulatedBlockchain( require.Zero(t, evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()).Cmp(testutils.SimulatedChainID)) chainID := big.New(testutils.SimulatedChainID) client := client.NewSimulatedBackendClient(t, backend, testutils.SimulatedChainID) - eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), 0, 0, logger.TestLogger(t), uuid.New()) - flagsAndDeps = append(flagsAndDeps, client, eventBroadcaster, chainID) + flagsAndDeps = append(flagsAndDeps, client, chainID) // app.Stop() will call client.Close on the simulated backend app := NewApplicationWithConfig(t, cfg, flagsAndDeps...) @@ -66,9 +63,8 @@ func NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain( require.Zero(t, evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()).Cmp(testutils.SimulatedChainID)) chainID := big.New(testutils.SimulatedChainID) client := client.NewSimulatedBackendClient(t, backend, testutils.SimulatedChainID) - eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), 0, 0, logger.TestLogger(t), uuid.New()) - flagsAndDeps = append(flagsAndDeps, client, eventBroadcaster, chainID) + flagsAndDeps = append(flagsAndDeps, client, chainID) // app.Stop() will call client.Close on the simulated backend return NewApplicationWithConfigAndKey(t, cfg, flagsAndDeps...) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 71a8b0b33fe..fae938c0db6 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -116,7 +116,6 @@ type Application interface { // in the services package, but the Store has its own package. type ChainlinkApplication struct { relayers *CoreRelayerChainInteroperators - EventBroadcaster pg.EventBroadcaster jobORM job.ORM jobSpawner job.Spawner pipelineORM pipeline.ORM @@ -150,7 +149,6 @@ type ChainlinkApplication struct { type ApplicationOpts struct { Config GeneralConfig Logger logger.Logger - EventBroadcaster pg.EventBroadcaster MailMon *mailbox.Monitor SqlxDB *sqlx.DB KeyStore keystore.Master @@ -178,7 +176,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { db := opts.SqlxDB cfg := opts.Config relayerChainInterops := opts.RelayerChainInteroperators - eventBroadcaster := opts.EventBroadcaster mailMon := opts.MailMon externalInitiatorManager := opts.ExternalInitiatorManager globalLogger := logger.Sugared(opts.Logger) @@ -254,7 +251,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("no evm chains found") } - srvcs = append(srvcs, eventBroadcaster, mailMon) + srvcs = append(srvcs, mailMon) srvcs = append(srvcs, relayerChainInterops.Services()...) promReporter := promreporter.NewPromReporter(db.DB, legacyEVMChains, globalLogger) srvcs = append(srvcs, promReporter) @@ -480,7 +477,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return &ChainlinkApplication{ relayers: opts.RelayerChainInteroperators, - EventBroadcaster: eventBroadcaster, jobORM: jobORM, jobSpawner: jobSpawner, pipelineRunner: pipelineRunner, @@ -809,10 +805,6 @@ func (app *ChainlinkApplication) GetRelayers() RelayerChainInteroperators { return app.relayers } -func (app *ChainlinkApplication) GetEventBroadcaster() pg.EventBroadcaster { - return app.EventBroadcaster -} - func (app *ChainlinkApplication) GetSqlxDB() *sqlx.DB { return app.sqlxDB } diff --git a/core/services/pg/event_broadcaster.go b/core/services/pg/event_broadcaster.go deleted file mode 100644 index a575ab33489..00000000000 --- a/core/services/pg/event_broadcaster.go +++ /dev/null @@ -1,346 +0,0 @@ -package pg - -import ( - "context" - "database/sql" - "net/url" - "sync" - "time" - - "github.com/google/uuid" - "github.com/lib/pq" - "github.com/pkg/errors" - - "github.com/smartcontractkit/chainlink-common/pkg/services" - commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils" - - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/static" - "github.com/smartcontractkit/chainlink/v2/core/utils" -) - -//go:generate mockery --quiet --name EventBroadcaster --output ./mocks/ --case=underscore -//go:generate mockery --quiet --name Subscription --output ./mocks/ --case=underscore - -// EventBroadcaster opaquely manages a collection of Postgres event listeners -// and broadcasts events to subscribers (with an optional payload filter). -type EventBroadcaster interface { - services.Service - Subscribe(channel, payloadFilter string) (Subscription, error) - Notify(channel string, payload string) error -} - -type eventBroadcaster struct { - services.StateMachine - uri string - minReconnectInterval time.Duration - maxReconnectDuration time.Duration - db *sql.DB - listener *pq.Listener - subscriptions map[string]map[Subscription]struct{} - subscriptionsMu sync.RWMutex - chStop chan struct{} - chDone chan struct{} - lggr logger.Logger -} - -var _ EventBroadcaster = (*eventBroadcaster)(nil) - -type Event struct { - Channel string - Payload string -} - -func NewEventBroadcaster(uri url.URL, minReconnectInterval time.Duration, maxReconnectDuration time.Duration, lggr logger.Logger, appID uuid.UUID) *eventBroadcaster { - if minReconnectInterval == time.Duration(0) { - minReconnectInterval = 1 * time.Second - } - if maxReconnectDuration == time.Duration(0) { - maxReconnectDuration = 1 * time.Minute - } - static.SetConsumerName(&uri, "EventBroadcaster", &appID) - return &eventBroadcaster{ - uri: uri.String(), - minReconnectInterval: minReconnectInterval, - maxReconnectDuration: maxReconnectDuration, - subscriptions: make(map[string]map[Subscription]struct{}), - chStop: make(chan struct{}), - chDone: make(chan struct{}), - lggr: lggr.Named("EventBroadcaster"), - } -} - -// Start starts EventBroadcaster. -func (b *eventBroadcaster) Start(context.Context) error { - return b.StartOnce("Postgres event broadcaster", func() (err error) { - // Explicitly using the lib/pq for notifications so we use the postgres driverName - // and NOT pgx. - db, err := sql.Open("postgres", b.uri) - if err != nil { - return err - } - b.db = db - b.listener = pq.NewListener(b.uri, b.minReconnectInterval, b.maxReconnectDuration, func(ev pq.ListenerEventType, err error) { - // sanity check since these can still be called after closing the listener - select { - case <-b.chStop: - return - default: - } - // These are always connection-related events, and the pq library - // automatically handles reconnecting to the DB. Therefore, we do not - // need to terminate, but rather simply log these events for node - // operators' sanity. - switch ev { - case pq.ListenerEventConnected: - b.lggr.Debug("Postgres event broadcaster: connected") - case pq.ListenerEventDisconnected: - b.lggr.Warnw("Postgres event broadcaster: disconnected, trying to reconnect...", "err", err) - case pq.ListenerEventReconnected: - b.lggr.Debug("Postgres event broadcaster: reconnected") - case pq.ListenerEventConnectionAttemptFailed: - b.lggr.Warnw("Postgres event broadcaster: reconnect attempt failed, trying again...", "err", err) - } - }) - - go b.runLoop() - return nil - }) -} - -// Stop permanently destroys the EventBroadcaster. Calling this does not clean -// up any outstanding subscriptions. Subscribers must explicitly call `.Close()` -// or they will leak goroutines. -func (b *eventBroadcaster) Close() error { - return b.StopOnce("Postgres event broadcaster", func() (err error) { - b.subscriptionsMu.RLock() - defer b.subscriptionsMu.RUnlock() - b.subscriptions = nil - - err = services.CloseAll(b.db, b.listener) - close(b.chStop) - <-b.chDone - return err - }) -} - -func (b *eventBroadcaster) Name() string { - return b.lggr.Name() -} - -func (b *eventBroadcaster) HealthReport() map[string]error { - return map[string]error{b.Name(): b.Healthy()} -} - -func (b *eventBroadcaster) runLoop() { - defer close(b.chDone) - for { - select { - case <-b.chStop: - return - - case notification, open := <-b.listener.NotificationChannel(): - if !open { - return - } else if notification == nil { - continue - } - b.lggr.Debugw("Postgres event broadcaster: received notification", - "channel", notification.Channel, - "payload", notification.Extra, - ) - b.broadcast(notification) - } - } -} - -func (b *eventBroadcaster) Notify(channel string, payload string) error { - _, err := b.db.Exec(`SELECT pg_notify($1, $2)`, channel, payload) - return errors.Wrap(err, "Postgres event broadcaster could not notify") -} - -func (b *eventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error) { - b.subscriptionsMu.Lock() - defer b.subscriptionsMu.Unlock() - - if _, exists := b.subscriptions[channel]; !exists { - err := b.listener.Listen(channel) - if err != nil { - return nil, errors.Wrap(err, "Postgres event broadcaster could not subscribe") - } - b.subscriptions[channel] = make(map[Subscription]struct{}) - } - - sub := &subscription{ - channel: channel, - payloadFilter: payloadFilter, - eventBroadcaster: b, - queue: utils.NewBoundedQueue[Event](1000), - chEvents: make(chan Event), - chDone: make(chan struct{}), - lggr: logger.Sugared(b.lggr), - } - sub.processQueueWorker = commonutils.NewSleeperTask( - commonutils.SleeperFuncTask(sub.processQueue, "SubscriptionQueueProcessor"), - ) - b.subscriptions[channel][sub] = struct{}{} - return sub, nil -} - -func (b *eventBroadcaster) removeSubscription(sub Subscription) { - b.subscriptionsMu.Lock() - defer b.subscriptionsMu.Unlock() - - // The following conditions can occur on shutdown when .Stop() is called - // before one or more subscriptions' .Close() methods are called - if b.subscriptions == nil { - return - } - subs, exists := b.subscriptions[sub.ChannelName()] - if !exists || subs == nil { - return - } - - delete(b.subscriptions[sub.ChannelName()], sub) - if len(b.subscriptions[sub.ChannelName()]) == 0 { - err := b.listener.Unlisten(sub.ChannelName()) - if err != nil { - b.lggr.Errorw("Postgres event broadcaster: failed to unsubscribe", "err", err) - } - delete(b.subscriptions, sub.ChannelName()) - } -} - -func (b *eventBroadcaster) broadcast(notification *pq.Notification) { - b.subscriptionsMu.RLock() - defer b.subscriptionsMu.RUnlock() - - event := Event{ - Channel: notification.Channel, - Payload: notification.Extra, - } - - var wg sync.WaitGroup - for sub := range b.subscriptions[event.Channel] { - if sub.InterestedIn(event) { - wg.Add(1) - go func(sub Subscription) { - defer wg.Done() - sub.Send(event) - }(sub) - } - } - wg.Wait() -} - -// Subscription represents a subscription to a Postgres event channel -type Subscription interface { - Events() <-chan Event - Close() - - ChannelName() string - InterestedIn(event Event) bool - Send(event Event) -} - -type subscription struct { - channel string - payloadFilter string - eventBroadcaster *eventBroadcaster - queue *utils.BoundedQueue[Event] - processQueueWorker *commonutils.SleeperTask - chEvents chan Event - chDone chan struct{} - lggr logger.SugaredLogger -} - -var _ Subscription = (*subscription)(nil) - -func (sub *subscription) InterestedIn(event Event) bool { - return sub.payloadFilter == event.Payload || sub.payloadFilter == "" -} - -func (sub *subscription) Send(event Event) { - sub.queue.Add(event) - sub.processQueueWorker.WakeUpIfStarted() -} - -const broadcastTimeout = 10 * time.Second - -func (sub *subscription) processQueue() { - deadline := time.Now().Add(broadcastTimeout) - for !sub.queue.Empty() { - event := sub.queue.Take() - select { - case sub.chEvents <- event: - case <-time.After(time.Until(deadline)): - sub.lggr.Warnf("Postgres event broadcaster: SLOW processQueue(), timed out after %s", broadcastTimeout) - return - case <-sub.chDone: - sub.lggr.Debugw("Postgres event broadcaster: request cancelled during processQueue()") - return - } - } -} - -func (sub *subscription) Events() <-chan Event { - return sub.chEvents -} - -func (sub *subscription) ChannelName() string { - return sub.channel -} - -func (sub *subscription) Close() { - sub.eventBroadcaster.removeSubscription(sub) - // Close chDone before stopping the SleeperTask to avoid deadlocks - close(sub.chDone) - err := sub.processQueueWorker.Stop() - if err != nil { - sub.lggr.Errorw("THIS NEVER RETURNS AN ERROR", "err", err) - } - close(sub.chEvents) -} - -// NullEventBroadcaster implements null pattern for event broadcaster -type NullEventBroadcaster struct { - Sub *NullSubscription -} - -func NewNullEventBroadcaster() *NullEventBroadcaster { - sub := &NullSubscription{make(chan (Event))} - return &NullEventBroadcaster{sub} -} - -var _ EventBroadcaster = &NullEventBroadcaster{} - -func (*NullEventBroadcaster) Name() string { return "NullEventBroadcaster" } - -// Start does no-op. -func (*NullEventBroadcaster) Start(context.Context) error { return nil } - -// Close does no-op. -func (*NullEventBroadcaster) Close() error { return nil } - -// Ready does no-op. -func (*NullEventBroadcaster) Ready() error { return nil } - -// HealthReport does no-op -func (*NullEventBroadcaster) HealthReport() map[string]error { return map[string]error{} } - -func (ne *NullEventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error) { - return ne.Sub, nil -} -func (*NullEventBroadcaster) Notify(channel string, payload string) error { return nil } - -var _ Subscription = &NullSubscription{} - -type NullSubscription struct { - Ch chan (Event) -} - -func (ns *NullSubscription) Events() <-chan Event { return ns.Ch } -func (ns *NullSubscription) Close() {} -func (ns *NullSubscription) ChannelName() string { return "" } -func (ns *NullSubscription) InterestedIn(event Event) bool { return false } -func (ns *NullSubscription) Send(event Event) {} diff --git a/core/services/pg/event_broadcaster_test.go b/core/services/pg/event_broadcaster_test.go deleted file mode 100644 index e8a4a1086db..00000000000 --- a/core/services/pg/event_broadcaster_test.go +++ /dev/null @@ -1,239 +0,0 @@ -package pg_test - -import ( - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/onsi/gomega" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - "github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/pg" -) - -func TestEventBroadcaster(t *testing.T) { - config, _ := heavyweight.FullTestDBNoFixturesV2(t, nil) - - eventBroadcaster := pg.NewEventBroadcaster(config.Database().URL(), 0, 0, logger.TestLogger(t), uuid.New()) - servicetest.Run(t, eventBroadcaster) - - t.Run("doesn't broadcast unrelated events (no payload filter)", func(t *testing.T) { - sub, err := eventBroadcaster.Subscribe("foo", "") - require.NoError(t, err) - defer sub.Close() - - go func() { - err := eventBroadcaster.Notify("bar", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("fooo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("fo", "123") - require.NoError(t, err) - }() - - ch := sub.Events() - gomega.NewWithT(t).Consistently(ch).ShouldNot(gomega.Receive()) - }) - - t.Run("doesn't broadcast unrelated events (with payload filter)", func(t *testing.T) { - sub, err := eventBroadcaster.Subscribe("foo", "123") - require.NoError(t, err) - defer sub.Close() - - go func() { - err := eventBroadcaster.Notify("foo", "asdf") - require.NoError(t, err) - err = eventBroadcaster.Notify("bar", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("fooo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("fo", "123") - require.NoError(t, err) - }() - - ch := sub.Events() - gomega.NewWithT(t).Consistently(ch).ShouldNot(gomega.Receive()) - }) - - t.Run("does broadcast related events (no payload filter)", func(t *testing.T) { - sub, err := eventBroadcaster.Subscribe("foo", "") - require.NoError(t, err) - defer sub.Close() - - go func() { - err := eventBroadcaster.Notify("foo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "aslkdjslkdfj") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "true") - require.NoError(t, err) - }() - - ch := sub.Events() - gomega.NewWithT(t).Eventually(ch).Should(gomega.Receive()) - gomega.NewWithT(t).Eventually(ch).Should(gomega.Receive()) - gomega.NewWithT(t).Eventually(ch).Should(gomega.Receive()) - }) - - t.Run("does broadcast related events (with payload filter)", func(t *testing.T) { - sub, err := eventBroadcaster.Subscribe("foo", "123") - require.NoError(t, err) - defer sub.Close() - - go func() { - err := eventBroadcaster.Notify("foo", "asdf") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "true") - require.NoError(t, err) - }() - - ch := sub.Events() - gomega.NewWithT(t).Eventually(ch).Should(gomega.Receive()) - gomega.NewWithT(t).Eventually(ch).Should(gomega.Receive()) - gomega.NewWithT(t).Consistently(ch).ShouldNot(gomega.Receive()) - }) - - t.Run("broadcasts to the correct subscribers", func(t *testing.T) { - sub1, err := eventBroadcaster.Subscribe("foo", "") - require.NoError(t, err) - defer sub1.Close() - - sub2, err := eventBroadcaster.Subscribe("foo", "123") - require.NoError(t, err) - defer sub2.Close() - - sub3, err := eventBroadcaster.Subscribe("bar", "") - require.NoError(t, err) - defer sub3.Close() - - sub4, err := eventBroadcaster.Subscribe("bar", "asdf") - require.NoError(t, err) - defer sub4.Close() - - var wg sync.WaitGroup - wg.Add(5) - - recv := func(ch <-chan pg.Event) pg.Event { - select { - case e := <-ch: - return e - case <-time.After(5 * time.Second): - t.Fatal("did not receive") - } - return pg.Event{} - } - - go func() { - defer wg.Done() - err := eventBroadcaster.Notify("foo", "asdf") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("foo", "true") - require.NoError(t, err) - - err = eventBroadcaster.Notify("bar", "asdf") - require.NoError(t, err) - err = eventBroadcaster.Notify("bar", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("bar", "123") - require.NoError(t, err) - err = eventBroadcaster.Notify("bar", "true") - require.NoError(t, err) - }() - - go func() { - defer wg.Done() - e := recv(sub1.Events()) - require.Equal(t, "foo", e.Channel) - require.Equal(t, "asdf", e.Payload) - - e = recv(sub1.Events()) - require.Equal(t, "foo", e.Channel) - require.Equal(t, "123", e.Payload) - - e = recv(sub1.Events()) - require.Equal(t, "foo", e.Channel) - require.Equal(t, "123", e.Payload) - - e = recv(sub1.Events()) - require.Equal(t, "foo", e.Channel) - require.Equal(t, "true", e.Payload) - - gomega.NewWithT(t).Consistently(sub1.Events()).ShouldNot(gomega.Receive()) - }() - - go func() { - defer wg.Done() - e := recv(sub2.Events()) - require.Equal(t, "foo", e.Channel) - require.Equal(t, "123", e.Payload) - - e = recv(sub2.Events()) - require.Equal(t, "foo", e.Channel) - require.Equal(t, "123", e.Payload) - - gomega.NewWithT(t).Consistently(sub2.Events()).ShouldNot(gomega.Receive()) - }() - - go func() { - defer wg.Done() - e := recv(sub3.Events()) - require.Equal(t, "bar", e.Channel) - require.Equal(t, "asdf", e.Payload) - - e = recv(sub3.Events()) - require.Equal(t, "bar", e.Channel) - require.Equal(t, "123", e.Payload) - - e = recv(sub3.Events()) - require.Equal(t, "bar", e.Channel) - require.Equal(t, "123", e.Payload) - - e = recv(sub3.Events()) - require.Equal(t, "bar", e.Channel) - require.Equal(t, "true", e.Payload) - - gomega.NewWithT(t).Consistently(sub3.Events()).ShouldNot(gomega.Receive()) - }() - - go func() { - defer wg.Done() - e := recv(sub4.Events()) - require.Equal(t, "bar", e.Channel) - require.Equal(t, "asdf", e.Payload) - - gomega.NewWithT(t).Consistently(sub4.Events()).ShouldNot(gomega.Receive()) - }() - - wg.Wait() - }) - - t.Run("closes events channel on subscription close", func(t *testing.T) { - sub, err := eventBroadcaster.Subscribe("foo", "") - require.NoError(t, err) - - chEvents := sub.Events() - - sub.Close() - - select { - case _, ok := <-chEvents: - if ok { - t.Fatal("expected chEvents to be closed") - } - default: - t.Fatal("expected chEvents to not block") - } - }) -} diff --git a/core/services/pg/mocks/event_broadcaster.go b/core/services/pg/mocks/event_broadcaster.go deleted file mode 100644 index 63f06db494b..00000000000 --- a/core/services/pg/mocks/event_broadcaster.go +++ /dev/null @@ -1,169 +0,0 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. - -package mocks - -import ( - context "context" - - pg "github.com/smartcontractkit/chainlink/v2/core/services/pg" - mock "github.com/stretchr/testify/mock" -) - -// EventBroadcaster is an autogenerated mock type for the EventBroadcaster type -type EventBroadcaster struct { - mock.Mock -} - -// Close provides a mock function with given fields: -func (_m *EventBroadcaster) Close() error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Close") - } - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// HealthReport provides a mock function with given fields: -func (_m *EventBroadcaster) HealthReport() map[string]error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for HealthReport") - } - - var r0 map[string]error - if rf, ok := ret.Get(0).(func() map[string]error); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]error) - } - } - - return r0 -} - -// Name provides a mock function with given fields: -func (_m *EventBroadcaster) Name() string { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Name") - } - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// Notify provides a mock function with given fields: channel, payload -func (_m *EventBroadcaster) Notify(channel string, payload string) error { - ret := _m.Called(channel, payload) - - if len(ret) == 0 { - panic("no return value specified for Notify") - } - - var r0 error - if rf, ok := ret.Get(0).(func(string, string) error); ok { - r0 = rf(channel, payload) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Ready provides a mock function with given fields: -func (_m *EventBroadcaster) Ready() error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Ready") - } - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Start provides a mock function with given fields: _a0 -func (_m *EventBroadcaster) Start(_a0 context.Context) error { - ret := _m.Called(_a0) - - if len(ret) == 0 { - panic("no return value specified for Start") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Subscribe provides a mock function with given fields: channel, payloadFilter -func (_m *EventBroadcaster) Subscribe(channel string, payloadFilter string) (pg.Subscription, error) { - ret := _m.Called(channel, payloadFilter) - - if len(ret) == 0 { - panic("no return value specified for Subscribe") - } - - var r0 pg.Subscription - var r1 error - if rf, ok := ret.Get(0).(func(string, string) (pg.Subscription, error)); ok { - return rf(channel, payloadFilter) - } - if rf, ok := ret.Get(0).(func(string, string) pg.Subscription); ok { - r0 = rf(channel, payloadFilter) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(pg.Subscription) - } - } - - if rf, ok := ret.Get(1).(func(string, string) error); ok { - r1 = rf(channel, payloadFilter) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewEventBroadcaster creates a new instance of EventBroadcaster. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewEventBroadcaster(t interface { - mock.TestingT - Cleanup(func()) -}) *EventBroadcaster { - mock := &EventBroadcaster{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/pg/mocks/subscription.go b/core/services/pg/mocks/subscription.go deleted file mode 100644 index fcd194004de..00000000000 --- a/core/services/pg/mocks/subscription.go +++ /dev/null @@ -1,93 +0,0 @@ -// Code generated by mockery v2.38.0. DO NOT EDIT. - -package mocks - -import ( - pg "github.com/smartcontractkit/chainlink/v2/core/services/pg" - mock "github.com/stretchr/testify/mock" -) - -// Subscription is an autogenerated mock type for the Subscription type -type Subscription struct { - mock.Mock -} - -// ChannelName provides a mock function with given fields: -func (_m *Subscription) ChannelName() string { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for ChannelName") - } - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// Close provides a mock function with given fields: -func (_m *Subscription) Close() { - _m.Called() -} - -// Events provides a mock function with given fields: -func (_m *Subscription) Events() <-chan pg.Event { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Events") - } - - var r0 <-chan pg.Event - if rf, ok := ret.Get(0).(func() <-chan pg.Event); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan pg.Event) - } - } - - return r0 -} - -// InterestedIn provides a mock function with given fields: event -func (_m *Subscription) InterestedIn(event pg.Event) bool { - ret := _m.Called(event) - - if len(ret) == 0 { - panic("no return value specified for InterestedIn") - } - - var r0 bool - if rf, ok := ret.Get(0).(func(pg.Event) bool); ok { - r0 = rf(event) - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// Send provides a mock function with given fields: event -func (_m *Subscription) Send(event pg.Event) { - _m.Called(event) -} - -// NewSubscription creates a new instance of Subscription. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewSubscription(t interface { - mock.TestingT - Cleanup(func()) -}) *Subscription { - mock := &Subscription{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -}