diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index 5496bbdefc1..1594dc6eb56 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -13,6 +13,8 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -57,6 +59,13 @@ var ( _ connector.GatewayConnectorHandler = &functionsConnectorHandler{} ) +var ( + promStorageUserUpdatesCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "storage_user_updates", + Help: "Number of storage updates performed by users", + }, []string{}) +) + // internal request ID is a hash of (sender, requestID) func InternalId(sender []byte, requestId []byte) RequestID { return RequestID(crypto.Keccak256Hash(append(sender, requestId...)).Bytes()) @@ -185,6 +194,7 @@ func (h *functionsConnectorHandler) handleSecretsSet(ctx context.Context, gatewa err = h.storage.Put(ctx, &key, &record, request.Signature) if err == nil { response.Success = true + promStorageUserUpdatesCount.WithLabelValues().Inc() } else { response.ErrorMessage = fmt.Sprintf("Failed to set secret: %v", err) } diff --git a/core/services/ocr2/plugins/s4/plugin.go b/core/services/ocr2/plugins/s4/plugin.go index fcb025b21cd..677743c091d 100644 --- a/core/services/ocr2/plugins/s4/plugin.go +++ b/core/services/ocr2/plugins/s4/plugin.go @@ -4,13 +4,28 @@ import ( "context" "time" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/libocr/commontypes" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/s4" +) - "github.com/pkg/errors" - "github.com/smartcontractkit/libocr/commontypes" - "github.com/smartcontractkit/libocr/offchainreporting2plus/types" +var ( + promStoragePluginUpdatesCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "storage_plugin_updates", + Help: "Number of storage updates fetched from other nodes", + }, []string{}) + + promStorageTotalByteSize = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "storage_total_byte_size", + Help: "Current byte size of data stored in S4", + }, []string{}) ) type plugin struct { @@ -59,6 +74,7 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que return nil, errors.Wrap(err, "failed to GetVersions in Query()") } + var storageTotalByteSize uint64 rows := make([]*SnapshotRow, len(snapshot)) for i, v := range snapshot { rows[i] = &SnapshotRow{ @@ -66,6 +82,8 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que Slotid: uint32(v.SlotId), Version: v.Version, } + + storageTotalByteSize += v.PayloadSize } queryBytes, err := MarshalQuery(rows, c.addressRange) @@ -76,6 +94,8 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que promReportingPluginsQueryRowsCount.WithLabelValues(c.config.ProductName).Set(float64(len(rows))) promReportingPluginsQueryByteSize.WithLabelValues(c.config.ProductName).Set(float64(len(queryBytes))) + promStorageTotalByteSize.WithLabelValues().Set(float64(storageTotalByteSize)) + c.addressRange.Advance() c.logger.Debug("S4StorageReporting Query", commontypes.LogFields{ @@ -268,6 +288,7 @@ func (c *plugin) ShouldAcceptFinalizedReport(ctx context.Context, ts types.Repor c.logger.Error("Failed to Update a row in ShouldAcceptFinalizedReport()", commontypes.LogFields{"err": err}) continue } + promStoragePluginUpdatesCount.WithLabelValues().Inc() } c.logger.Debug("S4StorageReporting ShouldAcceptFinalizedReport", commontypes.LogFields{ diff --git a/core/services/s4/orm.go b/core/services/s4/orm.go index 59f3410e143..4d3cee9312a 100644 --- a/core/services/s4/orm.go +++ b/core/services/s4/orm.go @@ -20,11 +20,12 @@ type Row struct { // SnapshotRow(s) are returned by GetSnapshot function. type SnapshotRow struct { - Address *big.Big - SlotId uint - Version uint64 - Expiration int64 - Confirmed bool + Address *big.Big + SlotId uint + Version uint64 + Expiration int64 + Confirmed bool + PayloadSize uint64 } //go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore diff --git a/core/services/s4/postgres_orm.go b/core/services/s4/postgres_orm.go index dba98b64aa6..1f92f2e1281 100644 --- a/core/services/s4/postgres_orm.go +++ b/core/services/s4/postgres_orm.go @@ -90,7 +90,7 @@ func (o orm) GetSnapshot(addressRange *AddressRange, qopts ...pg.QOpt) ([]*Snaps q := o.q.WithOpts(qopts...) rows := make([]*SnapshotRow, 0) - stmt := fmt.Sprintf(`SELECT address, slot_id, version, expiration, confirmed FROM %s WHERE namespace = $1 AND address >= $2 AND address <= $3;`, o.tableName) + stmt := fmt.Sprintf(`SELECT address, slot_id, version, expiration, confirmed, octet_length(payload) AS payload_size FROM %s WHERE namespace = $1 AND address >= $2 AND address <= $3;`, o.tableName) if err := q.Select(&rows, stmt, o.namespace, addressRange.MinAddress, addressRange.MaxAddress); err != nil { if !errors.Is(err, sql.ErrNoRows) { return nil, err diff --git a/core/services/s4/postgres_orm_test.go b/core/services/s4/postgres_orm_test.go index 4d07524b4ea..d26f082ce5b 100644 --- a/core/services/s4/postgres_orm_test.go +++ b/core/services/s4/postgres_orm_test.go @@ -181,6 +181,7 @@ func TestPostgresORM_GetSnapshot(t *testing.T) { assert.Equal(t, snapshotRow.Version, sr.Version) assert.Equal(t, snapshotRow.Expiration, sr.Expiration) assert.Equal(t, snapshotRow.Confirmed, sr.Confirmed) + assert.Equal(t, snapshotRow.PayloadSize, uint64(len(sr.Payload))) } })