Skip to content

Commit

Permalink
rm eventbroadcaster completely
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Jan 9, 2024
1 parent 3a598bb commit 5f38ffb
Show file tree
Hide file tree
Showing 8 changed files with 3 additions and 871 deletions.
4 changes: 0 additions & 4 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
Expand Down Expand Up @@ -156,8 +155,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
keyStore := keystore.New(db, utils.GetScryptParams(cfg), appLggr, cfg.Database())
mailMon := mailbox.NewMonitor(cfg.AppID().String(), appLggr.Named("Mailbox"))

dbListener := cfg.Database().Listener()
eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), dbListener.MinReconnectInterval(), dbListener.MaxReconnectDuration(), appLggr, cfg.AppID())
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
Expand Down Expand Up @@ -226,7 +223,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
SqlxDB: db,
KeyStore: keyStore,
RelayerChainInteroperators: relayChainInterops,
EventBroadcaster: eventBroadcaster,
MailMon: mailMon,
Logger: appLggr,
AuditLogger: auditLogger,
Expand Down
5 changes: 0 additions & 5 deletions core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
auditLogger = audit.NoopLogger
}

var eventBroadcaster pg.EventBroadcaster = pg.NewNullEventBroadcaster()

url := cfg.Database().URL()
db, err := pg.NewConnection(url.String(), cfg.Database().Dialect(), cfg.Database())
require.NoError(t, err)
Expand All @@ -329,8 +327,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
ethClient = dep
case webhook.ExternalInitiatorManager:
externalInitiatorManager = dep
case pg.EventBroadcaster:
eventBroadcaster = dep
default:
switch flag {
case UseRealExternalInitiatorManager:
Expand Down Expand Up @@ -415,7 +411,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
c := clhttptest.NewTestLocalOnlyHTTPClient()
appInstance, err := chainlink.NewApplication(chainlink.ApplicationOpts{
Config: cfg,
EventBroadcaster: eventBroadcaster,
MailMon: mailMon,
SqlxDB: db,
KeyStore: keyStore,
Expand Down
8 changes: 2 additions & 6 deletions core/internal/cltest/simulated_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/core"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

func NewSimulatedBackend(t *testing.T, alloc core.GenesisAlloc, gasLimit uint32) *backends.SimulatedBackend {
Expand All @@ -41,9 +39,8 @@ func NewApplicationWithConfigV2OnSimulatedBlockchain(
require.Zero(t, evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()).Cmp(testutils.SimulatedChainID))
chainID := big.New(testutils.SimulatedChainID)
client := client.NewSimulatedBackendClient(t, backend, testutils.SimulatedChainID)
eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), 0, 0, logger.TestLogger(t), uuid.New())

flagsAndDeps = append(flagsAndDeps, client, eventBroadcaster, chainID)
flagsAndDeps = append(flagsAndDeps, client, chainID)

// app.Stop() will call client.Close on the simulated backend
app := NewApplicationWithConfig(t, cfg, flagsAndDeps...)
Expand All @@ -66,9 +63,8 @@ func NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(
require.Zero(t, evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()).Cmp(testutils.SimulatedChainID))
chainID := big.New(testutils.SimulatedChainID)
client := client.NewSimulatedBackendClient(t, backend, testutils.SimulatedChainID)
eventBroadcaster := pg.NewEventBroadcaster(cfg.Database().URL(), 0, 0, logger.TestLogger(t), uuid.New())

flagsAndDeps = append(flagsAndDeps, client, eventBroadcaster, chainID)
flagsAndDeps = append(flagsAndDeps, client, chainID)

// app.Stop() will call client.Close on the simulated backend
return NewApplicationWithConfigAndKey(t, cfg, flagsAndDeps...)
Expand Down
10 changes: 1 addition & 9 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ type Application interface {
// in the services package, but the Store has its own package.
type ChainlinkApplication struct {
relayers *CoreRelayerChainInteroperators
EventBroadcaster pg.EventBroadcaster
jobORM job.ORM
jobSpawner job.Spawner
pipelineORM pipeline.ORM
Expand Down Expand Up @@ -150,7 +149,6 @@ type ChainlinkApplication struct {
type ApplicationOpts struct {
Config GeneralConfig
Logger logger.Logger
EventBroadcaster pg.EventBroadcaster
MailMon *mailbox.Monitor
SqlxDB *sqlx.DB
KeyStore keystore.Master
Expand Down Expand Up @@ -178,7 +176,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
db := opts.SqlxDB
cfg := opts.Config
relayerChainInterops := opts.RelayerChainInteroperators
eventBroadcaster := opts.EventBroadcaster
mailMon := opts.MailMon
externalInitiatorManager := opts.ExternalInitiatorManager
globalLogger := logger.Sugared(opts.Logger)
Expand Down Expand Up @@ -254,7 +251,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("no evm chains found")
}

srvcs = append(srvcs, eventBroadcaster, mailMon)
srvcs = append(srvcs, mailMon)
srvcs = append(srvcs, relayerChainInterops.Services()...)
promReporter := promreporter.NewPromReporter(db.DB, legacyEVMChains, globalLogger)
srvcs = append(srvcs, promReporter)
Expand Down Expand Up @@ -480,7 +477,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

return &ChainlinkApplication{
relayers: opts.RelayerChainInteroperators,
EventBroadcaster: eventBroadcaster,
jobORM: jobORM,
jobSpawner: jobSpawner,
pipelineRunner: pipelineRunner,
Expand Down Expand Up @@ -809,10 +805,6 @@ func (app *ChainlinkApplication) GetRelayers() RelayerChainInteroperators {
return app.relayers
}

func (app *ChainlinkApplication) GetEventBroadcaster() pg.EventBroadcaster {
return app.EventBroadcaster
}

func (app *ChainlinkApplication) GetSqlxDB() *sqlx.DB {
return app.sqlxDB
}
Expand Down
Loading

0 comments on commit 5f38ffb

Please sign in to comment.