From ea58d02d1301e81a081692a2ee7ea8163d9bf581 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Mon, 6 May 2024 20:39:21 +0530 Subject: [PATCH] OpenTelemetry: initial metrics and export via otlpmetrichttp (#1664) `flow-worker` now exports `slotlag` and `open_connections` metrics (both are gauges) for each Postgres peer. Metrics enabled by the environment variable `ENABLE_OTEL_METRICS` and export endpoint set by the environment variable `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` on `flow-worker`. Metrics update at the frequency of the `RecordSlotSizesWorkflow`, which is currently 5 minutes. --- flow/activities/flowable.go | 30 ++++++- flow/cmd/api.go | 2 +- flow/cmd/cert.go | 2 +- flow/cmd/snapshot_worker.go | 2 +- flow/cmd/worker.go | 52 ++++++++++-- flow/connectors/core.go | 5 +- flow/connectors/postgres/postgres.go | 12 +++ flow/go.mod | 9 ++- flow/go.sum | 12 ++- flow/main.go | 14 +++- flow/otel_metrics/otel_manager.go | 54 +++++++++++++ flow/otel_metrics/sync_gauges.go | 114 +++++++++++++++++++++++++++ 12 files changed, 288 insertions(+), 20 deletions(-) create mode 100644 flow/otel_metrics/otel_manager.go create mode 100644 flow/otel_metrics/sync_gauges.go diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 7c3a41ac58..9c03346734 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -26,6 +27,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -44,6 +46,7 @@ type FlowableActivity struct { CatalogPool *pgxpool.Pool Alerter 
*alerting.Alerter CdcCache map[string]CdcCacheEntry + OtelManager *otel_metrics.OtelManager CdcCacheRw sync.RWMutex } @@ -592,7 +595,32 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { if ctx.Err() != nil { return } - err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName) + + var slotLagGauge *otel_metrics.Float64Gauge + var openConnectionsGauge *otel_metrics.Int64Gauge + if a.OtelManager != nil { + slotLagGauge, err = otel_metrics.GetOrInitFloat64Gauge(a.OtelManager.Meter, + a.OtelManager.Float64GaugesCache, + "cdc_slot_lag", + metric.WithUnit("MB"), + metric.WithDescription("Postgres replication slot lag in MB")) + if err != nil { + logger.Error("Failed to get slot lag gauge", slog.Any("error", err)) + return + } + + openConnectionsGauge, err = otel_metrics.GetOrInitInt64Gauge(a.OtelManager.Meter, + a.OtelManager.Int64GaugesCache, + "open_connections", + metric.WithDescription("Current open connections for PeerDB user")) + if err != nil { + logger.Error("Failed to get open connections gauge", slog.Any("error", err)) + return + } + } + + err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName, + slotLagGauge, openConnectionsGauge) if err != nil { logger.Error("Failed to handle slot info", slog.Any("error", err)) } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 8b8be80c6e..5b010916db 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -96,7 +96,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error { if args.TemporalCert != "" && args.TemporalKey != "" { slog.Info("Using temporal certificate/key for authentication") - certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey) + certs, err := base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey) if err != nil { return fmt.Errorf("unable to base64 decode certificate and key: %w", err) } diff --git a/flow/cmd/cert.go b/flow/cmd/cert.go index 9031d55b6c..9537b0f1e7 100644 --- a/flow/cmd/cert.go +++ 
b/flow/cmd/cert.go @@ -7,7 +7,7 @@ import ( "strings" ) -func Base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) { +func base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) { temporalCert := strings.TrimSpace(cert) certBytes, err := base64.StdEncoding.DecodeString(temporalCert) if err != nil { diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 128759a474..ee49dbc039 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -33,7 +33,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work } if opts.TemporalCert != "" && opts.TemporalKey != "" { - certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey) + certs, err := base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey) if err != nil { return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err) } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 654cfad163..ef4f0bbca2 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -10,18 +10,20 @@ import ( "runtime" "github.com/grafana/pyroscope-go" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) -type WorkerOptions struct { +type WorkerSetupOptions struct { TemporalHostPort string PyroscopeServer string TemporalNamespace string @@ -30,9 +32,16 @@ type WorkerOptions struct { TemporalMaxConcurrentActivities int TemporalMaxConcurrentWorkflowTasks int EnableProfiling bool + EnableOtelMetrics bool } -func setupPyroscope(opts *WorkerOptions) { +type workerSetupResponse struct { + Client client.Client + Worker worker.Worker + Cleanup func() +} 
+ +func setupPyroscope(opts *WorkerSetupOptions) { if opts.PyroscopeServer == "" { log.Fatal("pyroscope server address is not set but profiling is enabled") } @@ -73,7 +82,7 @@ func setupPyroscope(opts *WorkerOptions) { } } -func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) { +func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { if opts.EnableProfiling { setupPyroscope(opts) } @@ -86,9 +95,9 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) { if opts.TemporalCert != "" && opts.TemporalKey != "" { slog.Info("Using temporal certificate/key for authentication") - certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey) + certs, err := base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey) if err != nil { - return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err) + return nil, fmt.Errorf("unable to process certificate and key: %w", err) } connOptions := client.ConnectionOptions{ TLS: &tls.Config{ @@ -101,12 +110,12 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) { conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(context.Background()) if err != nil { - return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err) + return nil, fmt.Errorf("unable to create catalog connection pool: %w", err) } c, err := client.Dial(clientOptions) if err != nil { - return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err) + return nil, fmt.Errorf("unable to create Temporal client: %w", err) } slog.Info("Created temporal client") @@ -128,11 +137,38 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) { }) peerflow.RegisterFlowWorkerWorkflows(w) + var metricsProvider *sdkmetric.MeterProvider + var otelManager *otel_metrics.OtelManager + if opts.EnableOtelMetrics { + metricsProvider, err = otel_metrics.SetupOtelMetricsExporter("flow-worker") + if err != nil { + return nil, err + } + 
otelManager = &otel_metrics.OtelManager{ + MetricsProvider: metricsProvider, + Meter: metricsProvider.Meter("io.peerdb.flow-worker"), + Float64GaugesCache: make(map[string]*otel_metrics.Float64Gauge), + Int64GaugesCache: make(map[string]*otel_metrics.Int64Gauge), + } + } w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, Alerter: alerting.NewAlerter(context.Background(), conn), CdcCache: make(map[string]activities.CdcCacheEntry), + OtelManager: otelManager, }) - return c, w, nil + return &workerSetupResponse{ + Client: c, + Worker: w, + Cleanup: func() { + if otelManager != nil { + err := otelManager.MetricsProvider.Shutdown(context.Background()) + if err != nil { + slog.Error("Failed to shutdown metrics provider", slog.Any("error", err)) + } + } + c.Close() + }, + }, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 9a9a42f1a3..b6f75c4828 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -22,6 +22,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/otel_metrics" ) var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") @@ -73,7 +74,9 @@ type CDCPullConnectorCore interface { PullFlowCleanup(ctx context.Context, jobName string) error // HandleSlotInfo update monitoring info on slot size etc - HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error + HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, + catalogPool *pgxpool.Pool, slotName string, peerName string, + slotLagGauge *otel_metrics.Float64Gauge, openConnectionsGauge *otel_metrics.Int64Gauge) error // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. 
GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1f5a7545f4..03ee0c02b1 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -15,6 +15,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "go.opentelemetry.io/otel/attribute" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -26,6 +27,8 @@ import ( "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/otel_metrics" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -1109,6 +1112,8 @@ func (c *PostgresConnector) HandleSlotInfo( catalogPool *pgxpool.Pool, slotName string, peerName string, + slotLagGauge *otel_metrics.Float64Gauge, + openConnectionsGauge *otel_metrics.Int64Gauge, ) error { logger := logger.LoggerFromCtx(ctx) @@ -1125,6 +1130,10 @@ func (c *PostgresConnector) HandleSlotInfo( logger.Info(fmt.Sprintf("Checking %s lag for %s", slotName, peerName), slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0]) + slotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet( + attribute.String("peerName", peerName), + attribute.String("slotName", slotName), + attribute.String("deploymentUID", peerdbenv.PeerDBDeploymentUID()))) // Also handles alerts for PeerDB user connections exceeding a given limit here res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) @@ -1133,6 +1142,9 @@ func (c *PostgresConnector) HandleSlotInfo( return err } alerter.AlertIfOpenConnections(ctx, peerName, res) + openConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet( + attribute.String("peerName", peerName), + attribute.String("deploymentUID", 
peerdbenv.PeerDBDeploymentUID()))) return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0]) } diff --git a/flow/go.mod b/flow/go.mod index 71ab6c6557..af7033eae9 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -50,6 +50,11 @@ require ( github.com/urfave/cli/v3 v3.0.0-alpha9 github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 github.com/yuin/gopher-lua v1.1.1 + go.opentelemetry.io/otel v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 + go.opentelemetry.io/otel/metric v1.26.0 + go.opentelemetry.io/otel/sdk v1.26.0 + go.opentelemetry.io/otel/sdk/metric v1.26.0 go.temporal.io/api v1.32.0 go.temporal.io/sdk v1.26.1 go.uber.org/automaxprocs v1.5.3 @@ -77,6 +82,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/errors v1.11.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect @@ -110,9 +116,8 @@ require ( github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect - go.opentelemetry.io/otel v1.26.0 // indirect - go.opentelemetry.io/otel/metric v1.26.0 // indirect go.opentelemetry.io/otel/trace v1.26.0 // indirect + go.opentelemetry.io/proto/otlp v1.2.0 // indirect golang.org/x/term v0.19.0 // indirect ) diff --git a/flow/go.sum b/flow/go.sum index 5857b2424b..150c58bafe 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -127,6 +127,8 @@ github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 
h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -427,12 +429,18 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc= go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 h1:HGZWGmCVRCVyAs2GQaiHQPbDHo+ObFWeUEOd+zDnp64= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0/go.mod h1:SaH+v38LSCHddyk7RGlU9uZyQoRrKao6IBnJw6Kbn+c= go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8= +go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs= +go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y= +go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE= 
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0= +go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94= +go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A= go.temporal.io/api v1.32.0 h1:Jv0FieWDq0HJVqoHRE/kRHM+tIaRtR16RbXZZl+8Qb4= go.temporal.io/api v1.32.0/go.mod h1:MClRjMCgXZTKmxyItEJPRR5NuJRBhSEpuF9wuh97N6U= go.temporal.io/sdk v1.26.1 h1:ggmFBythnuuW3yQRp0VzOTrmbOf+Ddbe00TZl+CQ+6U= diff --git a/flow/main.go b/flow/main.go index 06843b9254..3b1696161b 100644 --- a/flow/main.go +++ b/flow/main.go @@ -47,6 +47,12 @@ func main() { Usage: "Enable profiling for the application", Sources: cli.EnvVars("ENABLE_PROFILING"), } + otelMetricsFlag := &cli.BoolFlag{ + Name: "enable-otel-metrics", + Value: false, // Default is off + Usage: "Enable OpenTelemetry metrics for the application", + Sources: cli.EnvVars("ENABLE_OTEL_METRICS"), + } pyroscopeServerFlag := &cli.StringFlag{ Name: "pyroscope-server-address", @@ -83,9 +89,10 @@ func main() { Name: "worker", Action: func(ctx context.Context, clicmd *cli.Command) error { temporalHostPort := clicmd.String("temporal-host-port") - c, w, err := cmd.WorkerMain(&cmd.WorkerOptions{ + res, err := cmd.WorkerSetup(&cmd.WorkerSetupOptions{ TemporalHostPort: temporalHostPort, EnableProfiling: clicmd.Bool("enable-profiling"), + EnableOtelMetrics: clicmd.Bool("enable-otel-metrics"), PyroscopeServer: clicmd.String("pyroscope-server-address"), TemporalNamespace: clicmd.String("temporal-namespace"), TemporalCert: clicmd.String("temporal-cert"), @@ -96,12 +103,13 @@ func main() { if err != nil { return err } - defer c.Close() - return w.Run(worker.InterruptCh()) + defer res.Cleanup() + return res.Worker.Run(worker.InterruptCh()) }, Flags: []cli.Flag{ temporalHostPortFlag, profilingFlag, + otelMetricsFlag, pyroscopeServerFlag, 
temporalNamespaceFlag, &temporalCertFlag, diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go new file mode 100644 index 0000000000..ceb7a511db --- /dev/null +++ b/flow/otel_metrics/otel_manager.go @@ -0,0 +1,54 @@ +package otel_metrics + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +type OtelManager struct { + MetricsProvider *sdkmetric.MeterProvider + Meter metric.Meter + Float64GaugesCache map[string]*Float64Gauge + Int64GaugesCache map[string]*Int64Gauge +} + +// newOtelResource returns a resource describing this application. +func newOtelResource(otelServiceName string) (*resource.Resource, error) { + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(otelServiceName), + ), + ) + + return r, err +} + +func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, error) { + metricExporter, err := otlpmetrichttp.New(context.Background(), + otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + ) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) + } + + resource, err := newOtelResource(otelServiceName) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, + sdkmetric.WithInterval(3*time.Second))), + sdkmetric.WithResource(resource), + ) + return meterProvider, nil +} diff --git a/flow/otel_metrics/sync_gauges.go b/flow/otel_metrics/sync_gauges.go new file mode 100644 index 0000000000..14a7058301 --- /dev/null +++ b/flow/otel_metrics/sync_gauges.go @@ -0,0 +1,114 @@ 
+package otel_metrics
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+// Int64Gauge emulates a synchronous int64 gauge on top of an asynchronous
+// (observable) one. Synchronous gauges are what we want, so we can control
+// when the value is updated, but they are "experimental"; instead the
+// observable gauge's callback just reports whatever was last passed to Set
+// for each attribute set.
+//
+// A nil *Int64Gauge is valid: Set on it is a no-op.
+type Int64Gauge struct {
+	observableGauge metric.Int64ObservableGauge
+	// mu guards the observations map itself (not the values, which are
+	// atomics). Set inserts into the map while callback iterates over it,
+	// and callback is invoked by the meter rather than by the code calling
+	// Set, so unguarded access is a concurrent map read/write.
+	mu           sync.RWMutex
+	observations map[attribute.Set]*atomic.Int64
+}
+
+// NewInt64SyncGauge registers an observable int64 gauge named gaugeName on
+// meter and returns a wrapper whose Set method decides the reported values.
+// The wrapper's own callback is appended after the caller-supplied opts.
+// It returns an error if the meter rejects the instrument.
+func NewInt64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Int64ObservableGaugeOption) (*Int64Gauge, error) {
+	// Build the map before the callback is registered so that a collection
+	// firing right after registration never observes a half-built wrapper.
+	syncGauge := &Int64Gauge{
+		observations: make(map[attribute.Set]*atomic.Int64),
+	}
+	observableGauge, err := meter.Int64ObservableGauge(gaugeName, append(opts, metric.WithInt64Callback(syncGauge.callback))...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Int64SyncGauge: %w", err)
+	}
+	syncGauge.observableGauge = observableGauge
+	return syncGauge, nil
+}
+
+// callback reports the most recently stored value for every attribute set
+// that has been passed to Set so far. It always returns nil.
+func (g *Int64Gauge) callback(_ context.Context, o metric.Int64Observer) error {
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+	for attrs, val := range g.observations {
+		o.Observe(val.Load(), metric.WithAttributeSet(attrs))
+	}
+	return nil
+}
+
+// Set records input as the current value for attrs; it is what the next
+// callback invocation reports for that attribute set. Calling Set on a nil
+// gauge does nothing, which lets callers pass nil when metrics are disabled.
+func (g *Int64Gauge) Set(input int64, attrs attribute.Set) {
+	if g == nil {
+		return
+	}
+	// Only the map lookup/insert needs the lock; the store itself is atomic.
+	g.mu.Lock()
+	val, ok := g.observations[attrs]
+	if !ok {
+		val = &atomic.Int64{}
+		g.observations[attrs] = val
+	}
+	g.mu.Unlock()
+	val.Store(input)
+}
+
+// Float64Gauge is the float64 counterpart of Int64Gauge. Values are kept as
+// their IEEE 754 bit patterns in atomic.Uint64 cells (math.Float64bits on
+// write, math.Float64frombits on read).
+//
+// A nil *Float64Gauge is valid: Set on it is a no-op.
+type Float64Gauge struct {
+	observableGauge metric.Float64ObservableGauge
+	// mu guards the observationsAsUint64 map itself (not the values, which
+	// are atomics); Set inserts while callback iterates.
+	mu                   sync.RWMutex
+	observationsAsUint64 map[attribute.Set]*atomic.Uint64
+}
+
+// NewFloat64SyncGauge registers an observable float64 gauge named gaugeName
+// on meter and returns a wrapper whose Set method decides the reported
+// values. The wrapper's own callback is appended after the caller-supplied
+// opts. It returns an error if the meter rejects the instrument.
+func NewFloat64SyncGauge(meter metric.Meter, gaugeName string, opts ...metric.Float64ObservableGaugeOption) (*Float64Gauge, error) {
+	// Build the map before the callback is registered so that a collection
+	// firing right after registration never observes a half-built wrapper.
+	syncGauge := &Float64Gauge{
+		observationsAsUint64: make(map[attribute.Set]*atomic.Uint64),
+	}
+	observableGauge, err := meter.Float64ObservableGauge(gaugeName, append(opts, metric.WithFloat64Callback(syncGauge.callback))...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Float64SyncGauge: %w", err)
+	}
+	syncGauge.observableGauge = observableGauge
+	return syncGauge, nil
+}
+
+// callback reports the most recently stored value for every attribute set
+// that has been passed to Set so far. It always returns nil.
+func (g *Float64Gauge) callback(_ context.Context, o metric.Float64Observer) error {
+	g.mu.RLock()
+	defer g.mu.RUnlock()
+	for attrs, val := range g.observationsAsUint64 {
+		o.Observe(math.Float64frombits(val.Load()), metric.WithAttributeSet(attrs))
+	}
+	return nil
+}
+
+// Set records input as the current value for attrs; it is what the next
+// callback invocation reports for that attribute set. Calling Set on a nil
+// gauge does nothing, which lets callers pass nil when metrics are disabled.
+func (g *Float64Gauge) Set(input float64, attrs attribute.Set) {
+	if g == nil {
+		return
+	}
+	// Only the map lookup/insert needs the lock; the store itself is atomic.
+	g.mu.Lock()
+	val, ok := g.observationsAsUint64[attrs]
+	if !ok {
+		val = &atomic.Uint64{}
+		g.observationsAsUint64[attrs] = val
+	}
+	g.mu.Unlock()
+	val.Store(math.Float64bits(input))
+}
+
+// GetOrInitInt64Gauge returns the gauge stored in cache under name, creating
+// it with NewInt64SyncGauge and storing it on first use. opts are only
+// applied when the gauge is created.
+//
+// NOTE(review): cache is a plain map and is read and written here without
+// any locking, so concurrent callers sharing one cache must serialize their
+// calls — confirm this holds for every call site.
+func GetOrInitInt64Gauge(meter metric.Meter, cache map[string]*Int64Gauge,
+	name string, opts ...metric.Int64ObservableGaugeOption,
+) (*Int64Gauge, error) {
+	gauge, ok := cache[name]
+	if !ok {
+		var err error
+		gauge, err = NewInt64SyncGauge(meter, name, opts...)
+		if err != nil {
+			return nil, err
+		}
+		cache[name] = gauge
+	}
+	return gauge, nil
+}
+
+// GetOrInitFloat64Gauge returns the gauge stored in cache under name,
+// creating it with NewFloat64SyncGauge and storing it on first use. opts are
+// only applied when the gauge is created.
+//
+// NOTE(review): cache is a plain map and is read and written here without
+// any locking, so concurrent callers sharing one cache must serialize their
+// calls — confirm this holds for every call site.
+func GetOrInitFloat64Gauge(meter metric.Meter, cache map[string]*Float64Gauge,
+	name string, opts ...metric.Float64ObservableGaugeOption,
+) (*Float64Gauge, error) {
+	gauge, ok := cache[name]
+	if !ok {
+		var err error
+		gauge, err = NewFloat64SyncGauge(meter, name, opts...)
+		if err != nil {
+			return nil, err
+		}
+		cache[name] = gauge
+	}
+	return gauge, nil
+}