Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mercury db multicast performance issue #13435

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eighty-hotels-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix panic if mercury server returns error #bugfix
5 changes: 5 additions & 0 deletions .changeset/twelve-wolves-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Performance improvements for mercury single insert for multiple mercury servers #internal
36 changes: 30 additions & 6 deletions core/services/relay/evm/mercury/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/sha256"
"database/sql"
"errors"
"fmt"
"strings"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,7 +21,7 @@ import (
)

type ORM interface {
InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error
InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error
DeleteTransmitRequests(ctx context.Context, serverURL string, reqs []*pb.TransmitRequest) error
GetTransmitRequests(ctx context.Context, serverURL string, jobID int32) ([]*Transmission, error)
PruneTransmitRequests(ctx context.Context, serverURL string, jobID int32, maxSize int) error
Expand All @@ -42,23 +44,45 @@ func NewORM(ds sqlutil.DataSource) ORM {
}

// InsertTransmitRequest inserts one transmit request if the payload does not exist already.
func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error {
func (o *orm) InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error {
feedID, err := FeedIDFromReport(req.Payload)
if err != nil {
return err
}
if len(serverURLs) == 0 {
return errors.New("no server URLs provided")
}

var wg sync.WaitGroup
wg.Add(2)
var err1, err2 error

go func() {
defer wg.Done()
_, err1 = o.ds.ExecContext(ctx, `
INSERT INTO mercury_transmit_requests (server_url, payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)

values := make([]string, len(serverURLs))
args := []interface{}{
req.Payload,
hashPayload(req.Payload),
reportCtx.ConfigDigest[:],
reportCtx.Epoch,
reportCtx.Round,
reportCtx.ExtraHash[:],
jobID,
feedID[:],
}
for i, serverURL := range serverURLs {
// server url is the only thing that changes, might as well re-use
// the same parameters for each insert
values[i] = fmt.Sprintf("($1, $2, $3, $4, $5, $6, $7, $8, $%d)", i+9)
args = append(args, serverURL)
}

_, err1 = o.ds.ExecContext(ctx, fmt.Sprintf(`
INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id, server_url)
VALUES %s
ON CONFLICT (server_url, payload_hash) DO NOTHING
`, serverURL, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:])
`, strings.Join(values, ",")), args...)
}()

go func() {
Expand Down
87 changes: 66 additions & 21 deletions core/services/relay/evm/mercury/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func TestORM(t *testing.T) {

// Test insert and get requests.
// s1
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0])
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1])
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2])
require.NoError(t, err)

// s2
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0])
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0])
require.NoError(t, err)

transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID)
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestORM(t *testing.T) {
require.Empty(t, transmissions)

// More inserts.
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID)
Expand All @@ -129,9 +129,9 @@ func TestORM(t *testing.T) {
})

// Duplicate requests are ignored.
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3])
require.NoError(t, err)

transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID)
Expand All @@ -151,6 +151,51 @@ func TestORM(t *testing.T) {

}

func TestORM_InsertTransmitRequest_MultipleServerURLs(t *testing.T) {
ctx := testutils.Context(t)
db := pgtest.NewSqlxDB(t)

jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter
pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`)
pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`)
orm := NewORM(db)
feedID := sampleFeedID

reports := sampleReports
reportContexts := make([]ocrtypes.ReportContext, 4)
for i := range reportContexts {
reportContexts[i] = ocrtypes.ReportContext{
ReportTimestamp: ocrtypes.ReportTimestamp{
ConfigDigest: ocrtypes.ConfigDigest{'1'},
Epoch: 10,
Round: uint8(i),
},
ExtraHash: [32]byte{'2'},
}
}
err := orm.InsertTransmitRequest(ctx, []string{sURL, sURL2, sURL3}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0])
require.NoError(t, err)

transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID)
require.NoError(t, err)
require.Len(t, transmissions, 1)
assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]})

transmissions, err = orm.GetTransmitRequests(ctx, sURL2, jobID)
require.NoError(t, err)
require.Len(t, transmissions, 1)
assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]})

transmissions, err = orm.GetTransmitRequests(ctx, sURL3, jobID)
require.NoError(t, err)
require.Len(t, transmissions, 1)
assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]})

l, err := orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[0], l)
}

func TestORM_PruneTransmitRequests(t *testing.T) {
ctx := testutils.Context(t)
db := pgtest.NewSqlxDB(t)
Expand All @@ -174,18 +219,18 @@ func TestORM_PruneTransmitRequests(t *testing.T) {
}

// s1
err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
require.NoError(t, err)
// s2 - should not be touched
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3))
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3))
require.NoError(t, err)

// Max size greater than number of records, expect no-op
Expand Down Expand Up @@ -221,9 +266,9 @@ func TestORM_PruneTransmitRequests(t *testing.T) {
{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)},
}, transmissions)

err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2))
require.NoError(t, err)

// Max size is table size - 1, expect the oldest row to be pruned.
Expand Down Expand Up @@ -267,13 +312,13 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) {
}
}

err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(
err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(
0, 0,
))
require.NoError(t, err)

// this should be ignored, because report context is the same
err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(
err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(
0, 0,
))
require.NoError(t, err)
Expand All @@ -283,31 +328,31 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) {
assert.Equal(t, reports[0], l)

t.Run("replaces if epoch and round are larger", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, "foo", &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1))
err = orm.InsertTransmitRequest(ctx, []string{"foo"}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[1], l)
})
t.Run("replaces if epoch is the same but round is greater", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[2], l)
})
t.Run("replaces if epoch is larger but round is smaller", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, "bar", &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1))
err = orm.InsertTransmitRequest(ctx, []string{"bar"}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
require.NoError(t, err)
assert.Equal(t, reports[3], l)
})
t.Run("does not overwrite if epoch/round is the same", func(t *testing.T) {
err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1))
err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1))
require.NoError(t, err)

l, err = orm.LatestReport(testutils.Context(t), feedID)
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/persistence_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (pm *PersistenceManager) Close() error {
}

func (pm *PersistenceManager) Insert(ctx context.Context, req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) error {
return pm.orm.InsertTransmitRequest(ctx, pm.serverURL, req, pm.jobID, reportCtx)
return pm.orm.InsertTransmitRequest(ctx, []string{pm.serverURL}, req, pm.jobID, reportCtx)
}

func (pm *PersistenceManager) Delete(ctx context.Context, req *pb.TransmitRequest) error {
Expand Down
11 changes: 7 additions & 4 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pkgerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
Expand Down Expand Up @@ -109,6 +110,7 @@ type mercuryTransmitter struct {
lggr logger.Logger
cfg TransmitterConfig

orm ORM
servers map[string]*server

codec TransmitterReportDecoder
Expand Down Expand Up @@ -307,6 +309,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin
services.StateMachine{},
lggr.Named("MercuryTransmitter").With("feedID", feedIDHex),
cfg,
orm,
servers,
codec,
feedID,
Expand Down Expand Up @@ -407,14 +410,14 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R

mt.lggr.Tracew("Transmit enqueue", "req.Payload", req.Payload, "report", report, "reportCtx", reportCtx, "signatures", signatures)

if err := mt.orm.InsertTransmitRequest(ctx, maps.Keys(mt.servers), req, mt.jobID, reportCtx); err != nil {
return err
}

g := new(errgroup.Group)
for _, s := range mt.servers {
s := s // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
if err := s.pm.Insert(ctx, req, reportCtx); err != nil {
s.transmitQueueInsertErrorCount.Inc()
return err
}
if ok := s.q.Push(req, reportCtx); !ok {
s.transmitQueuePushErrorCount.Inc()
return errors.New("transmit queue is closed")
Expand Down
Loading