Skip to content

Commit

Permalink
otel: fetched bytes (#2276)
Browse files Browse the repository at this point in the history
A bit of code cleanup, along with adding an OTel counter of Postgres CDC payload sizes.
  • Loading branch information
serprex authored Nov 26, 2024
1 parent ff84606 commit 7deabd8
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 145 deletions.
23 changes: 9 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -759,11 +758,10 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}

slotMetricGauges := peerdb_gauges.SlotMetricGauges{}
slotMetricGauges := otel_metrics.SlotMetricGauges{}
if a.OtelManager != nil {
slotLagGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.SlotLagGaugeName),
slotLagGauge, err := a.OtelManager.GetOrInitFloat64Gauge(
otel_metrics.BuildMetricName(otel_metrics.SlotLagGaugeName),
metric.WithUnit("MiBy"),
metric.WithDescription("Postgres replication slot lag in MB"))
if err != nil {
Expand All @@ -772,29 +770,26 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
}
slotMetricGauges.SlotLagGauge = slotLagGauge

openConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenConnectionsGaugeName),
openConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.OpenConnectionsGaugeName),
metric.WithDescription("Current open connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open connections gauge", slog.Any("error", err))
return
}
slotMetricGauges.OpenConnectionsGauge = openConnectionsGauge

openReplicationConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenReplicationConnectionsGaugeName),
openReplicationConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.OpenReplicationConnectionsGaugeName),
metric.WithDescription("Current open replication connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open replication connections gauge", slog.Any("error", err))
return
}
slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge

intervalSinceLastNormalizeGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.IntervalSinceLastNormalizeGaugeName),
intervalSinceLastNormalizeGauge, err := a.OtelManager.GetOrInitFloat64Gauge(
otel_metrics.BuildMetricName(otel_metrics.IntervalSinceLastNormalizeGaugeName),
metric.WithUnit("s"),
metric.WithDescription("Interval since last normalize"))
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -113,7 +114,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
options *protos.SyncFlowOptions,
sessionID string,
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error,
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
) (*model.SyncCompositeResponse, error) {
flowName := config.FlowJobName
Expand Down Expand Up @@ -181,7 +182,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
startTime := time.Now()
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
Expand Down
31 changes: 15 additions & 16 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,18 @@ type WorkerSetupOptions struct {
}

type workerSetupResponse struct {
Client client.Client
Worker worker.Worker
Cleanup func()
Client client.Client
Worker worker.Worker
OtelManager *otel_metrics.OtelManager
}

// Close releases what the worker setup handed out: it closes the client
// first and then, if an OtelManager is set, closes that too. An error from
// closing the OtelManager is logged and not returned to the caller.
func (w *workerSetupResponse) Close() {
	w.Client.Close()

	manager := w.OtelManager
	if manager == nil {
		// No OtelManager was attached, so there is nothing further to close.
		return
	}

	err := manager.Close(context.Background())
	if err != nil {
		slog.Error("Failed to shutdown metrics provider", slog.Any("error", err))
	}
}

func setupPyroscope(opts *WorkerSetupOptions) {
Expand Down Expand Up @@ -148,7 +157,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
})
peerflow.RegisterFlowWorkerWorkflows(w)

cleanupOtelManagerFunc := func() {}
var otelManager *otel_metrics.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker")
Expand All @@ -161,12 +169,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
Float64GaugesCache: make(map[string]metric.Float64Gauge),
Int64GaugesCache: make(map[string]metric.Int64Gauge),
}
cleanupOtelManagerFunc = func() {
shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background())
if shutDownErr != nil {
slog.Error("Failed to shutdown metrics provider", slog.Any("error", shutDownErr))
}
}
}
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Expand All @@ -182,11 +184,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
})

return &workerSetupResponse{
Client: c,
Worker: w,
Cleanup: func() {
cleanupOtelManagerFunc()
c.Close()
},
Client: c,
Worker: w,
OtelManager: otelManager,
}, nil
}
18 changes: 14 additions & 4 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -85,7 +85,7 @@ type CDCPullConnectorCore interface {
alerter *alerting.Alerter,
catalogPool *pgxpool.Pool,
alertKeys *alerting.AlertKeys,
slotMetricGauges peerdb_gauges.SlotMetricGauges,
slotMetricGauges otel_metrics.SlotMetricGauges,
) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
Expand All @@ -102,15 +102,25 @@ type CDCPullConnector interface {
CDCPullConnectorCore

// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.RecordItems]) error
PullRecords(
ctx context.Context,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.RecordItems],
) error
}

type CDCPullPgConnector interface {
CDCPullConnectorCore

// This method should be idempotent, and should be able to be called multiple times with the same request.
// Its signature, aside from the type parameter, should match CDCPullConnector.PullRecords.
PullPg(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.PgItems]) error
PullPg(
ctx context.Context,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.PgItems],
) error
}

type NormalizedTablesConnector interface {
Expand Down
30 changes: 26 additions & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/lib/pq/oid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"

connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
Expand All @@ -22,6 +24,8 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand All @@ -41,12 +45,14 @@ type PostgresCDCSource struct {

// for storing schema delta audit logs to catalog
catalogPool *pgxpool.Pool
otelManager *otel_metrics.OtelManager
hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{}
flowJobName string
}

type PostgresCDCConfig struct {
CatalogPool *pgxpool.Pool
OtelManager *otel_metrics.OtelManager
SrcTableIDNameMapping map[uint32]string
TableNameMapping map[string]model.NameAndExclude
TableNameSchemaMapping map[string]*protos.TableSchema
Expand All @@ -71,6 +77,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *
commitLock: nil,
childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap,
catalogPool: cdcConfig.CatalogPool,
otelManager: cdcConfig.OtelManager,
flowJobName: cdcConfig.FlowJobName,
hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}),
}
Expand Down Expand Up @@ -331,8 +338,7 @@ func PullCdcRecords[Items model.Items](
records.SignalAsEmpty()
}
logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len()))
err := cdcRecordsStorage.Close()
if err != nil {
if err := cdcRecordsStorage.Close(); err != nil {
logger.Warn("failed to clean up records storage", slog.Any("error", err))
}
}()
Expand Down Expand Up @@ -361,6 +367,16 @@ func PullCdcRecords[Items model.Items](
return nil
}

var fetchedBytesCounter metric.Int64Counter
if p.otelManager != nil {
var err error
fetchedBytesCounter, err = p.otelManager.GetOrInitInt64Counter(otel_metrics.BuildMetricName(otel_metrics.FetchedBytesCounterName),
metric.WithUnit("By"), metric.WithDescription("Bytes received of CopyData over replication slot"))
if err != nil {
return fmt.Errorf("could not get FetchedBytesCounter: %w", err)
}
}

pkmRequiresResponse := false
waitingForCommit := false

Expand Down Expand Up @@ -439,8 +455,7 @@ func PullCdcRecords[Items model.Items](
}()
cancel()

ctxErr := ctx.Err()
if ctxErr != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
return fmt.Errorf("consumeStream preempted: %w", ctxErr)
}

Expand All @@ -463,6 +478,13 @@ func PullCdcRecords[Items model.Items](
continue
}

if fetchedBytesCounter != nil {
fetchedBytesCounter.Add(ctx, int64(len(msg.Data)), metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowNameKey, req.FlowJobName),
attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()),
)))
}

switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:])
Expand Down
14 changes: 8 additions & 6 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -330,24 +329,27 @@ func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, l
func (c *PostgresConnector) PullRecords(
ctx context.Context,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.RecordItems],
) error {
return pullCore(ctx, c, catalogPool, req, qProcessor{})
return pullCore(ctx, c, catalogPool, otelManager, req, qProcessor{})
}

func (c *PostgresConnector) PullPg(
ctx context.Context,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.PgItems],
) error {
return pullCore(ctx, c, catalogPool, req, pgProcessor{})
return pullCore(ctx, c, catalogPool, otelManager, req, pgProcessor{})
}

// pullCore pulls records from the source; it is the shared implementation behind PullRecords and PullPg.
func pullCore[Items model.Items](
ctx context.Context,
c *PostgresConnector,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[Items],
processor replProcessor[Items],
) error {
Expand Down Expand Up @@ -414,6 +416,7 @@ func pullCore[Items model.Items](

cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{
CatalogPool: catalogPool,
OtelManager: otelManager,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
TableNameMapping: req.TableNameMapping,
TableNameSchemaMapping: req.TableNameSchemaMapping,
Expand All @@ -435,8 +438,7 @@ func pullCore[Items model.Items](
return fmt.Errorf("failed to get current LSN: %w", err)
}

err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN))
if err != nil {
if err := monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)); err != nil {
c.logger.Error("error updating latest LSN at source for CDC flow", slog.Any("error", err))
return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err)
}
Expand Down Expand Up @@ -1197,7 +1199,7 @@ func (c *PostgresConnector) HandleSlotInfo(
alerter *alerting.Alerter,
catalogPool *pgxpool.Pool,
alertKeys *alerting.AlertKeys,
slotMetricGauges peerdb_gauges.SlotMetricGauges,
slotMetricGauges otel_metrics.SlotMetricGauges,
) error {
logger := shared.LoggerFromCtx(ctx)

Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func (qe *QRepQueryExecutor) processFetchedRows(
if err := rows.Err(); err != nil {
stream.Close(err)
qe.logger.Error("[pg_query_executor] row iteration failed",
slog.String("query", query), slog.Any("error", rows.Err()))
return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, rows.Err())
slog.String("query", query), slog.Any("error", err))
return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err)
}

return numRows, nil
Expand Down
2 changes: 1 addition & 1 deletion flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func main() {
if err != nil {
return err
}
defer res.Cleanup()
defer res.Close()
return res.Worker.Run(worker.InterruptCh())
},
Flags: []cli.Flag{
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (s *QRecordStream) Schema() qvalue.QRecordSchema {
func (s *QRecordStream) SetSchema(schema qvalue.QRecordSchema) {
if !s.schemaSet {
s.schema = schema
close(s.schemaLatch)
s.schemaSet = true
close(s.schemaLatch)
}
}

Expand Down
11 changes: 0 additions & 11 deletions flow/otel_metrics/env.go

This file was deleted.

Loading

0 comments on commit 7deabd8

Please sign in to comment.