CCIP-1730 Implementing Healthy function in LogPoller #584

Merged (7 commits, Mar 7, 2024)
20 changes: 20 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
@@ -10,6 +10,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
@@ -70,6 +71,8 @@ const (
Unconfirmed = Confirmations(0)
)

var ErrFinalityViolated = errors.New("finality violated")

type LogPollerTest interface {
LogPoller
PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
@@ -118,6 +121,12 @@ type logPoller struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// This flag is raised whenever the log poller detects that the chain's finality has been violated.
// It can happen when a reorg is deeper than the latest finalized block that LogPoller saw in a previous PollAndSave tick.
// Usually the only way to recover is to manually remove the offending logs and blocks from the database.
// LogPoller keeps running in an infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
}

// NewLogPoller creates a log poller. Note there is an assumption
@@ -149,6 +158,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
}
}

@@ -203,6 +213,13 @@ func (filter *Filter) Contains(other *Filter) bool {
return true
}

func (lp *logPoller) Ready() error {
Contributor:
This PR looks good overall but what's the difference between Ready and Healthy in this context? We already have a HealthReport method that calls lp.Healthy() but since that's not implemented on the LP itself it calls it on the StateMachine object from chainlink-common which is embedded in the LP struct.

Contributor:
The added "benefit" of implementing Healthy is that HealthReport will return a meaningful report now since we return whether finality has been violated.
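For illustration only, a minimal sketch of what overriding Healthy and HealthReport on the logPoller could look like, so that the finality-violation flag surfaces in the node's health report. This is not the code added in this PR (the PR implements Ready()); the HealthReport shape and the "EVM.LogPoller" key are assumptions made here for the sketch.

func (lp *logPoller) Healthy() error {
	// Mirror Ready(): report the sticky finality-violation flag.
	if lp.finalityViolated.Load() {
		return ErrFinalityViolated
	}
	return nil
}

func (lp *logPoller) HealthReport() map[string]error {
	// Keyed by a service name so the entry can be merged into a node-wide report.
	return map[string]error{"EVM.LogPoller": lp.Healthy()}
}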

Contributor:
Keep in mind I'm not 100% sure how these functions are consumed outside of this particular context we want to consume them in now.

Contributor Author:
Oh, nice catch. I wasn't aware of the Healthy function when going through the interfaces. @reductionista could you please advise which is better for this case?

Collaborator:
I believe healthy/ready are inherited from k8s probes. If not ready k8s won't send traffic yet and if not healthy k8s will restart the node. We only expose the ready one to k8s directly

// NOTE: We only implement the k8s readiness check, *not* the liveness check. Liveness checks are only recommended in cases
and have a separate health report. Healthy definitely makes more sense here
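To make the readiness/liveness split described above concrete, here is a small, self-contained Go sketch of how a readiness probe might aggregate Ready() across services while health is reported separately rather than feeding a k8s liveness restart. The Checkable interface, endpoint paths, and wiring are assumptions for illustration, not the actual Chainlink health-checker API.

package main

import (
	"fmt"
	"net/http"
)

// Checkable is an assumed interface for this sketch.
type Checkable interface {
	Ready() error
	HealthReport() map[string]error
}

// registerProbes exposes /ready for k8s traffic gating and /health as a
// separate, informational report that never restarts the pod.
func registerProbes(mux *http.ServeMux, services []Checkable) {
	mux.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) {
		for _, s := range services {
			if err := s.Ready(); err != nil {
				http.Error(w, err.Error(), http.StatusServiceUnavailable)
				return
			}
		}
		w.WriteHeader(http.StatusOK)
	})
	mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
		for _, s := range services {
			for name, err := range s.HealthReport() {
				if err != nil {
					fmt.Fprintf(w, "%s: %v\n", name, err)
					continue
				}
				fmt.Fprintf(w, "%s: ok\n", name)
			}
		}
	})
}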

Contributor Author:
It seems that Healthy() error is not present in LogPoller's interface; it probably has to be added there first 🤔

Contributor:
It's in the services.StateMachine that's embedded in the logPoller struct - we have to override it to get useful behavior IIUC.

Contributor:
Ah you mean the LogPoller interface - that's a good point. We should add it there if we want clients to call it.

Contributor Author:
Yeah, I already added it. I'm just a bit worried about what @connorwstein said about that function being used for k8s tooling. If that's the case, then we don't want to restart the pod whenever LogPoller is marked as not healthy.

Contributor:
Who is the SME on this? Maybe someone from infra or maybe core?

if lp.finalityViolated.Load() {
return ErrFinalityViolated
}
return nil
}

// RegisterFilter adds the provided EventSigs and Addresses to the log poller's log filter query.
// If any eventSig is emitted from any address, it will be captured by the log poller.
// If an event matching any of the given event signatures is emitted from any of the provided Addresses,
@@ -758,9 +775,11 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
return nil, err2
}
lp.finalityViolated.Store(false)
Collaborator:
I think there's still an edge case here: say a finality violation is detected and you wipe the db. We'll hit the "first poll ever on new chain" return case here and leave the bool true. I think you want a defer so that any non-error return from this function sets it to false.

Contributor Author:
So you suggest marking it as false at the end of the PollAndSave tick? It would definitely be easier if errors were bubbled up, but it's still doable.

Collaborator:
I'm imagining something like this; it can be a follow-up:

func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
	defer func() {
		// Note: be careful about shadowing; err must be the named return.
		// Reset the flag on any non-error return, including the
		// "first poll ever on new chain" path.
		if err == nil {
			lp.finalityViolated.Store(false)
		}
	}()

	// ... existing body ...

	return blockAfterLCA, nil
}
// No reorg, return current block.
lp.finalityViolated.Store(false)
return currentBlock, nil
}

@@ -924,6 +943,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
rerr := errors.New("Reorg greater than finality depth")
lp.SvcErrBuffer.Append(rerr)
Contributor:
It would be nice if we could use this, but the semantics seem weird: the buffer is flushed whenever you read it, so it's not really usable for our use case.

lp.finalityViolated.Store(true)
return nil, rerr
}
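As a side note on the SvcErrBuffer comment above, here is a minimal sketch (illustrative names, assuming only the standard sync package; not the actual SvcErrBuffer implementation) of why drain-on-read semantics are awkward here: once one reader flushes the buffer, the violation is no longer visible, whereas the sticky atomic.Bool keeps reporting until polling recovers.

type errBuffer struct {
	mu   sync.Mutex
	errs []error
}

// Append records an error.
func (b *errBuffer) Append(err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.errs = append(b.errs, err)
}

// Flush returns the accumulated errors and clears the buffer, so a second
// reader sees nothing even if the underlying condition still holds.
func (b *errBuffer) Flush() []error {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.errs
	b.errs = nil
	return out
}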

82 changes: 82 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -984,6 +984,86 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {
}
}

func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {
tests := []struct {
name string
finalityDepth int64
finalityTag bool
}{
{
name: "fixed finality depth without finality tag",
finalityDepth: 1,
finalityTag: false,
},
{
name: "chain finality in use",
finalityDepth: 0,
finalityTag: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.finalityTag, tt.finalityDepth, 3, 2, 1000)
// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "Test Emitter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)

// Test scenario
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
require.NoError(t, err)
th.Client.Commit()
th.Client.Commit()
th.Client.Commit()
markBlockAsFinalized(t, th, 3)

// Polling should get us the L1 log.
firstPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(5), firstPoll)
assert.NoError(t, th.LogPoller.Ready())

// Fork deeper than finality depth
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
lca, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1))
require.NoError(t, err)
require.NoError(t, th.Client.Fork(testutils.Context(t), lca.Hash()))

// Create 2'
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)})
require.NoError(t, err)
th.Client.Commit()

// Create 3-10
for i := 3; i < 10; i++ {
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))})
require.NoError(t, err)
th.Client.Commit()
}
markBlockAsFinalized(t, th, 6)

secondPoll := th.PollAndSaveLogs(testutils.Context(t), firstPoll)
assert.Equal(t, firstPoll, secondPoll)
assert.Equal(t, logpoller.ErrFinalityViolated, th.LogPoller.Ready())

// Manually remove latest block from the log poller to bring it back to life
// LogPoller should be healthy again after first poll
// Chain gen <- 1
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
require.NoError(t, th.ORM.DeleteLogsAndBlocksAfter(2))
// Poll from latest
recoveryPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(10), recoveryPoll)
assert.NoError(t, th.LogPoller.Ready())
})
}
}

func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {
t.Parallel()

@@ -1027,6 +1107,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {
// Polling should get us the L1 log.
newStart := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(3), newStart)
assert.NoError(t, th.LogPoller.Ready())
// Check that L1_1 has a proper data payload
lgs, err := th.ORM.SelectLogsByBlockRange(2, 2)
require.NoError(t, err)
@@ -1052,6 +1133,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

newStart = th.PollAndSaveLogs(testutils.Context(t), newStart)
assert.Equal(t, int64(10), newStart)
assert.NoError(t, th.LogPoller.Ready())

// Expect L1_2 to be properly updated
lgs, err = th.ORM.SelectLogsByBlockRange(2, 2)