Skip to content

Commit

Permalink
Merge branch 'develop' into chore/TT-1102-fix-fund-return
Browse files Browse the repository at this point in the history
  • Loading branch information
iljapavlovs authored May 17, 2024
2 parents 04aaa8f + c82399e commit 8b245d5
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 72 deletions.
5 changes: 5 additions & 0 deletions .changeset/eighty-hotels-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix panic if mercury server returns error #bugfix
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
You may disable if this results in excessive log volume. Disable like so:

```
[Pipeline]
[JobPipeline]
VerboseLogging = false
```

Expand Down Expand Up @@ -219,7 +219,7 @@

- [#12404](https://github.com/smartcontractkit/chainlink/pull/12404) [`b74079b672`](https://github.com/smartcontractkit/chainlink/commit/b74079b672f36fb0c241f90ea1e875ea3a9524da) Thanks [@HenryNguyen5](https://github.com/HenryNguyen5)! - Add OCR3 capability contract wrapper

- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option Pipeline.VerboseLogging
- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option JobPipeline.VerboseLogging

VerboseLogging enables detailed logging of pipeline execution steps. This is
disabled by default because it increases log volume for pipeline runs, but can
Expand All @@ -230,7 +230,7 @@
Set it like the following example:

```
[Pipeline]
[JobPipeline]
VerboseLogging = true
```

Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.3
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs=
github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c h1:KiG8PAwUrdYn/AGBQ+B4p6erEUbEB+g6LJKhAaDjJ2s=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240514153505-0ddba5aa4d2c/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10 h1:IwJKWZHPBJbbh4oI3BGX8VNT3c/ChNiPZ/XI4iq6c0E=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240516150131-e1be553a9d10/go.mod h1:sj0pjL+METqeYL9ibp0T8SXquymlaQsofa6bdfLgXX8=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69 h1:Sec/GpBpUVaTEax1kSHlTvkzF/+d3w5roAQXaj5+SLA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240508101745-af1ed7bc8a69/go.mod h1:ZQKf+0OLzCLYIisH/OdOIQuFRI6bDuw+jPBTATyHfFM=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
14 changes: 3 additions & 11 deletions core/services/relay/evm/chain_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ const (
triggerWithAllTopics = "TriggeredWithFourTopics"
)

func TestChainReaderGetLatestValue(t *testing.T) {
func TestChainReaderInterfaceTests(t *testing.T) {
t.Parallel()
it := &chainReaderInterfaceTester{}

RunChainReaderGetLatestValueInterfaceTests(t, it)
RunChainReaderGetLatestValueInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it))
RunChainReaderInterfaceTests(t, it)
RunChainReaderInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it))

t.Run("Dynamically typed topics can be used to filter and have type correct in return", func(t *testing.T) {
it.Setup(t)
Expand Down Expand Up @@ -110,14 +110,6 @@ func TestChainReaderGetLatestValue(t *testing.T) {
})
}

func TestChainReaderQueryKey(t *testing.T) {
t.Parallel()
it := &chainReaderInterfaceTester{}

RunQueryKeyInterfaceTests(t, it)
RunQueryKeyInterfaceTests(t, commontestutils.WrapChainReaderTesterForLoop(it))
}

func triggerFourTopics(t *testing.T, it *chainReaderInterfaceTester, i1, i2, i3 int32) {
tx, err := it.evmTest.ChainReaderTesterTransactor.TriggerWithFourTopics(it.auth, i1, i2, i3)
require.NoError(t, err)
Expand Down
43 changes: 26 additions & 17 deletions core/services/relay/evm/mercury/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type asyncDeleter interface {
AsyncDelete(req *pb.TransmitRequest)
}

var _ services.Service = (*TransmitQueue)(nil)
var _ services.Service = (*transmitQueue)(nil)

var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "mercury_transmit_queue_load",
Expand All @@ -40,7 +40,7 @@ const promInterval = 6500 * time.Millisecond

// TransmitQueue is the high-level package that everything outside of this file should be using
// It stores pending transmissions, yielding the latest (highest priority) first to the caller
type TransmitQueue struct {
type transmitQueue struct {
services.StateMachine

cond sync.Cond
Expand All @@ -62,11 +62,20 @@ type Transmission struct {
ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins)
}

type TransmitQueue interface {
services.Service

BlockingPop() (t *Transmission)
Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool)
Init(transmissions []*Transmission)
IsEmpty() bool
}

// maxlen controls how many items will be stored in the queue
// 0 means unlimited - be careful, this can cause memory leaks
func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue {
func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue {
mu := new(sync.RWMutex)
return &TransmitQueue{
return &transmitQueue{
services.StateMachine{},
sync.Cond{L: mu},
lggr.Named("TransmitQueue"),
Expand All @@ -80,13 +89,13 @@ func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int,
}
}

func (tq *TransmitQueue) Init(transmissions []*Transmission) {
func (tq *transmitQueue) Init(transmissions []*Transmission) {
pq := priorityQueue(transmissions)
heap.Init(&pq) // ensure the heap is ordered
tq.pq = &pq
}

func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) {
func (tq *transmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()

Expand All @@ -111,7 +120,7 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report

// BlockingPop will block until at least one item is in the heap, and then return it
// If the queue is closed, it will immediately return nil
func (tq *TransmitQueue) BlockingPop() (t *Transmission) {
func (tq *transmitQueue) BlockingPop() (t *Transmission) {
tq.cond.L.Lock()
defer tq.cond.L.Unlock()
if tq.closed {
Expand All @@ -126,13 +135,13 @@ func (tq *TransmitQueue) BlockingPop() (t *Transmission) {
return t
}

func (tq *TransmitQueue) IsEmpty() bool {
func (tq *transmitQueue) IsEmpty() bool {
tq.mu.RLock()
defer tq.mu.RUnlock()
return tq.pq.Len() == 0
}

func (tq *TransmitQueue) Start(context.Context) error {
func (tq *transmitQueue) Start(context.Context) error {
return tq.StartOnce("TransmitQueue", func() error {
t := time.NewTicker(utils.WithJitter(promInterval))
wg := new(sync.WaitGroup)
Expand All @@ -148,7 +157,7 @@ func (tq *TransmitQueue) Start(context.Context) error {
})
}

func (tq *TransmitQueue) Close() error {
func (tq *transmitQueue) Close() error {
return tq.StopOnce("TransmitQueue", func() error {
tq.cond.L.Lock()
tq.closed = true
Expand All @@ -159,7 +168,7 @@ func (tq *TransmitQueue) Close() error {
})
}

func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) {
func (tq *transmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()

for {
Expand All @@ -172,25 +181,25 @@ func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{},
}
}

func (tq *TransmitQueue) report() {
func (tq *transmitQueue) report() {
tq.mu.RLock()
length := tq.pq.Len()
tq.mu.RUnlock()
tq.transmitQueueLoad.Set(float64(length))
}

func (tq *TransmitQueue) Ready() error {
func (tq *transmitQueue) Ready() error {
return nil
}
func (tq *TransmitQueue) Name() string { return tq.lggr.Name() }
func (tq *TransmitQueue) HealthReport() map[string]error {
func (tq *transmitQueue) Name() string { return tq.lggr.Name() }
func (tq *transmitQueue) HealthReport() map[string]error {
report := map[string]error{tq.Name(): errors.Join(
tq.status(),
)}
return report
}

func (tq *TransmitQueue) status() (merr error) {
func (tq *transmitQueue) status() (merr error) {
tq.mu.RLock()
length := tq.pq.Len()
closed := tq.closed
Expand All @@ -206,7 +215,7 @@ func (tq *TransmitQueue) status() (merr error) {

// pop latest Transmission from the heap
// Not thread-safe
func (tq *TransmitQueue) pop() *Transmission {
func (tq *transmitQueue) pop() *Transmission {
if tq.pq.Len() == 0 {
return nil
}
Expand Down
39 changes: 23 additions & 16 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,12 @@ type server struct {

c wsrpc.Client
pm *PersistenceManager
q *TransmitQueue
q TransmitQueue

deleteQueue chan *pb.TransmitRequest

url string

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
Expand Down Expand Up @@ -268,7 +270,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
s.transmitDuplicateCount.Inc()
s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "repts", t.ReportCtx.ReportTimestamp)
default:
transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc()
transmitServerErrorCount.WithLabelValues(feedIDHex, s.url, fmt.Sprintf("%d", res.Code)).Inc()
s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code)
}
}
Expand All @@ -281,26 +283,31 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed
}
}

func newServer(lggr logger.Logger, cfg TransmitterConfig, client wsrpc.Client, pm *PersistenceManager, serverURL, feedIDHex string) *server {
return &server{
lggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(lggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
serverURL,
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL),
}
}

func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[string]wsrpc.Client, fromAccount ed25519.PublicKey, jobID int32, feedID [32]byte, orm ORM, codec TransmitterReportDecoder, triggerCapability *triggers.MercuryTriggerService) *mercuryTransmitter {
feedIDHex := fmt.Sprintf("0x%x", feedID[:])
servers := make(map[string]*server, len(clients))
for serverURL, client := range clients {
cLggr := lggr.Named(serverURL).With("serverURL", serverURL)
pm := NewPersistenceManager(cLggr, serverURL, orm, jobID, int(cfg.TransmitQueueMaxSize()), flushDeletesFrequency, pruneFrequency)
servers[serverURL] = &server{
cLggr,
cfg.TransmitTimeout().Duration(),
client,
pm,
NewTransmitQueue(cLggr, serverURL, feedIDHex, int(cfg.TransmitQueueMaxSize()), pm),
make(chan *pb.TransmitRequest, int(cfg.TransmitQueueMaxSize())),
transmitSuccessCount.WithLabelValues(feedIDHex, serverURL),
transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL),
transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueueInsertErrorCount.WithLabelValues(feedIDHex, serverURL),
transmitQueuePushErrorCount.WithLabelValues(feedIDHex, serverURL),
}
servers[serverURL] = newServer(cLggr, cfg, client, pm, serverURL, feedIDHex)
}
return &mercuryTransmitter{
services.StateMachine{},
Expand Down
Loading

0 comments on commit 8b245d5

Please sign in to comment.