Skip to content

Commit

Permalink
[FUN-990] s4 observability improvements (#11512)
Browse files Browse the repository at this point in the history
* chore: count s4 number of updates performed by nodes

* chore: add storage total use and slots occupied

* chore: add plugin side of counting updates

* fix: modify total size counter to use snapshot len

* chore: take into account the payload size for total size

* chore: fix lint errors

* fix: fetch only payload_size, reword metrics help

* chore: remove don_id label

* chore: remove don_id for consistency
  • Loading branch information
agparadiso authored Dec 13, 2023
1 parent 1a26acd commit 6f13447
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 9 deletions.
10 changes: 10 additions & 0 deletions core/services/functions/connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand Down Expand Up @@ -57,6 +59,13 @@ var (
_ connector.GatewayConnectorHandler = &functionsConnectorHandler{}
)

var (
promStorageUserUpdatesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "storage_user_updates",
Help: "Number of storage updates performed by users",
}, []string{})
)

// internal request ID is a hash of (sender, requestID)
func InternalId(sender []byte, requestId []byte) RequestID {
return RequestID(crypto.Keccak256Hash(append(sender, requestId...)).Bytes())
Expand Down Expand Up @@ -185,6 +194,7 @@ func (h *functionsConnectorHandler) handleSecretsSet(ctx context.Context, gatewa
err = h.storage.Put(ctx, &key, &record, request.Signature)
if err == nil {
response.Success = true
promStorageUserUpdatesCount.WithLabelValues().Inc()
} else {
response.ErrorMessage = fmt.Sprintf("Failed to set secret: %v", err)
}
Expand Down
27 changes: 24 additions & 3 deletions core/services/ocr2/plugins/s4/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,28 @@ import (
"context"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/s4"
)

"github.com/pkg/errors"
"github.com/smartcontractkit/libocr/commontypes"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
var (
promStoragePluginUpdatesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "storage_plugin_updates",
Help: "Number of storage updates fetched from other nodes",
}, []string{})

promStorageTotalByteSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "storage_total_byte_size",
Help: "Current byte size of data stored in S4",
}, []string{})
)

type plugin struct {
Expand Down Expand Up @@ -59,13 +74,16 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que
return nil, errors.Wrap(err, "failed to GetVersions in Query()")
}

var storageTotalByteSize uint64
rows := make([]*SnapshotRow, len(snapshot))
for i, v := range snapshot {
rows[i] = &SnapshotRow{
Address: v.Address.Bytes(),
Slotid: uint32(v.SlotId),
Version: v.Version,
}

storageTotalByteSize += v.PayloadSize
}

queryBytes, err := MarshalQuery(rows, c.addressRange)
Expand All @@ -76,6 +94,8 @@ func (c *plugin) Query(ctx context.Context, ts types.ReportTimestamp) (types.Que
promReportingPluginsQueryRowsCount.WithLabelValues(c.config.ProductName).Set(float64(len(rows)))
promReportingPluginsQueryByteSize.WithLabelValues(c.config.ProductName).Set(float64(len(queryBytes)))

promStorageTotalByteSize.WithLabelValues().Set(float64(storageTotalByteSize))

c.addressRange.Advance()

c.logger.Debug("S4StorageReporting Query", commontypes.LogFields{
Expand Down Expand Up @@ -268,6 +288,7 @@ func (c *plugin) ShouldAcceptFinalizedReport(ctx context.Context, ts types.Repor
c.logger.Error("Failed to Update a row in ShouldAcceptFinalizedReport()", commontypes.LogFields{"err": err})
continue
}
promStoragePluginUpdatesCount.WithLabelValues().Inc()
}

c.logger.Debug("S4StorageReporting ShouldAcceptFinalizedReport", commontypes.LogFields{
Expand Down
11 changes: 6 additions & 5 deletions core/services/s4/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ type Row struct {

// SnapshotRow(s) are returned by GetSnapshot function.
type SnapshotRow struct {
Address *big.Big
SlotId uint
Version uint64
Expiration int64
Confirmed bool
Address *big.Big
SlotId uint
Version uint64
Expiration int64
Confirmed bool
PayloadSize uint64
}

//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore
Expand Down
2 changes: 1 addition & 1 deletion core/services/s4/postgres_orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (o orm) GetSnapshot(addressRange *AddressRange, qopts ...pg.QOpt) ([]*Snaps
q := o.q.WithOpts(qopts...)
rows := make([]*SnapshotRow, 0)

stmt := fmt.Sprintf(`SELECT address, slot_id, version, expiration, confirmed FROM %s WHERE namespace = $1 AND address >= $2 AND address <= $3;`, o.tableName)
stmt := fmt.Sprintf(`SELECT address, slot_id, version, expiration, confirmed, octet_length(payload) AS payload_size FROM %s WHERE namespace = $1 AND address >= $2 AND address <= $3;`, o.tableName)
if err := q.Select(&rows, stmt, o.namespace, addressRange.MinAddress, addressRange.MaxAddress); err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions core/services/s4/postgres_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func TestPostgresORM_GetSnapshot(t *testing.T) {
assert.Equal(t, snapshotRow.Version, sr.Version)
assert.Equal(t, snapshotRow.Expiration, sr.Expiration)
assert.Equal(t, snapshotRow.Confirmed, sr.Confirmed)
assert.Equal(t, snapshotRow.PayloadSize, uint64(len(sr.Payload)))
}
})

Expand Down

0 comments on commit 6f13447

Please sign in to comment.