diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 8001b5344c..ed9d2b720d 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -28,7 +28,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" @@ -759,11 +758,10 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { return } - slotMetricGauges := peerdb_gauges.SlotMetricGauges{} + slotMetricGauges := otel_metrics.SlotMetricGauges{} if a.OtelManager != nil { - slotLagGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Float64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.SlotLagGaugeName), + slotLagGauge, err := a.OtelManager.GetOrInitFloat64Gauge( + otel_metrics.BuildMetricName(otel_metrics.SlotLagGaugeName), metric.WithUnit("MiBy"), metric.WithDescription("Postgres replication slot lag in MB")) if err != nil { @@ -772,9 +770,8 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } slotMetricGauges.SlotLagGauge = slotLagGauge - openConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Int64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenConnectionsGaugeName), + openConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge( + otel_metrics.BuildMetricName(otel_metrics.OpenConnectionsGaugeName), metric.WithDescription("Current open connections for PeerDB user")) if err != nil { logger.Error("Failed to get open connections gauge", slog.Any("error", err)) @@ -782,9 +779,8 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } slotMetricGauges.OpenConnectionsGauge = openConnectionsGauge - openReplicationConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Int64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenReplicationConnectionsGaugeName), + openReplicationConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge( + otel_metrics.BuildMetricName(otel_metrics.OpenReplicationConnectionsGaugeName), metric.WithDescription("Current open replication connections for PeerDB user")) if err != nil { logger.Error("Failed to get open replication connections gauge", slog.Any("error", err)) @@ -792,9 +788,8 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge - intervalSinceLastNormalizeGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Float64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.IntervalSinceLastNormalizeGaugeName), + intervalSinceLastNormalizeGauge, err := a.OtelManager.GetOrInitFloat64Gauge( + otel_metrics.BuildMetricName(otel_metrics.IntervalSinceLastNormalizeGaugeName), metric.WithUnit("s"), metric.WithDescription("Interval since last normalize")) if err != nil { diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index d583044b0c..2d1f7e1f3e 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -23,6 +23,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -113,7 +114,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon options *protos.SyncFlowOptions, sessionID string, adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error), - pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error, + pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error, sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error), ) (*model.SyncCompositeResponse, error) { flowName := config.FlowJobName @@ -181,7 +182,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon startTime := time.Now() errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{ + return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{ FlowJobName: flowName, SrcTableIDNameMapping: options.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index cca0202ec7..51415a1dc1 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -35,9 +35,18 @@ type WorkerSetupOptions struct { } type workerSetupResponse struct { - Client client.Client - Worker worker.Worker - Cleanup func() + Client client.Client + Worker worker.Worker + OtelManager *otel_metrics.OtelManager +} + +func (w *workerSetupResponse) Close() { + w.Client.Close() + if w.OtelManager != nil { + if err := w.OtelManager.Close(context.Background()); err != nil { + slog.Error("Failed to shutdown metrics provider", slog.Any("error", err)) + } + } } func setupPyroscope(opts *WorkerSetupOptions) { @@ -148,7 +157,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { }) peerflow.RegisterFlowWorkerWorkflows(w) - cleanupOtelManagerFunc := func() {} var otelManager *otel_metrics.OtelManager if opts.EnableOtelMetrics { metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker") @@ -161,12 +169,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { Float64GaugesCache: make(map[string]metric.Float64Gauge), Int64GaugesCache: make(map[string]metric.Int64Gauge), } - cleanupOtelManagerFunc = func() { - shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background()) - if shutDownErr != nil { - slog.Error("Failed to shutdown metrics provider", slog.Any("error", shutDownErr)) - } - } } w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, @@ -182,11 +184,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { }) return &workerSetupResponse{ - Client: c, - Worker: w, - Cleanup: func() { - cleanupOtelManagerFunc() - c.Close() - }, + Client: c, + Worker: w, + OtelManager: otelManager, }, nil } diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 8a6bbbc0e2..073d9d82b4 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -23,7 +23,7 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -85,7 +85,7 @@ type CDCPullConnectorCore interface { alerter *alerting.Alerter, catalogPool *pgxpool.Pool, alertKeys *alerting.AlertKeys, - slotMetricGauges peerdb_gauges.SlotMetricGauges, + slotMetricGauges otel_metrics.SlotMetricGauges, ) error // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. @@ -102,7 +102,12 @@ type CDCPullConnector interface { CDCPullConnectorCore // This method should be idempotent, and should be able to be called multiple times with the same request. - PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.RecordItems]) error + PullRecords( + ctx context.Context, + catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, + req *model.PullRecordsRequest[model.RecordItems], + ) error } type CDCPullPgConnector interface { @@ -110,7 +115,12 @@ type CDCPullPgConnector interface { // This method should be idempotent, and should be able to be called multiple times with the same request. // It's signature, aside from type parameter, should match CDCPullConnector.PullRecords. - PullPg(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.PgItems]) error + PullPg( + ctx context.Context, + catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, + req *model.PullRecordsRequest[model.PgItems], + ) error } type NormalizedTablesConnector interface { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 9f4bd0b966..6ff2f335eb 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -14,6 +14,8 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/activity" connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata" @@ -22,6 +24,8 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/otel_metrics" + "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -41,12 +45,14 @@ type PostgresCDCSource struct { // for storing schema delta audit logs to catalog catalogPool *pgxpool.Pool + otelManager *otel_metrics.OtelManager hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{} flowJobName string } type PostgresCDCConfig struct { CatalogPool *pgxpool.Pool + OtelManager *otel_metrics.OtelManager SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude TableNameSchemaMapping map[string]*protos.TableSchema @@ -71,6 +77,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) * commitLock: nil, childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, catalogPool: cdcConfig.CatalogPool, + otelManager: cdcConfig.OtelManager, flowJobName: cdcConfig.FlowJobName, hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}), } @@ -331,8 +338,7 @@ func PullCdcRecords[Items model.Items]( records.SignalAsEmpty() } logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len())) - err := cdcRecordsStorage.Close() - if err != nil { + if err := cdcRecordsStorage.Close(); err != nil { logger.Warn("failed to clean up records storage", slog.Any("error", err)) } }() @@ -361,6 +367,16 @@ func PullCdcRecords[Items model.Items]( return nil } + var fetchedBytesCounter metric.Int64Counter + if p.otelManager != nil { + var err error + fetchedBytesCounter, err = p.otelManager.GetOrInitInt64Counter(otel_metrics.BuildMetricName(otel_metrics.FetchedBytesCounterName), + metric.WithUnit("By"), metric.WithDescription("Bytes received of CopyData over replication slot")) + if err != nil { + return fmt.Errorf("could not get FetchedBytesCounter: %w", err) + } + } + pkmRequiresResponse := false waitingForCommit := false @@ -439,8 +455,7 @@ func PullCdcRecords[Items model.Items]( }() cancel() - ctxErr := ctx.Err() - if ctxErr != nil { + if ctxErr := ctx.Err(); ctxErr != nil { return fmt.Errorf("consumeStream preempted: %w", ctxErr) } @@ -463,6 +478,13 @@ func PullCdcRecords[Items model.Items]( continue } + if fetchedBytesCounter != nil { + fetchedBytesCounter.Add(ctx, int64(len(msg.Data)), metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, req.FlowJobName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), + ))) + } + switch msg.Data[0] { case pglogrepl.PrimaryKeepaliveMessageByteID: pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:]) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e685b5c128..4845946aa7 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -28,7 +28,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -330,17 +329,19 @@ func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, l func (c *PostgresConnector) PullRecords( ctx context.Context, catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[model.RecordItems], ) error { - return pullCore(ctx, c, catalogPool, req, qProcessor{}) + return pullCore(ctx, c, catalogPool, otelManager, req, qProcessor{}) } func (c *PostgresConnector) PullPg( ctx context.Context, catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[model.PgItems], ) error { - return pullCore(ctx, c, catalogPool, req, pgProcessor{}) + return pullCore(ctx, c, catalogPool, otelManager, req, pgProcessor{}) } // PullRecords pulls records from the source. @@ -348,6 +349,7 @@ func pullCore[Items model.Items]( ctx context.Context, c *PostgresConnector, catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[Items], processor replProcessor[Items], ) error { @@ -414,6 +416,7 @@ func pullCore[Items model.Items]( cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{ CatalogPool: catalogPool, + OtelManager: otelManager, SrcTableIDNameMapping: req.SrcTableIDNameMapping, TableNameMapping: req.TableNameMapping, TableNameSchemaMapping: req.TableNameSchemaMapping, @@ -435,8 +438,7 @@ func pullCore[Items model.Items]( return fmt.Errorf("failed to get current LSN: %w", err) } - err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)) - if err != nil { + if err := monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)); err != nil { c.logger.Error("error updating latest LSN at source for CDC flow", slog.Any("error", err)) return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) } @@ -1197,7 +1199,7 @@ func (c *PostgresConnector) HandleSlotInfo( alerter *alerting.Alerter, catalogPool *pgxpool.Pool, alertKeys *alerting.AlertKeys, - slotMetricGauges peerdb_gauges.SlotMetricGauges, + slotMetricGauges otel_metrics.SlotMetricGauges, ) error { logger := shared.LoggerFromCtx(ctx) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index f3c915c133..339c54a633 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -212,8 +212,8 @@ func (qe *QRepQueryExecutor) processFetchedRows( if err := rows.Err(); err != nil { stream.Close(err) qe.logger.Error("[pg_query_executor] row iteration failed", - slog.String("query", query), slog.Any("error", rows.Err())) - return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, rows.Err()) + slog.String("query", query), slog.Any("error", err)) + return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err) } return numRows, nil diff --git a/flow/main.go b/flow/main.go index 9d499e957d..613c426340 100644 --- a/flow/main.go +++ b/flow/main.go @@ -144,7 +144,7 @@ func main() { if err != nil { return err } - defer res.Cleanup() + defer res.Close() return res.Worker.Run(worker.InterruptCh()) }, Flags: []cli.Flag{ diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 3bb2d1f248..054d6a42b1 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -30,8 +30,8 @@ func (s *QRecordStream) Schema() qvalue.QRecordSchema { func (s *QRecordStream) SetSchema(schema qvalue.QRecordSchema) { if !s.schemaSet { s.schema = schema - close(s.schemaLatch) s.schemaSet = true + close(s.schemaLatch) } } diff --git a/flow/otel_metrics/env.go b/flow/otel_metrics/env.go deleted file mode 100644 index 81b5d0c3ea..0000000000 --- a/flow/otel_metrics/env.go +++ /dev/null @@ -1,11 +0,0 @@ -package otel_metrics - -import "github.com/PeerDB-io/peer-flow/peerdbenv" - -func GetPeerDBOtelMetricsNamespace() string { - return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") -} - -func GetPeerDBOtelTemporalMetricsExportListEnv() string { - return peerdbenv.GetEnvString("PEERDB_OTEL_TEMPORAL_METRICS_EXPORT_LIST", "") -} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index c59adecd41..099c733d22 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -17,46 +17,90 @@ import ( "github.com/PeerDB-io/peer-flow/peerdbenv" ) +const ( + SlotLagGaugeName string = "cdc_slot_lag" + OpenConnectionsGaugeName string = "open_connections" + OpenReplicationConnectionsGaugeName string = "open_replication_connections" + IntervalSinceLastNormalizeGaugeName string = "interval_since_last_normalize" + FetchedBytesCounterName string = "fetched_bytes" +) + +type SlotMetricGauges struct { + SlotLagGauge metric.Float64Gauge + OpenConnectionsGauge metric.Int64Gauge + OpenReplicationConnectionsGauge metric.Int64Gauge + IntervalSinceLastNormalizeGauge metric.Float64Gauge + FetchedBytesCounter metric.Int64Counter +} + +func BuildMetricName(baseName string) string { + return peerdbenv.GetPeerDBOtelMetricsNamespace() + baseName +} + type OtelManager struct { MetricsProvider *sdkmetric.MeterProvider Meter metric.Meter Float64GaugesCache map[string]metric.Float64Gauge Int64GaugesCache map[string]metric.Int64Gauge + Int64CountersCache map[string]metric.Int64Counter +} + +func (om *OtelManager) Close(ctx context.Context) error { + return om.MetricsProvider.Shutdown(ctx) +} + +func getOrInitMetric[M any, O any]( + cons func(metric.Meter, string, ...O) (M, error), + meter metric.Meter, + cache map[string]M, + name string, + opts ...O, +) (M, error) { + gauge, ok := cache[name] + if !ok { + var err error + gauge, err = cons(meter, name, opts...) + if err != nil { + var none M + return none, err + } + cache[name] = gauge + } + return gauge, nil +} + +func (om *OtelManager) GetOrInitInt64Gauge(name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { + return getOrInitMetric(metric.Meter.Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) +} + +func (om *OtelManager) GetOrInitFloat64Gauge(name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { + return getOrInitMetric(metric.Meter.Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) +} + +func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { + return getOrInitMetric(metric.Meter.Int64Counter, om.Meter, om.Int64CountersCache, name, opts...) } // newOtelResource returns a resource describing this application. func newOtelResource(otelServiceName string, attrs ...attribute.KeyValue) (*resource.Resource, error) { - allAttrs := []attribute.KeyValue{ - semconv.ServiceNameKey.String(otelServiceName), - } - allAttrs = append(allAttrs, attrs...) - r, err := resource.Merge( + allAttrs := append([]attribute.KeyValue{semconv.ServiceNameKey.String(otelServiceName)}, attrs...) + return resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, allAttrs..., ), ) - - return r, err -} - -func setupHttpOtelMetricsExporter() (sdkmetric.Exporter, error) { - return otlpmetrichttp.New(context.Background()) -} - -func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) { - return otlpmetricgrpc.New(context.Background()) } func temporalMetricsFilteringView() sdkmetric.View { - exportListString := GetPeerDBOtelTemporalMetricsExportListEnv() + exportListString := peerdbenv.GetPeerDBOtelTemporalMetricsExportListEnv() slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString)) // Special case for exporting all metrics if exportListString == "__ALL__" { return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { stream := sdkmetric.Stream{ - Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Name: BuildMetricName("temporal." + instrument.Name), Description: instrument.Description, Unit: instrument.Unit, } @@ -68,7 +112,7 @@ func temporalMetricsFilteringView() sdkmetric.View { if len(exportList) == 0 { return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { return sdkmetric.Stream{ - Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Name: BuildMetricName("temporal." + instrument.Name), Description: instrument.Description, Unit: instrument.Unit, Aggregation: sdkmetric.AggregationDrop{}, @@ -84,7 +128,7 @@ func temporalMetricsFilteringView() sdkmetric.View { } return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { stream := sdkmetric.Stream{ - Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Name: BuildMetricName("temporal." + instrument.Name), Description: instrument.Description, Unit: instrument.Unit, } @@ -95,16 +139,16 @@ func temporalMetricsFilteringView() sdkmetric.View { } } -func setupExporter() (sdkmetric.Exporter, error) { +func setupExporter(ctx context.Context) (sdkmetric.Exporter, error) { otlpMetricProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "http/protobuf")) var metricExporter sdkmetric.Exporter var err error switch otlpMetricProtocol { case "http/protobuf": - metricExporter, err = setupHttpOtelMetricsExporter() + metricExporter, err = otlpmetrichttp.New(ctx) case "grpc": - metricExporter, err = setupGrpcOtelMetricsExporter() + metricExporter, err = otlpmetricgrpc.New(ctx) default: return nil, fmt.Errorf("unsupported otel metric protocol: %s", otlpMetricProtocol) } @@ -114,8 +158,8 @@ func setupExporter() (sdkmetric.Exporter, error) { return metricExporter, err } -func setupMetricsProvider(otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { - metricExporter, err := setupExporter() +func setupMetricsProvider(ctx context.Context, otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { + metricExporter, err := setupExporter(ctx) if err != nil { return nil, err } @@ -133,7 +177,7 @@ func SetupPeerDBMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvide if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) } - return setupMetricsProvider(otelResource) + return setupMetricsProvider(context.Background(), otelResource) } func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { @@ -141,5 +185,5 @@ func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvi if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) } - return setupMetricsProvider(otelResource, temporalMetricsFilteringView()) + return setupMetricsProvider(context.Background(), otelResource, temporalMetricsFilteringView()) } diff --git a/flow/otel_metrics/peerdb_gauges/gauges.go b/flow/otel_metrics/peerdb_gauges/gauges.go deleted file mode 100644 index a3b7d5c3e8..0000000000 --- a/flow/otel_metrics/peerdb_gauges/gauges.go +++ /dev/null @@ -1,25 +0,0 @@ -package peerdb_gauges - -import ( - "go.opentelemetry.io/otel/metric" - - "github.com/PeerDB-io/peer-flow/otel_metrics" -) - -const ( - SlotLagGaugeName string = "cdc_slot_lag" - OpenConnectionsGaugeName string = "open_connections" - OpenReplicationConnectionsGaugeName string = "open_replication_connections" - IntervalSinceLastNormalizeGaugeName string = "interval_since_last_normalize" -) - -type SlotMetricGauges struct { - SlotLagGauge metric.Float64Gauge - OpenConnectionsGauge metric.Int64Gauge - OpenReplicationConnectionsGauge metric.Int64Gauge - IntervalSinceLastNormalizeGauge metric.Float64Gauge -} - -func BuildGaugeName(baseGaugeName string) string { - return otel_metrics.GetPeerDBOtelMetricsNamespace() + baseGaugeName -} diff --git a/flow/otel_metrics/sync_gauges.go b/flow/otel_metrics/sync_gauges.go deleted file mode 100644 index e9da02c875..0000000000 --- a/flow/otel_metrics/sync_gauges.go +++ /dev/null @@ -1,33 +0,0 @@ -package otel_metrics - -import ( - "go.opentelemetry.io/otel/metric" -) - -func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]metric.Int64Gauge, name string, opts ...metric.Int64GaugeOption, -) (metric.Int64Gauge, error) { - gauge, ok := cache[name] - if !ok { - var err error - gauge, err = meter.Int64Gauge(name, opts...) - if err != nil { - return nil, err - } - cache[name] = gauge - } - return gauge, nil -} - -func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]metric.Float64Gauge, name string, opts ...metric.Float64GaugeOption, -) (metric.Float64Gauge, error) { - gauge, ok := cache[name] - if !ok { - var err error - gauge, err = meter.Float64Gauge(name, opts...) - if err != nil { - return nil, err - } - cache[name] = gauge - } - return gauge, nil -} diff --git a/flow/peerdbenv/otel.go b/flow/peerdbenv/otel.go new file mode 100644 index 0000000000..d7f3cb68a6 --- /dev/null +++ b/flow/peerdbenv/otel.go @@ -0,0 +1,9 @@ +package peerdbenv + +func GetPeerDBOtelMetricsNamespace() string { + return GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") +} + +func GetPeerDBOtelTemporalMetricsExportListEnv() string { + return GetEnvString("PEERDB_OTEL_TEMPORAL_METRICS_EXPORT_LIST", "") +}