Skip to content

Commit

Permalink
Merge pull request #2814 from smartcontractkit/chore/172534510-refact…
Browse files Browse the repository at this point in the history
…or-log-consumption-migration

Refactor LogConsumption model & migration
  • Loading branch information
samsondav authored Apr 29, 2020
2 parents bd8d77e + 43ef0db commit 0cfad53
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 249 deletions.
10 changes: 0 additions & 10 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,16 +338,6 @@ func NewBridgeType(t testing.TB, info ...string) (*models.BridgeTypeAuthenticati
return bta, bt
}

func NewLogConsumer(t testing.TB, store *strpkg.Store) models.LogConsumer {
job := NewJob()
err := store.ORM.CreateJob(&job)
require.NoError(t, err)
return models.LogConsumer{
Type: models.LogConsumerTypeJob,
ID: job.ID,
}
}

// WebURL parses a url into a models.WebURL
func WebURL(t testing.TB, unparsed string) models.WebURL {
parsed, err := url.Parse(unparsed)
Expand Down
22 changes: 12 additions & 10 deletions core/internal/mocks/log_listener.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 8 additions & 9 deletions core/services/eth/log_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ type LogBroadcaster interface {
}

// The LogListener responds to log events through HandleLog, and contains setup/tear-down
// callbacks in the On* functions. The Consumer function returns an instance of the LogConsumer, which
// uniquely identifies the listener
// callbacks in the On* functions.
type LogListener interface {
OnConnect()
OnDisconnect()
HandleLog(lb LogBroadcast, err error)
Consumer() models.LogConsumer
JobID() *models.ID
}

type logBroadcaster struct {
Expand Down Expand Up @@ -83,9 +82,9 @@ type LogBroadcast interface {
}

type logBroadcast struct {
orm *orm.ORM
log eth.RawLog
consumer models.LogConsumer
orm *orm.ORM
log eth.RawLog
consumerID *models.ID
}

func (lb *logBroadcast) Log() interface{} {
Expand All @@ -97,11 +96,11 @@ func (lb *logBroadcast) UpdateLog(newLog eth.RawLog) {
}

func (lb *logBroadcast) WasAlreadyConsumed() (bool, error) {
return lb.orm.HasConsumedLog(lb.log, lb.consumer)
return lb.orm.HasConsumedLog(lb.log, lb.consumerID)
}

func (lb *logBroadcast) MarkConsumed() error {
lc := models.NewLogConsumption(lb.log, lb.consumer)
lc := models.NewLogConsumption(lb.log, lb.consumerID)
return lb.orm.CreateLogConsumption(&lc)
}

Expand Down Expand Up @@ -342,7 +341,7 @@ func (b *logBroadcaster) onRawLog(rawLog eth.Log) {
}

rawLogCopy := rawLog.Copy()
lb := logBroadcast{b.orm, &rawLogCopy, listener.Consumer()}
lb := logBroadcast{b.orm, &rawLogCopy, listener.JobID()}
listener.HandleLog(&lb, nil)
}
}
Expand Down
57 changes: 27 additions & 30 deletions core/services/eth/log_broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,18 @@ func TestLogBroadcaster_ResubscribesOnAddOrRemoveContract(t *testing.T) {
sub.AssertExpectations(t)
}

type simpleLogListner struct {
handler func(lb ethsvc.LogBroadcast, err error)
id models.ID
type simpleLogListener struct {
handler func(lb ethsvc.LogBroadcast, err error)
consumerID *models.ID
}

func (listner simpleLogListner) HandleLog(lb ethsvc.LogBroadcast, err error) {
listner.handler(lb, err)
func (listener simpleLogListener) HandleLog(lb ethsvc.LogBroadcast, err error) {
listener.handler(lb, err)
}
func (listner simpleLogListner) OnConnect() {}
func (listner simpleLogListner) OnDisconnect() {}
func (listner simpleLogListner) Consumer() models.LogConsumer {
return models.LogConsumer{
Type: models.LogConsumerTypeJob,
ID: &listner.id,
}
func (listener simpleLogListener) OnConnect() {}
func (listener simpleLogListener) OnDisconnect() {}
func (listener simpleLogListener) JobID() *models.ID {
return listener.consumerID
}

func TestLogBroadcaster_BroadcastsToCorrectRecipients(t *testing.T) {
Expand Down Expand Up @@ -219,37 +216,37 @@ func TestLogBroadcaster_BroadcastsToCorrectRecipients(t *testing.T) {

var addr1Logs1, addr1Logs2, addr2Logs1, addr2Logs2 []interface{}

listener1 := simpleLogListner{
listener1 := simpleLogListener{
func(lb ethsvc.LogBroadcast, err error) {
require.NoError(t, err)
addr1Logs1 = append(addr1Logs1, lb.Log())
handleLogBroadcast(t, lb)
},
*createJob(t, store).ID,
createJob(t, store).ID,
}
listener2 := simpleLogListner{
listener2 := simpleLogListener{
func(lb ethsvc.LogBroadcast, err error) {
require.NoError(t, err)
addr1Logs2 = append(addr1Logs2, lb.Log())
handleLogBroadcast(t, lb)
},
*createJob(t, store).ID,
createJob(t, store).ID,
}
listener3 := simpleLogListner{
listener3 := simpleLogListener{
func(lb ethsvc.LogBroadcast, err error) {
require.NoError(t, err)
addr2Logs1 = append(addr2Logs1, lb.Log())
handleLogBroadcast(t, lb)
},
*createJob(t, store).ID,
createJob(t, store).ID,
}
listener4 := simpleLogListner{
listener4 := simpleLogListener{
func(lb ethsvc.LogBroadcast, err error) {
require.NoError(t, err)
addr2Logs2 = append(addr2Logs2, lb.Log())
handleLogBroadcast(t, lb)
},
*createJob(t, store).ID,
createJob(t, store).ID,
}

lb.Register(addr1, &listener1)
Expand Down Expand Up @@ -380,13 +377,12 @@ func TestDecodingLogListener(t *testing.T) {

var decodedLog interface{}

job := createJob(t, store)
listener := simpleLogListner{
listener := simpleLogListener{
func(lb ethsvc.LogBroadcast, innerErr error) {
err = innerErr
decodedLog = lb.Log()
},
*job.ID,
createJob(t, store).ID,
}

decodingListener := ethsvc.NewDecodingLogListener(contract, logTypes, &listener)
Expand Down Expand Up @@ -516,8 +512,9 @@ func TestLogBroadcaster_ReceivesAllLogsWhenResubscribing(t *testing.T) {
}
}

logListener := &simpleLogListner{
handler: handleLog,
logListener := &simpleLogListener{
handleLog,
createJob(t, store).ID,
}

// Send initial logs
Expand All @@ -539,7 +536,7 @@ func TestLogBroadcaster_ReceivesAllLogsWhenResubscribing(t *testing.T) {
ethClient.On("GetLatestBlock").Return(eth.Block{Number: hexutil.Uint64(test.blockHeight2)}, nil).Once()
ethClient.On("GetLogs", mock.Anything).Return(backfillableLogs, nil).Once()
// Trigger resubscription
lb.Register(common.Address{1}, &simpleLogListner{})
lb.Register(common.Address{1}, &simpleLogListener{})
chRawLogs2 := <-chchRawLogs
for _, logNum := range test.batch2 {
chRawLogs2 <- logs[logNum]
Expand Down Expand Up @@ -650,7 +647,7 @@ func TestLogBroadcaster_InjectsLogConsumptionRecordFunctions(t *testing.T) {
listenerCount := 0

job := createJob(t, store)
logListener := simpleLogListner{
logListener := simpleLogListener{
func(lb ethsvc.LogBroadcast, err error) {
consumed, err := lb.WasAlreadyConsumed()
require.NoError(t, err)
Expand All @@ -662,7 +659,7 @@ func TestLogBroadcaster_InjectsLogConsumptionRecordFunctions(t *testing.T) {
require.True(t, consumed)
listenerCount++
},
*job.ID,
job.ID,
}
addr := common.Address{1}

Expand Down Expand Up @@ -717,14 +714,14 @@ func TestLogBroadcaster_ProcessesLogsFromReorgs(t *testing.T) {
var recvd []*eth.Log

job := createJob(t, store)
listener := simpleLogListner{
listener := simpleLogListener{
func(lb ethsvc.LogBroadcast, err error) {
require.NoError(t, err)
ethLog := lb.Log().(*eth.Log)
recvd = append(recvd, ethLog)
handleLogBroadcast(t, lb)
},
*job.ID,
job.ID,
}

lb.Register(addr, &listener)
Expand Down
7 changes: 2 additions & 5 deletions core/services/fluxmonitor/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,11 +938,8 @@ func (p *PollingDeviationChecker) loggerFieldsForAnswerUpdated(log contracts.Log
}
}

func (p *PollingDeviationChecker) Consumer() models.LogConsumer {
return models.LogConsumer{
Type: models.LogConsumerTypeJob,
ID: p.initr.JobSpecID,
}
func (p *PollingDeviationChecker) JobID() *models.ID {
return p.initr.JobSpecID
}

// OutsideDeviation checks whether the next price is outside the threshold.
Expand Down
20 changes: 0 additions & 20 deletions core/services/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,23 +319,3 @@ func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) er

return fe.CoerceEmptyToNil()
}

// ValidateLogConsumption validates a LogConsumption record
func ValidateLogConsumption(lc models.LogConsumption, store *store.Store) error {
fe := models.NewJSONAPIErrors()

if _, err := store.ORM.FindLogConsumer(&lc); err != nil {
errorMessage := errors.Wrapf(err, "Unable to find LogConsumer of type %s with id %s", lc.ConsumerType, lc.ConsumerID.String())
fe.Add(errorMessage.Error())
}

exists, err := store.ORM.LogConsumptionExists(&lc)

if err != nil {
fe.Add(err.Error())
} else if exists {
fe.Add("LogConsumption record already exists")
}

return fe.CoerceEmptyToNil()
}
Loading

0 comments on commit 0cfad53

Please sign in to comment.