From 1ae80b2607fe68f5ed1741abb810c4ce3dfaa187 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sun, 5 May 2024 22:43:24 +0530 Subject: [PATCH] split into otel stuff into seperate module --- flow/activities/flowable.go | 21 +++----- flow/cmd/worker.go | 51 +++--------------- flow/connectors/core.go | 6 +-- flow/connectors/postgres/postgres.go | 5 +- flow/otel_metrics/otel_manager.go | 54 +++++++++++++++++++ .../sync_gauges.go} | 2 +- 6 files changed, 75 insertions(+), 64 deletions(-) create mode 100644 flow/otel_metrics/otel_manager.go rename flow/{shared/otel_metrics.go => otel_metrics/sync_gauges.go} (99%) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 537a936c11..9c03346734 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "go.opentelemetry.io/otel/metric" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -28,6 +27,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -42,18 +42,11 @@ type CdcCacheEntry struct { done chan struct{} } -type OtelManager struct { - MetricsProvider *sdkmetric.MeterProvider - Meter metric.Meter - Float64GaugesCache map[string]*shared.Float64Gauge - Int64GaugesCache map[string]*shared.Int64Gauge -} - type FlowableActivity struct { CatalogPool *pgxpool.Pool Alerter *alerting.Alerter CdcCache map[string]CdcCacheEntry - OtelManager *OtelManager + OtelManager *otel_metrics.OtelManager CdcCacheRw sync.RWMutex } @@ -603,12 +596,12 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { return } - var slotLagGauge *shared.Float64Gauge - var openConnectionsGauge *shared.Int64Gauge + var slotLagGauge *otel_metrics.Float64Gauge + var openConnectionsGauge *otel_metrics.Int64Gauge if a.OtelManager != nil { - slotLagGauge, err = shared.GetOrInitFloat64Gauge(a.OtelManager.Meter, + slotLagGauge, err = otel_metrics.GetOrInitFloat64Gauge(a.OtelManager.Meter, a.OtelManager.Float64GaugesCache, - "cdc_slotlag", + "cdc_slot_lag", metric.WithUnit("MB"), metric.WithDescription("Postgres replication slot lag in MB")) if err != nil { @@ -616,7 +609,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { return } - openConnectionsGauge, err = shared.GetOrInitInt64Gauge(a.OtelManager.Meter, + openConnectionsGauge, err = otel_metrics.GetOrInitInt64Gauge(a.OtelManager.Meter, a.OtelManager.Int64GaugesCache, "open_connections", metric.WithDescription("Current open connections for PeerDB user")) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index a5b3c8a754..7702ba5ebd 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -8,19 +8,16 @@ import ( "log/slog" "os" "runtime" - "time" "github.com/grafana/pyroscope-go" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" "github.com/PeerDB-io/peer-flow/alerting" "github.com/PeerDB-io/peer-flow/logger" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -85,40 +82,6 @@ func setupPyroscope(opts *WorkerSetupOptions) { } } -// newResource returns a resource describing this application. -func newOtelResource(otelServiceName string) (*resource.Resource, error) { - r, err := resource.Merge( - resource.Default(), - resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceNameKey.String(otelServiceName), - ), - ) - - return r, err -} - -func setupOtelMetricsExporter() (*sdkmetric.MeterProvider, error) { - metricExporter, err := otlpmetrichttp.New(context.Background(), - otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), - ) - if err != nil { - return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) - } - - resource, err := newOtelResource("flow-worker") - if err != nil { - return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) - } - - meterProvider := sdkmetric.NewMeterProvider( - sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, - sdkmetric.WithInterval(1*time.Minute))), - sdkmetric.WithResource(resource), - ) - return meterProvider, nil -} - func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { if opts.EnableProfiling { setupPyroscope(opts) @@ -175,17 +138,17 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { peerflow.RegisterFlowWorkerWorkflows(w) var metricsProvider *sdkmetric.MeterProvider - var otelManager *activities.OtelManager + var otelManager *otel_metrics.OtelManager if opts.EnableOtelMetrics { - metricsProvider, err = setupOtelMetricsExporter() + metricsProvider, err = otel_metrics.SetupOtelMetricsExporter() if err != nil { return nil, err } - otelManager = &activities.OtelManager{ + otelManager = &otel_metrics.OtelManager{ MetricsProvider: metricsProvider, - Meter: metricsProvider.Meter("flow-worker"), - Float64GaugesCache: make(map[string]*shared.Float64Gauge), - Int64GaugesCache: make(map[string]*shared.Int64Gauge), + Meter: metricsProvider.Meter("io.peerdb.flow-worker"), + Float64GaugesCache: make(map[string]*otel_metrics.Float64Gauge), + Int64GaugesCache: make(map[string]*otel_metrics.Int64Gauge), } } w.RegisterActivity(&activities.FlowableActivity{ diff --git a/flow/connectors/core.go b/flow/connectors/core.go index e70571ee28..b6f75c4828 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -22,7 +22,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/shared" + "github.com/PeerDB-io/peer-flow/otel_metrics" ) var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality") @@ -75,8 +75,8 @@ type CDCPullConnectorCore interface { // HandleSlotInfo update monitoring info on slot size etc HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, - catalogPool *pgxpool.Pool, slotName string, peerName string, slotLagGauge *shared.Float64Gauge, - openConnectionsGauge *shared.Int64Gauge) error + catalogPool *pgxpool.Pool, slotName string, peerName string, + slotLagGauge *otel_metrics.Float64Gauge, openConnectionsGauge *otel_metrics.Int64Gauge) error // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index f544c4c855..03ee0c02b1 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -27,6 +27,7 @@ import ( "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -1111,8 +1112,8 @@ func (c *PostgresConnector) HandleSlotInfo( catalogPool *pgxpool.Pool, slotName string, peerName string, - slotLagGauge *shared.Float64Gauge, - openConnectionsGauge *shared.Int64Gauge, + slotLagGauge *otel_metrics.Float64Gauge, + openConnectionsGauge *otel_metrics.Int64Gauge, ) error { logger := logger.LoggerFromCtx(ctx) diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go new file mode 100644 index 0000000000..85190f75b9 --- /dev/null +++ b/flow/otel_metrics/otel_manager.go @@ -0,0 +1,54 @@ +package otel_metrics + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +type OtelManager struct { + MetricsProvider *sdkmetric.MeterProvider + Meter metric.Meter + Float64GaugesCache map[string]*Float64Gauge + Int64GaugesCache map[string]*Int64Gauge +} + +// newOtelResource returns a resource describing this application. +func newOtelResource(otelServiceName string) (*resource.Resource, error) { + r, err := resource.Merge( + resource.Default(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(otelServiceName), + ), + ) + + return r, err +} + +func SetupOtelMetricsExporter() (*sdkmetric.MeterProvider, error) { + metricExporter, err := otlpmetrichttp.New(context.Background(), + otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression), + ) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) + } + + resource, err := newOtelResource("flow-worker") + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter, + sdkmetric.WithInterval(3*time.Second))), + sdkmetric.WithResource(resource), + ) + return meterProvider, nil +} diff --git a/flow/shared/otel_metrics.go b/flow/otel_metrics/sync_gauges.go similarity index 99% rename from flow/shared/otel_metrics.go rename to flow/otel_metrics/sync_gauges.go index 3acc6e37cd..14a7058301 100644 --- a/flow/shared/otel_metrics.go +++ b/flow/otel_metrics/sync_gauges.go @@ -1,4 +1,4 @@ -package shared +package otel_metrics import ( "context"