diff --git a/.changeset/eighty-hotels-sit.md b/.changeset/eighty-hotels-sit.md new file mode 100644 index 00000000000..e83b70c7695 --- /dev/null +++ b/.changeset/eighty-hotels-sit.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix panic if mercury server returns error #bugfix diff --git a/.changeset/twelve-wolves-clean.md b/.changeset/twelve-wolves-clean.md new file mode 100644 index 00000000000..c38fbe3fd83 --- /dev/null +++ b/.changeset/twelve-wolves-clean.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Performance improvements for mercury single insert for multiple mercury servers #internal diff --git a/core/services/relay/evm/mercury/orm.go b/core/services/relay/evm/mercury/orm.go index 6426ef54a5d..65df9ab4cc6 100644 --- a/core/services/relay/evm/mercury/orm.go +++ b/core/services/relay/evm/mercury/orm.go @@ -5,6 +5,8 @@ import ( "crypto/sha256" "database/sql" "errors" + "fmt" + "strings" "sync" "github.com/ethereum/go-ethereum/common" @@ -19,7 +21,7 @@ import ( ) type ORM interface { - InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error + InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error DeleteTransmitRequests(ctx context.Context, serverURL string, reqs []*pb.TransmitRequest) error GetTransmitRequests(ctx context.Context, serverURL string, jobID int32) ([]*Transmission, error) PruneTransmitRequests(ctx context.Context, serverURL string, jobID int32, maxSize int) error @@ -42,11 +44,14 @@ func NewORM(ds sqlutil.DataSource) ORM { } // InsertTransmitRequest inserts one transmit request if the payload does not exist already. -func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error { +func (o *orm) InsertTransmitRequest(ctx context.Context, serverURLs []string, req *pb.TransmitRequest, jobID int32, reportCtx ocrtypes.ReportContext) error { feedID, err := FeedIDFromReport(req.Payload) if err != nil { return err } + if len(serverURLs) == 0 { + return errors.New("no server URLs provided") + } var wg sync.WaitGroup wg.Add(2) @@ -54,11 +59,30 @@ func (o *orm) InsertTransmitRequest(ctx context.Context, serverURL string, req * go func() { defer wg.Done() - _, err1 = o.ds.ExecContext(ctx, ` - INSERT INTO mercury_transmit_requests (server_url, payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + + values := make([]string, len(serverURLs)) + args := []interface{}{ + req.Payload, + hashPayload(req.Payload), + reportCtx.ConfigDigest[:], + reportCtx.Epoch, + reportCtx.Round, + reportCtx.ExtraHash[:], + jobID, + feedID[:], + } + for i, serverURL := range serverURLs { + // server url is the only thing that changes, might as well re-use + // the same parameters for each insert + values[i] = fmt.Sprintf("($1, $2, $3, $4, $5, $6, $7, $8, $%d)", i+9) + args = append(args, serverURL) + } + + _, err1 = o.ds.ExecContext(ctx, fmt.Sprintf(` + INSERT INTO mercury_transmit_requests (payload, payload_hash, config_digest, epoch, round, extra_hash, job_id, feed_id, server_url) + VALUES %s ON CONFLICT (server_url, payload_hash) DO NOTHING - `, serverURL, req.Payload, hashPayload(req.Payload), reportCtx.ConfigDigest[:], reportCtx.Epoch, reportCtx.Round, reportCtx.ExtraHash[:], jobID, feedID[:]) + `, strings.Join(values, ",")), args...) }() go func() { diff --git a/core/services/relay/evm/mercury/orm_test.go b/core/services/relay/evm/mercury/orm_test.go index 2b2e15ffd53..bffc35522f3 100644 --- a/core/services/relay/evm/mercury/orm_test.go +++ b/core/services/relay/evm/mercury/orm_test.go @@ -48,15 +48,15 @@ func TestORM(t *testing.T) { // Test insert and get requests. // s1 - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, reportContexts[1]) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, reportContexts[2]) require.NoError(t, err) // s2 - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0]) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[0]) require.NoError(t, err) transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID) @@ -119,7 +119,7 @@ func TestORM(t *testing.T) { require.Empty(t, transmissions) // More inserts. - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID) @@ -129,9 +129,9 @@ func TestORM(t *testing.T) { }) // Duplicate requests are ignored. - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, reportContexts[3]) require.NoError(t, err) transmissions, err = orm.GetTransmitRequests(ctx, sURL, jobID) @@ -151,6 +151,51 @@ func TestORM(t *testing.T) { } +func TestORM_InsertTransmitRequest_MultipleServerURLs(t *testing.T) { + ctx := testutils.Context(t) + db := pgtest.NewSqlxDB(t) + + jobID := rand.Int32() // foreign key constraints disabled so value doesn't matter + pgtest.MustExec(t, db, `SET CONSTRAINTS mercury_transmit_requests_job_id_fkey DEFERRED`) + pgtest.MustExec(t, db, `SET CONSTRAINTS feed_latest_reports_job_id_fkey DEFERRED`) + orm := NewORM(db) + feedID := sampleFeedID + + reports := sampleReports + reportContexts := make([]ocrtypes.ReportContext, 4) + for i := range reportContexts { + reportContexts[i] = ocrtypes.ReportContext{ + ReportTimestamp: ocrtypes.ReportTimestamp{ + ConfigDigest: ocrtypes.ConfigDigest{'1'}, + Epoch: 10, + Round: uint8(i), + }, + ExtraHash: [32]byte{'2'}, + } + } + err := orm.InsertTransmitRequest(ctx, []string{sURL, sURL2, sURL3}, &pb.TransmitRequest{Payload: reports[0]}, jobID, reportContexts[0]) + require.NoError(t, err) + + transmissions, err := orm.GetTransmitRequests(ctx, sURL, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}) + + transmissions, err = orm.GetTransmitRequests(ctx, sURL2, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}) + + transmissions, err = orm.GetTransmitRequests(ctx, sURL3, jobID) + require.NoError(t, err) + require.Len(t, transmissions, 1) + assert.Equal(t, transmissions[0], &Transmission{Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: reportContexts[0]}) + + l, err := orm.LatestReport(testutils.Context(t), feedID) + require.NoError(t, err) + assert.Equal(t, reports[0], l) +} + func TestORM_PruneTransmitRequests(t *testing.T) { ctx := testutils.Context(t) db := pgtest.NewSqlxDB(t) @@ -174,18 +219,18 @@ func TestORM_PruneTransmitRequests(t *testing.T) { } // s1 - err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) + err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) // s2 - should not be touched - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 0)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3)) + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 3)) require.NoError(t, err) // Max size greater than number of records, expect no-op @@ -221,9 +266,9 @@ func TestORM_PruneTransmitRequests(t *testing.T) { {Req: &pb.TransmitRequest{Payload: reports[0]}, ReportCtx: makeReportContext(1, 1)}, }, transmissions) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 2)) require.NoError(t, err) // Max size is table size - 1, expect the oldest row to be pruned. @@ -267,13 +312,13 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { } } - err := orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext( + err := orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext( 0, 0, )) require.NoError(t, err) // this should be ignored, because report context is the same - err = orm.InsertTransmitRequest(ctx, sURL2, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext( + err = orm.InsertTransmitRequest(ctx, []string{sURL2}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext( 0, 0, )) require.NoError(t, err) @@ -283,7 +328,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[0], l) t.Run("replaces if epoch and round are larger", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, "foo", &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1)) + err = orm.InsertTransmitRequest(ctx, []string{"foo"}, &pb.TransmitRequest{Payload: reports[1]}, jobID, makeReportContext(1, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -291,7 +336,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[1], l) }) t.Run("replaces if epoch is the same but round is greater", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[2]}, jobID, makeReportContext(1, 2)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -299,7 +344,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[2], l) }) t.Run("replaces if epoch is larger but round is smaller", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, "bar", &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(ctx, []string{"bar"}, &pb.TransmitRequest{Payload: reports[3]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) @@ -307,7 +352,7 @@ func TestORM_InsertTransmitRequest_LatestReport(t *testing.T) { assert.Equal(t, reports[3], l) }) t.Run("does not overwrite if epoch/round is the same", func(t *testing.T) { - err = orm.InsertTransmitRequest(ctx, sURL, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1)) + err = orm.InsertTransmitRequest(ctx, []string{sURL}, &pb.TransmitRequest{Payload: reports[0]}, jobID, makeReportContext(2, 1)) require.NoError(t, err) l, err = orm.LatestReport(testutils.Context(t), feedID) diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index d49d0d4ed01..38576174423 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -69,7 +69,7 @@ func (pm *PersistenceManager) Close() error { } func (pm *PersistenceManager) Insert(ctx context.Context, req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) error { - return pm.orm.InsertTransmitRequest(ctx, pm.serverURL, req, pm.jobID, reportCtx) + return pm.orm.InsertTransmitRequest(ctx, []string{pm.serverURL}, req, pm.jobID, reportCtx) } func (pm *PersistenceManager) Delete(ctx context.Context, req *pb.TransmitRequest) error { diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index d8ef981ff0c..7abd926255a 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -18,6 +18,7 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil" @@ -109,6 +110,7 @@ type mercuryTransmitter struct { lggr logger.Logger cfg TransmitterConfig + orm ORM servers map[string]*server codec TransmitterReportDecoder @@ -307,6 +309,7 @@ func NewTransmitter(lggr logger.Logger, cfg TransmitterConfig, clients map[strin services.StateMachine{}, lggr.Named("MercuryTransmitter").With("feedID", feedIDHex), cfg, + orm, servers, codec, feedID, @@ -407,14 +410,14 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R mt.lggr.Tracew("Transmit enqueue", "req.Payload", req.Payload, "report", report, "reportCtx", reportCtx, "signatures", signatures) + if err := mt.orm.InsertTransmitRequest(ctx, maps.Keys(mt.servers), req, mt.jobID, reportCtx); err != nil { + return err + } + g := new(errgroup.Group) for _, s := range mt.servers { s := s // https://golang.org/doc/faq#closures_and_goroutines g.Go(func() error { - if err := s.pm.Insert(ctx, req, reportCtx); err != nil { - s.transmitQueueInsertErrorCount.Inc() - return err - } if ok := s.q.Push(req, reportCtx); !ok { s.transmitQueuePushErrorCount.Inc() return errors.New("transmit queue is closed")