Skip to content

Commit

Permalink
use sqlutil instead of pg.QOpts
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 24, 2024
1 parent 9ab3347 commit 588f99a
Show file tree
Hide file tree
Showing 184 changed files with 3,802 additions and 4,592 deletions.
2 changes: 1 addition & 1 deletion core/bridges/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestORM_TestCachedResponse(t *testing.T) {
orm := bridges.NewORM(db)

trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.JobPipeline().MaxSuccessfulRuns())
specID, err := trORM.CreateSpec(ctx, nil, pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute))
specID, err := trORM.CreateSpec(ctx, pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute))
require.NoError(t, err)

_, err = orm.GetCachedResponse(ctx, "dot", specID, 1*time.Second)
Expand Down
13 changes: 6 additions & 7 deletions core/chains/evm/log/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newBroadcasterHelperWithEthClient(t *testing.T, ethClient evmclient.Client,
m[r.Chain().ID().String()] = r.Chain()
}
legacyChains := legacyevm.NewLegacyChains(m, cc.AppConfig().EVMConfigs())
pipelineHelper := cltest.NewJobPipelineV2(t, globalConfig.WebServer(), globalConfig.JobPipeline(), globalConfig.Database(), legacyChains, db, kst, nil, nil)
pipelineHelper := cltest.NewJobPipelineV2(t, globalConfig.WebServer(), globalConfig.JobPipeline(), legacyChains, db, kst, nil, nil)

return &broadcasterHelper{
t: t,
Expand Down Expand Up @@ -263,7 +263,7 @@ func (helper *broadcasterHelper) newLogListenerWithJob(name string) *simpleLogLi
PipelineSpec: &pipeline.Spec{},
ExternalJobID: uuid.New(),
}
err := helper.pipelineHelper.Jrm.CreateJob(jb)
err := helper.pipelineHelper.Jrm.CreateJob(testutils.Context(t), jb)
require.NoError(t, err)

var rec received
Expand All @@ -288,7 +288,7 @@ func (listener *simpleLogListener) HandleLog(ctx context.Context, lb log.Broadca

listener.received.logs = append(listener.received.logs, lb.RawLog())
listener.received.broadcasts = append(listener.received.broadcasts, lb)
consumed := listener.handleLogBroadcast(lb)
consumed := listener.handleLogBroadcast(ctx, lb)

if !consumed {
listener.received.uniqueLogs = append(listener.received.uniqueLogs, lb.RawLog())
Expand Down Expand Up @@ -321,9 +321,8 @@ func (listener *simpleLogListener) requireAllReceived(t *testing.T, expectedStat
}, testutils.WaitTimeout(t), time.Second, "len(received.uniqueLogs): %v is not equal len(expectedState.uniqueLogs): %v", len(received.getUniqueLogs()), len(expectedState.getUniqueLogs()))
}

func (listener *simpleLogListener) handleLogBroadcast(lb log.Broadcast) bool {
func (listener *simpleLogListener) handleLogBroadcast(ctx context.Context, lb log.Broadcast) bool {
t := listener.t
ctx := testutils.Context(t)
consumed, err := listener.WasAlreadyConsumed(ctx, lb)
if !assert.NoError(t, err) {
return false
Expand Down Expand Up @@ -354,8 +353,8 @@ type mockListener struct {
jobID int32
}

func (l *mockListener) JobID() int32 { return l.jobID }
func (l *mockListener) HandleLog(log.Broadcast) {}
func (l *mockListener) JobID() int32 { return l.jobID }
func (l *mockListener) HandleLog(context.Context, log.Broadcast) {}

type mockEthClientExpectedCalls struct {
SubscribeFilterLogs int
Expand Down
8 changes: 6 additions & 2 deletions core/chains/evm/log/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func TestBroadcaster_ReplaysLogs(t *testing.T) {
func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
contract1 := newMockContract(t)
contract2 := newMockContract(t)
ctx := testutils.Context(t)

blocks := cltest.NewBlocks(t, 10)
const (
Expand All @@ -267,6 +266,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper := newBroadcasterHelper(t, 0, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
ctx := testutils.Context(t)
orm := log.NewORM(helper.db, cltest.FixtureChainID)

listener := helper.newLogListenerWithJob("one")
Expand All @@ -292,6 +292,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper := newBroadcasterHelper(t, 2, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
ctx := testutils.Context(t)
orm := log.NewORM(helper.db, cltest.FixtureChainID)
contract1.On("ParseLog", log1).Return(flux_aggregator_wrapper.FluxAggregatorNewRound{}, nil)
contract2.On("ParseLog", log2).Return(flux_aggregator_wrapper.FluxAggregatorAnswerUpdated{}, nil)
Expand All @@ -318,6 +319,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper := newBroadcasterHelper(t, 4, 1, logs, func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
ctx := testutils.Context(t)
orm := log.NewORM(helper.db, cltest.FixtureChainID)

listener := helper.newLogListenerWithJob("one")
Expand All @@ -342,6 +344,7 @@ func TestBroadcaster_BackfillUnconsumedAfterCrash(t *testing.T) {
helper := newBroadcasterHelper(t, 7, 1, logs[1:], func(c *chainlink.Config, s *chainlink.Secrets) {
c.EVM[0].FinalityDepth = ptr[uint32](confs)
})
ctx := testutils.Context(t)
orm := log.NewORM(helper.db, cltest.FixtureChainID)
listener := helper.newLogListenerWithJob("one")
listener2 := helper.newLogListenerWithJob("two")
Expand Down Expand Up @@ -377,8 +380,9 @@ func (helper *broadcasterHelper) simulateHeads(t *testing.T, listener, listener2

<-headsDone

ctx := testutils.Context(t)
require.Eventually(t, func() bool {
blockNum, err := orm.GetPendingMinBlock(testutils.Context(t))
blockNum, err := orm.GetPendingMinBlock(ctx)
if !assert.NoError(t, err) {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type ObservedORM struct {

// NewObservedORM creates an observed version of log poller's ORM created by NewORM
// Please see ObservedLogPoller for more details on how latencies are measured
func NewObservedORM(chainID *big.Int, db sqlutil.DataSource, lggr logger.Logger) *ObservedORM {
func NewObservedORM(chainID *big.Int, ds sqlutil.DataSource, lggr logger.Logger) *ObservedORM {
return &ObservedORM{
ORM: NewORM(chainID, db, lggr),
ORM: NewORM(chainID, ds, lggr),
queryDuration: lpQueryDuration,
datasetSize: lpQueryDataSets,
logsInserted: lpLogsInserted,
Expand Down
Loading

0 comments on commit 588f99a

Please sign in to comment.