Skip to content

Commit

Permalink
split otel stuff into separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed May 5, 2024
1 parent 8676ad8 commit 1ae80b2
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 64 deletions.
21 changes: 7 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand All @@ -42,18 +42,11 @@ type CdcCacheEntry struct {
done chan struct{}
}

// OtelManager groups the OpenTelemetry metrics objects used by activities:
// the SDK meter provider, a meter, and caches of already-created gauges.
type OtelManager struct {
// MetricsProvider is the SDK meter provider.
MetricsProvider *sdkmetric.MeterProvider
// Meter is passed to the GetOrInit*Gauge helpers when obtaining gauges.
Meter metric.Meter
// Float64GaugesCache holds float64 gauges by string key
// (presumably the gauge name; confirm against GetOrInitFloat64Gauge).
Float64GaugesCache map[string]*shared.Float64Gauge
// Int64GaugesCache holds int64 gauges by string key
// (presumably the gauge name; confirm against GetOrInitInt64Gauge).
Int64GaugesCache map[string]*shared.Int64Gauge
}

type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCache map[string]CdcCacheEntry
OtelManager *OtelManager
OtelManager *otel_metrics.OtelManager
CdcCacheRw sync.RWMutex
}

Expand Down Expand Up @@ -603,20 +596,20 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}

var slotLagGauge *shared.Float64Gauge
var openConnectionsGauge *shared.Int64Gauge
var slotLagGauge *otel_metrics.Float64Gauge
var openConnectionsGauge *otel_metrics.Int64Gauge
if a.OtelManager != nil {
slotLagGauge, err = shared.GetOrInitFloat64Gauge(a.OtelManager.Meter,
slotLagGauge, err = otel_metrics.GetOrInitFloat64Gauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
"cdc_slotlag",
"cdc_slot_lag",
metric.WithUnit("MB"),
metric.WithDescription("Postgres replication slot lag in MB"))
if err != nil {
logger.Error("Failed to get slot lag gauge", slog.Any("error", err))
return
}

openConnectionsGauge, err = shared.GetOrInitInt64Gauge(a.OtelManager.Meter,
openConnectionsGauge, err = otel_metrics.GetOrInitInt64Gauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
"open_connections",
metric.WithDescription("Current open connections for PeerDB user"))
Expand Down
51 changes: 7 additions & 44 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,16 @@ import (
"log/slog"
"os"
"runtime"
"time"

"github.com/grafana/pyroscope-go"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -85,40 +82,6 @@ func setupPyroscope(opts *WorkerSetupOptions) {
}
}

// newOtelResource returns a resource describing this application: the SDK's
// default resource merged with one that carries otelServiceName as the
// service name. Any error from the merge is returned unchanged.
func newOtelResource(otelServiceName string) (*resource.Resource, error) {
	base := resource.Default()
	service := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(otelServiceName),
	)
	return resource.Merge(base, service)
}

// setupOtelMetricsExporter creates a gzip-compressed OTLP/HTTP metrics exporter
// and returns a meter provider that reads through it once per minute, tagged
// with the "flow-worker" resource. A wrapped error is returned if either the
// exporter or the resource cannot be created.
func setupOtelMetricsExporter() (*sdkmetric.MeterProvider, error) {
	exporter, err := otlpmetrichttp.New(
		context.Background(),
		otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err)
	}

	// Named res so the local does not shadow the imported resource package.
	res, err := newOtelResource("flow-worker")
	if err != nil {
		return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err)
	}

	reader := sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(1*time.Minute))
	return sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(reader),
		sdkmetric.WithResource(res),
	), nil
}

func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
if opts.EnableProfiling {
setupPyroscope(opts)
Expand Down Expand Up @@ -175,17 +138,17 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
peerflow.RegisterFlowWorkerWorkflows(w)

var metricsProvider *sdkmetric.MeterProvider
var otelManager *activities.OtelManager
var otelManager *otel_metrics.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, err = setupOtelMetricsExporter()
metricsProvider, err = otel_metrics.SetupOtelMetricsExporter()
if err != nil {
return nil, err
}
otelManager = &activities.OtelManager{
otelManager = &otel_metrics.OtelManager{
MetricsProvider: metricsProvider,
Meter: metricsProvider.Meter("flow-worker"),
Float64GaugesCache: make(map[string]*shared.Float64Gauge),
Int64GaugesCache: make(map[string]*shared.Int64Gauge),
Meter: metricsProvider.Meter("io.peerdb.flow-worker"),
Float64GaugesCache: make(map[string]*otel_metrics.Float64Gauge),
Int64GaugesCache: make(map[string]*otel_metrics.Int64Gauge),
}
}
w.RegisterActivity(&activities.FlowableActivity{
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/otel_metrics"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand Down Expand Up @@ -75,8 +75,8 @@ type CDCPullConnectorCore interface {

// HandleSlotInfo updates monitoring info on slot size etc.
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter,
catalogPool *pgxpool.Pool, slotName string, peerName string, slotLagGauge *shared.Float64Gauge,
openConnectionsGauge *shared.Int64Gauge) error
catalogPool *pgxpool.Pool, slotName string, peerName string,
slotLagGauge *otel_metrics.Float64Gauge, openConnectionsGauge *otel_metrics.Int64Gauge) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error)
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -1111,8 +1112,8 @@ func (c *PostgresConnector) HandleSlotInfo(
catalogPool *pgxpool.Pool,
slotName string,
peerName string,
slotLagGauge *shared.Float64Gauge,
openConnectionsGauge *shared.Int64Gauge,
slotLagGauge *otel_metrics.Float64Gauge,
openConnectionsGauge *otel_metrics.Int64Gauge,
) error {
logger := logger.LoggerFromCtx(ctx)

Expand Down
54 changes: 54 additions & 0 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package otel_metrics

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
)

// OtelManager groups the OpenTelemetry metrics objects shared by callers:
// the SDK meter provider, a meter, and caches of already-created gauges.
type OtelManager struct {
// MetricsProvider is the SDK meter provider.
MetricsProvider *sdkmetric.MeterProvider
// Meter is the metric.Meter held alongside the provider.
Meter metric.Meter
// Float64GaugesCache holds float64 gauges by string key
// (presumably the gauge name; confirm against GetOrInitFloat64Gauge).
Float64GaugesCache map[string]*Float64Gauge
// Int64GaugesCache holds int64 gauges by string key
// (presumably the gauge name; confirm against GetOrInitInt64Gauge).
Int64GaugesCache map[string]*Int64Gauge
}

// newOtelResource builds the resource that identifies this application. It
// merges the SDK's default resource with one that sets otelServiceName as the
// service name under the semconv schema URL, and returns the merged resource
// together with any error reported by the merge.
func newOtelResource(otelServiceName string) (*resource.Resource, error) {
	base := resource.Default()
	service := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(otelServiceName),
	)
	return resource.Merge(base, service)
}

// SetupOtelMetricsExporter creates a gzip-compressed OTLP/HTTP metrics exporter
// and returns a meter provider that reads through it every 3 seconds, tagged
// with the "flow-worker" resource. A wrapped error is returned if either the
// exporter or the resource cannot be created.
func SetupOtelMetricsExporter() (*sdkmetric.MeterProvider, error) {
	exporter, err := otlpmetrichttp.New(
		context.Background(),
		otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err)
	}

	// Named res so the local does not shadow the imported resource package.
	res, err := newOtelResource("flow-worker")
	if err != nil {
		return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err)
	}

	reader := sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(3*time.Second))
	return sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(reader),
		sdkmetric.WithResource(res),
	), nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package shared
package otel_metrics

import (
"context"
Expand Down

0 comments on commit 1ae80b2

Please sign in to comment.