Skip to content

Commit

Permalink
feat(temporal): enable sdk metrics (#2200)
Browse files Browse the repository at this point in the history
- Adds a new env variable to enable specific metric from Temporal SDK:
- `PEERDB_TEMPORAL_OTEL_METRICS_EXPORT_LIST` can be a comma separated
list of metrics to expose. If the list is empty or no metric matches the
elements in the list, then they are not exported
- If the value is set to `__ALL__`, then all Temporal metrics are
exposed
- Additionally adds an initial interface of how temporal interceptors
would look like
- The added metrics include `temporal_workflow_task_execution_failed`
metric having an attribute of `failure_reason` whose value can tell us
about nondeterminism if the value is `nondeterminismerror`
(temporalio/sdk-go#1295)
 - Additionally, the metrics are prefixed with `temporal.`

PeerDB metrics should not be affected as they use a separate exporter
and meterprovider.

The rationale behind exporting a subset of metrics is noise and cases
where metrics ingestion cannot be ignored

Full list of metrics can be viewed at
https://docs.temporal.io/references/sdk-metrics
  • Loading branch information
iamKunalGupta authored Oct 29, 2024
1 parent 877534a commit b23911e
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 26 deletions.
79 changes: 79 additions & 0 deletions flow/cmd/logged_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package cmd

import (
"context"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
)

type LoggedWorkflowInboundInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
Next interceptor.WorkflowInboundInterceptor
}

func NewLoggedWorkflowInboundInterceptor(next interceptor.WorkflowInboundInterceptor) *LoggedWorkflowInboundInterceptor {
return &LoggedWorkflowInboundInterceptor{
WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{Next: next},
Next: next,
}
}

func (w *LoggedWorkflowInboundInterceptor) ExecuteWorkflow(
ctx workflow.Context,
in *interceptor.ExecuteWorkflowInput,
) (interface{}, error) {
// Workflow starts here
result, err := w.Next.ExecuteWorkflow(ctx, in)
// Workflow ends here
return result, err
}

type LoggedActivityInboundInterceptor struct {
interceptor.ActivityInboundInterceptorBase
Next interceptor.ActivityInboundInterceptor
}

func NewLoggedActivityInboundInterceptor(next interceptor.ActivityInboundInterceptor) *LoggedActivityInboundInterceptor {
return &LoggedActivityInboundInterceptor{
ActivityInboundInterceptorBase: interceptor.ActivityInboundInterceptorBase{Next: next},
Next: next,
}
}

func (c *LoggedActivityInboundInterceptor) ExecuteActivity(
ctx context.Context,
in *interceptor.ExecuteActivityInput,
) (interface{}, error) {
// Activity starts here
out, err := c.Next.ExecuteActivity(ctx, in)
// Activity ends here
return out, err
}

type LoggedWorkerInterceptor struct {
interceptor.WorkerInterceptorBase
}

func (c LoggedWorkerInterceptor) InterceptActivity(
ctx context.Context,
next interceptor.ActivityInboundInterceptor,
) interceptor.ActivityInboundInterceptor {
return NewLoggedActivityInboundInterceptor(next)
}

func (c LoggedWorkerInterceptor) InterceptWorkflow(
ctx workflow.Context,
next interceptor.WorkflowInboundInterceptor,
) interceptor.WorkflowInboundInterceptor {
// Workflow intercepted here
intercepted := NewLoggedWorkflowInboundInterceptor(next)
// Workflow intercepting ends here
return intercepted
}

func NewLoggedWorkerInterceptor() *LoggedWorkerInterceptor {
return &LoggedWorkerInterceptor{
WorkerInterceptorBase: interceptor.WorkerInterceptorBase{},
}
}
16 changes: 13 additions & 3 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/grafana/pyroscope-go"
"go.temporal.io/sdk/client"
temporalotel "go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
Expand Down Expand Up @@ -88,6 +89,15 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
Namespace: opts.TemporalNamespace,
Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))),
}
if opts.EnableOtelMetrics {
metricsProvider, metricsErr := otel_metrics.SetupTemporalMetricsProvider("flow-worker")
if metricsErr != nil {
return nil, metricsErr
}
clientOptions.MetricsHandler = temporalotel.NewMetricsHandler(temporalotel.MetricsHandlerOptions{
Meter: metricsProvider.Meter("temporal-sdk-go"),
})
}

if peerdbenv.PeerDBTemporalEnableCertAuth() {
slog.Info("Using temporal certificate/key for authentication")
Expand Down Expand Up @@ -136,9 +146,9 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
cleanupOtelManagerFunc := func() {}
var otelManager *otel_metrics.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, metricErr := otel_metrics.SetupOtelMetricsExporter("flow-worker")
if metricErr != nil {
return nil, metricErr
metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker")
if metricsErr != nil {
return nil, metricsErr
}
otelManager = &otel_metrics.OtelManager{
MetricsProvider: metricsProvider,
Expand Down
29 changes: 15 additions & 14 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
peerdb_gauges "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -1192,10 +1193,10 @@ func (c *PostgresConnector) HandleSlotInfo(
slog.Float64("LagInMB", float64(slotInfo[0].LagInMb)))
alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0])
slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet(
attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName),
attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName),
attribute.String(peerdb_gauges.SlotNameKey, alertKeys.SlotName),
attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))

// Also handles alerts for PeerDB user connections exceeding a given limit here
res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User)
Expand All @@ -1205,9 +1206,9 @@ func (c *PostgresConnector) HandleSlotInfo(
}
alerter.AlertIfOpenConnections(ctx, alertKeys, res)
slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet(
attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName),
attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName),
attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))

replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User)
if err != nil {
Expand All @@ -1216,9 +1217,9 @@ func (c *PostgresConnector) HandleSlotInfo(
}

slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet(
attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName),
attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName),
attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))

var intervalSinceLastNormalize *time.Duration
err = alerter.CatalogPool.QueryRow(ctx, "SELECT now()-max(end_time) FROM peerdb_stats.cdc_batches WHERE flow_name=$1",
Expand All @@ -1233,9 +1234,9 @@ func (c *PostgresConnector) HandleSlotInfo(
}
if intervalSinceLastNormalize != nil {
slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet(
attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName),
attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName),
attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName),
attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())))
alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize)
}

Expand Down
6 changes: 5 additions & 1 deletion flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ require (
go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.40.0
go.temporal.io/sdk v1.29.1
go.temporal.io/sdk/contrib/opentelemetry v0.6.0
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.28.0
golang.org/x/sync v0.8.0
Expand Down Expand Up @@ -139,7 +144,6 @@ require (
go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/term v0.25.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU=
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=
go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY=
go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk=
Expand All @@ -504,10 +510,14 @@ go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto=
go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0=
go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ=
go.temporal.io/sdk/contrib/opentelemetry v0.6.0 h1:rNBArDj5iTUkcMwKocUShoAW59o6HdS7Nq4CTp4ldj8=
go.temporal.io/sdk/contrib/opentelemetry v0.6.0/go.mod h1:Lem8VrE2ks8P+FYcRM3UphPoBr+tfM3v/Kaf0qStzSg=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package peerdb_gauges
package otel_metrics

const (
PeerNameKey string = "peerName"
Expand Down
11 changes: 11 additions & 0 deletions flow/otel_metrics/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package otel_metrics

import "github.com/PeerDB-io/peer-flow/peerdbenv"

func GetPeerDBOtelMetricsNamespace() string {
return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "")
}

func GetPeerDBOtelMetricsExportListEnv() string {
return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_EXPORT_LIST", "")
}
84 changes: 79 additions & 5 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package otel_metrics
import (
"context"
"fmt"
"log/slog"
"strings"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
Expand All @@ -22,12 +25,16 @@ type OtelManager struct {
}

// newOtelResource returns a resource describing this application.
func newOtelResource(otelServiceName string) (*resource.Resource, error) {
func newOtelResource(otelServiceName string, attrs ...attribute.KeyValue) (*resource.Resource, error) {
allAttrs := []attribute.KeyValue{
semconv.ServiceNameKey.String(otelServiceName),
}
allAttrs = append(allAttrs, attrs...)
r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(otelServiceName),
allAttrs...,
),
)

Expand All @@ -42,7 +49,53 @@ func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) {
return otlpmetricgrpc.New(context.Background())
}

func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, error) {
func temporalMetricsFilteringView() sdkmetric.View {
exportListString := GetPeerDBOtelMetricsExportListEnv()
slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString))
// Special case for exporting all metrics
if exportListString == "__ALL__" {
return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) {
stream := sdkmetric.Stream{
Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name,
Description: instrument.Description,
Unit: instrument.Unit,
}
return stream, true
}
}
exportList := strings.Split(exportListString, ",")
// Don't export any metrics if the list is empty
if len(exportList) == 0 {
return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) {
return sdkmetric.Stream{
Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name,
Description: instrument.Description,
Unit: instrument.Unit,
Aggregation: sdkmetric.AggregationDrop{},
}, true
}
}

// Export only the metrics in the list
enabledMetrics := make(map[string]struct{}, len(exportList))
for _, metricName := range exportList {
trimmedMetricName := strings.TrimSpace(metricName)
enabledMetrics[trimmedMetricName] = struct{}{}
}
return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) {
stream := sdkmetric.Stream{
Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name,
Description: instrument.Description,
Unit: instrument.Unit,
}
if _, ok := enabledMetrics[instrument.Name]; !ok {
stream.Aggregation = sdkmetric.AggregationDrop{}
}
return stream, true
}
}

func setupExporter() (sdkmetric.Exporter, error) {
otlpMetricProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL",
peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "http/protobuf"))
var metricExporter sdkmetric.Exporter
Expand All @@ -58,14 +111,35 @@ func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider,
if err != nil {
return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err)
}
otelResource, err := newOtelResource(otelServiceName)
return metricExporter, err
}

func setupMetricsProvider(otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) {
metricExporter, err := setupExporter()
if err != nil {
return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err)
return nil, err
}

meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)),
sdkmetric.WithResource(otelResource),
sdkmetric.WithView(views...),
)
return meterProvider, nil
}

func SetupPeerDBMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) {
otelResource, err := newOtelResource(otelServiceName)
if err != nil {
return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err)
}
return setupMetricsProvider(otelResource)
}

func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) {
otelResource, err := newOtelResource(otelServiceName, attribute.String(DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))
if err != nil {
return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err)
}
return setupMetricsProvider(otelResource, temporalMetricsFilteringView())
}
3 changes: 1 addition & 2 deletions flow/otel_metrics/peerdb_gauges/gauges.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package peerdb_gauges

import (
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)

const (
Expand All @@ -20,5 +19,5 @@ type SlotMetricGauges struct {
}

func BuildGaugeName(baseGaugeName string) string {
return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") + baseGaugeName
return otel_metrics.GetPeerDBOtelMetricsNamespace() + baseGaugeName
}

0 comments on commit b23911e

Please sign in to comment.