From b23911e935f7af949fa4ec8adeb9463986c9e307 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Wed, 30 Oct 2024 02:06:29 +0530 Subject: [PATCH] feat(temporal): enable sdk metrics (#2200) - Adds a new env variable to enable specific metric from Temporal SDK: - `PEERDB_TEMPORAL_OTEL_METRICS_EXPORT_LIST` can be a comma separated list of metrics to expose. If the list is empty or no metric matches the elements in the list, then they are not exported - If the value is set to `__ALL__`, then all Temporal metrics are exposed - Additionally adds an initial interface of how temporal interceptors would look like - The added metrics include `temporal_workflow_task_execution_failed` metric having an attribute of `failure_reason` whose value can tell us about nondeterminism if the value is `nondeterminismerror` (https://github.com/temporalio/sdk-go/pull/1295) - Additionally, the metrics are prefixed with `temporal.` PeerDB metrics should not be affected as they use a separate exporter and meterprovider. The rationale behind exporting a subset of metrics is noise and cases where metrics ingestion cannot be ignored Full list of metrics can be viewed at https://docs.temporal.io/references/sdk-metrics --- flow/cmd/logged_interceptor.go | 79 +++++++++++++++++ flow/cmd/worker.go | 16 +++- flow/connectors/postgres/postgres.go | 29 +++---- flow/go.mod | 6 +- flow/go.sum | 10 +++ .../{peerdb_gauges => }/attributes.go | 2 +- flow/otel_metrics/env.go | 11 +++ flow/otel_metrics/otel_manager.go | 84 +++++++++++++++++-- flow/otel_metrics/peerdb_gauges/gauges.go | 3 +- 9 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 flow/cmd/logged_interceptor.go rename flow/otel_metrics/{peerdb_gauges => }/attributes.go (88%) create mode 100644 flow/otel_metrics/env.go diff --git a/flow/cmd/logged_interceptor.go b/flow/cmd/logged_interceptor.go new file mode 100644 index 0000000000..d5ee4fc972 --- /dev/null +++ b/flow/cmd/logged_interceptor.go @@ -0,0 +1,79 @@ +package cmd + +import ( + "context" + + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/workflow" +) + +type LoggedWorkflowInboundInterceptor struct { + interceptor.WorkflowInboundInterceptorBase + Next interceptor.WorkflowInboundInterceptor +} + +func NewLoggedWorkflowInboundInterceptor(next interceptor.WorkflowInboundInterceptor) *LoggedWorkflowInboundInterceptor { + return &LoggedWorkflowInboundInterceptor{ + WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{Next: next}, + Next: next, + } +} + +func (w *LoggedWorkflowInboundInterceptor) ExecuteWorkflow( + ctx workflow.Context, + in *interceptor.ExecuteWorkflowInput, +) (interface{}, error) { + // Workflow starts here + result, err := w.Next.ExecuteWorkflow(ctx, in) + // Workflow ends here + return result, err +} + +type LoggedActivityInboundInterceptor struct { + interceptor.ActivityInboundInterceptorBase + Next interceptor.ActivityInboundInterceptor +} + +func NewLoggedActivityInboundInterceptor(next interceptor.ActivityInboundInterceptor) *LoggedActivityInboundInterceptor { + return &LoggedActivityInboundInterceptor{ + ActivityInboundInterceptorBase: interceptor.ActivityInboundInterceptorBase{Next: next}, + Next: next, + } +} + +func (c *LoggedActivityInboundInterceptor) ExecuteActivity( + ctx context.Context, + in *interceptor.ExecuteActivityInput, +) (interface{}, error) { + // Activity starts here + out, err := c.Next.ExecuteActivity(ctx, in) + // Activity ends here + return out, err +} + +type LoggedWorkerInterceptor struct { + interceptor.WorkerInterceptorBase +} + +func (c LoggedWorkerInterceptor) InterceptActivity( + ctx context.Context, + next interceptor.ActivityInboundInterceptor, +) interceptor.ActivityInboundInterceptor { + return NewLoggedActivityInboundInterceptor(next) +} + +func (c LoggedWorkerInterceptor) InterceptWorkflow( + ctx workflow.Context, + next interceptor.WorkflowInboundInterceptor, +) interceptor.WorkflowInboundInterceptor { + // Workflow intercepted here + intercepted := NewLoggedWorkflowInboundInterceptor(next) + // Workflow intercepting ends here + return intercepted +} + +func NewLoggedWorkerInterceptor() *LoggedWorkerInterceptor { + return &LoggedWorkerInterceptor{ + WorkerInterceptorBase: interceptor.WorkerInterceptorBase{}, + } +} diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index bf2809d10c..9db97288cc 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/pyroscope-go" "go.temporal.io/sdk/client" + temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" @@ -88,6 +89,15 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { Namespace: opts.TemporalNamespace, Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), } + if opts.EnableOtelMetrics { + metricsProvider, metricsErr := otel_metrics.SetupTemporalMetricsProvider("flow-worker") + if metricsErr != nil { + return nil, metricsErr + } + clientOptions.MetricsHandler = temporalotel.NewMetricsHandler(temporalotel.MetricsHandlerOptions{ + Meter: metricsProvider.Meter("temporal-sdk-go"), + }) + } if peerdbenv.PeerDBTemporalEnableCertAuth() { slog.Info("Using temporal certificate/key for authentication") @@ -136,9 +146,9 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { cleanupOtelManagerFunc := func() {} var otelManager *otel_metrics.OtelManager if opts.EnableOtelMetrics { - metricsProvider, metricErr := otel_metrics.SetupOtelMetricsExporter("flow-worker") - if metricErr != nil { - return nil, metricErr + metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker") + if metricsErr != nil { + return nil, metricsErr } otelManager = &otel_metrics.OtelManager{ MetricsProvider: metricsProvider, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ad7ca3951d..d0087d3beb 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -26,7 +26,8 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - peerdb_gauges "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" + "github.com/PeerDB-io/peer-flow/otel_metrics" + "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -1192,10 +1193,10 @@ func (c *PostgresConnector) HandleSlotInfo( slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0]) slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.SlotNameKey, alertKeys.SlotName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) // Also handles alerts for PeerDB user connections exceeding a given limit here res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) @@ -1205,9 +1206,9 @@ func (c *PostgresConnector) HandleSlotInfo( } alerter.AlertIfOpenConnections(ctx, alertKeys, res) slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { @@ -1216,9 +1217,9 @@ func (c *PostgresConnector) HandleSlotInfo( } slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) var intervalSinceLastNormalize *time.Duration err = alerter.CatalogPool.QueryRow(ctx, "SELECT now()-max(end_time) FROM peerdb_stats.cdc_batches WHERE flow_name=$1", @@ -1233,9 +1234,9 @@ func (c *PostgresConnector) HandleSlotInfo( } if intervalSinceLastNormalize != nil { slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize) } diff --git a/flow/go.mod b/flow/go.mod index e6c76516d7..e24ffa9fb0 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -56,11 +56,16 @@ require ( go.opentelemetry.io/otel v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 go.opentelemetry.io/otel/metric v1.31.0 go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 go.temporal.io/api v1.40.0 go.temporal.io/sdk v1.29.1 + go.temporal.io/sdk/contrib/opentelemetry v0.6.0 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.28.0 golang.org/x/sync v0.8.0 @@ -139,7 +144,6 @@ require ( go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect - go.opentelemetry.io/otel/trace v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/term v0.25.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index b7992ec679..8f783af565 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -490,6 +490,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= @@ -504,10 +510,14 @@ go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto= go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0= go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ= +go.temporal.io/sdk/contrib/opentelemetry v0.6.0 h1:rNBArDj5iTUkcMwKocUShoAW59o6HdS7Nq4CTp4ldj8= +go.temporal.io/sdk/contrib/opentelemetry v0.6.0/go.mod h1:Lem8VrE2ks8P+FYcRM3UphPoBr+tfM3v/Kaf0qStzSg= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/flow/otel_metrics/peerdb_gauges/attributes.go b/flow/otel_metrics/attributes.go similarity index 88% rename from flow/otel_metrics/peerdb_gauges/attributes.go rename to flow/otel_metrics/attributes.go index 78b54b6119..bd17cfeeb2 100644 --- a/flow/otel_metrics/peerdb_gauges/attributes.go +++ b/flow/otel_metrics/attributes.go @@ -1,4 +1,4 @@ -package peerdb_gauges +package otel_metrics const ( PeerNameKey string = "peerName" diff --git a/flow/otel_metrics/env.go b/flow/otel_metrics/env.go new file mode 100644 index 0000000000..f388bf664d --- /dev/null +++ b/flow/otel_metrics/env.go @@ -0,0 +1,11 @@ +package otel_metrics + +import "github.com/PeerDB-io/peer-flow/peerdbenv" + +func GetPeerDBOtelMetricsNamespace() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") +} + +func GetPeerDBOtelMetricsExportListEnv() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_EXPORT_LIST", "") +} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 112124d203..f69a4aa30f 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -3,7 +3,10 @@ package otel_metrics import ( "context" "fmt" + "log/slog" + "strings" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" @@ -22,12 +25,16 @@ type OtelManager struct { } // newOtelResource returns a resource describing this application. -func newOtelResource(otelServiceName string) (*resource.Resource, error) { +func newOtelResource(otelServiceName string, attrs ...attribute.KeyValue) (*resource.Resource, error) { + allAttrs := []attribute.KeyValue{ + semconv.ServiceNameKey.String(otelServiceName), + } + allAttrs = append(allAttrs, attrs...) r, err := resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNameKey.String(otelServiceName), + allAttrs..., ), ) @@ -42,7 +49,53 @@ func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) { return otlpmetricgrpc.New(context.Background()) } -func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, error) { +func temporalMetricsFilteringView() sdkmetric.View { + exportListString := GetPeerDBOtelMetricsExportListEnv() + slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString)) + // Special case for exporting all metrics + if exportListString == "__ALL__" { + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + stream := sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + } + return stream, true + } + } + exportList := strings.Split(exportListString, ",") + // Don't export any metrics if the list is empty + if len(exportList) == 0 { + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + return sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + Aggregation: sdkmetric.AggregationDrop{}, + }, true + } + } + + // Export only the metrics in the list + enabledMetrics := make(map[string]struct{}, len(exportList)) + for _, metricName := range exportList { + trimmedMetricName := strings.TrimSpace(metricName) + enabledMetrics[trimmedMetricName] = struct{}{} + } + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + stream := sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + } + if _, ok := enabledMetrics[instrument.Name]; !ok { + stream.Aggregation = sdkmetric.AggregationDrop{} + } + return stream, true + } +} + +func setupExporter() (sdkmetric.Exporter, error) { otlpMetricProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "http/protobuf")) var metricExporter sdkmetric.Exporter @@ -58,14 +111,35 @@ func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) } - otelResource, err := newOtelResource(otelServiceName) + return metricExporter, err +} + +func setupMetricsProvider(otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { + metricExporter, err := setupExporter() if err != nil { - return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + return nil, err } meterProvider := sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)), sdkmetric.WithResource(otelResource), + sdkmetric.WithView(views...), ) return meterProvider, nil } + +func SetupPeerDBMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { + otelResource, err := newOtelResource(otelServiceName) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + return setupMetricsProvider(otelResource) +} + +func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { + otelResource, err := newOtelResource(otelServiceName, attribute.String(DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + return setupMetricsProvider(otelResource, temporalMetricsFilteringView()) +} diff --git a/flow/otel_metrics/peerdb_gauges/gauges.go b/flow/otel_metrics/peerdb_gauges/gauges.go index 6f8f4f0c54..767aac0945 100644 --- a/flow/otel_metrics/peerdb_gauges/gauges.go +++ b/flow/otel_metrics/peerdb_gauges/gauges.go @@ -2,7 +2,6 @@ package peerdb_gauges import ( "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) const ( @@ -20,5 +19,5 @@ type SlotMetricGauges struct { } func BuildGaugeName(baseGaugeName string) string { - return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") + baseGaugeName + return otel_metrics.GetPeerDBOtelMetricsNamespace() + baseGaugeName }