Skip to content

Commit

Permalink
Merge branch 'main' into ui/queue-sync-interval
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored May 6, 2024
2 parents 18804a0 + ea58d02 commit fca9652
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 20 deletions.
30 changes: 29 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand All @@ -44,6 +46,7 @@ type FlowableActivity struct {
CatalogPool *pgxpool.Pool
Alerter *alerting.Alerter
CdcCache map[string]CdcCacheEntry
OtelManager *otel_metrics.OtelManager
CdcCacheRw sync.RWMutex
}

Expand Down Expand Up @@ -592,7 +595,32 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
if ctx.Err() != nil {
return
}
err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName)

var slotLagGauge *otel_metrics.Float64Gauge
var openConnectionsGauge *otel_metrics.Int64Gauge
if a.OtelManager != nil {
slotLagGauge, err = otel_metrics.GetOrInitFloat64Gauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
"cdc_slot_lag",
metric.WithUnit("MB"),
metric.WithDescription("Postgres replication slot lag in MB"))
if err != nil {
logger.Error("Failed to get slot lag gauge", slog.Any("error", err))
return
}

openConnectionsGauge, err = otel_metrics.GetOrInitInt64Gauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
"open_connections",
metric.WithDescription("Current open connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open connections gauge", slog.Any("error", err))
return
}
}

err = srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName,
slotLagGauge, openConnectionsGauge)
if err != nil {
logger.Error("Failed to handle slot info", slog.Any("error", err))
}
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func APIMain(ctx context.Context, args *APIServerParams) error {
if args.TemporalCert != "" && args.TemporalKey != "" {
slog.Info("Using temporal certificate/key for authentication")

certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
certs, err := base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
if err != nil {
return fmt.Errorf("unable to base64 decode certificate and key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"
)

func Base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) {
func base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) {
temporalCert := strings.TrimSpace(cert)
certBytes, err := base64.StdEncoding.DecodeString(temporalCert)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
certs, err := base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
}
Expand Down
52 changes: 44 additions & 8 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
"runtime"

"github.com/grafana/pyroscope-go"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)

type WorkerOptions struct {
type WorkerSetupOptions struct {
TemporalHostPort string
PyroscopeServer string
TemporalNamespace string
Expand All @@ -30,9 +32,16 @@ type WorkerOptions struct {
TemporalMaxConcurrentActivities int
TemporalMaxConcurrentWorkflowTasks int
EnableProfiling bool
EnableOtelMetrics bool
}

func setupPyroscope(opts *WorkerOptions) {
type workerSetupResponse struct {
Client client.Client
Worker worker.Worker
Cleanup func()
}

func setupPyroscope(opts *WorkerSetupOptions) {
if opts.PyroscopeServer == "" {
log.Fatal("pyroscope server address is not set but profiling is enabled")
}
Expand Down Expand Up @@ -73,7 +82,7 @@ func setupPyroscope(opts *WorkerOptions) {
}
}

func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
if opts.EnableProfiling {
setupPyroscope(opts)
}
Expand All @@ -86,9 +95,9 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {

if opts.TemporalCert != "" && opts.TemporalKey != "" {
slog.Info("Using temporal certificate/key for authentication")
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
certs, err := base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return nil, nil, fmt.Errorf("unable to process certificate and key: %w", err)
return nil, fmt.Errorf("unable to process certificate and key: %w", err)
}
connOptions := client.ConnectionOptions{
TLS: &tls.Config{
Expand All @@ -101,12 +110,12 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {

conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(context.Background())
if err != nil {
return nil, nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
return nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}

c, err := client.Dial(clientOptions)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Temporal client: %w", err)
return nil, fmt.Errorf("unable to create Temporal client: %w", err)
}
slog.Info("Created temporal client")

Expand All @@ -128,11 +137,38 @@ func WorkerMain(opts *WorkerOptions) (client.Client, worker.Worker, error) {
})
peerflow.RegisterFlowWorkerWorkflows(w)

var metricsProvider *sdkmetric.MeterProvider
var otelManager *otel_metrics.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, err = otel_metrics.SetupOtelMetricsExporter("flow-worker")
if err != nil {
return nil, err
}
otelManager = &otel_metrics.OtelManager{
MetricsProvider: metricsProvider,
Meter: metricsProvider.Meter("io.peerdb.flow-worker"),
Float64GaugesCache: make(map[string]*otel_metrics.Float64Gauge),
Int64GaugesCache: make(map[string]*otel_metrics.Int64Gauge),
}
}
w.RegisterActivity(&activities.FlowableActivity{
CatalogPool: conn,
Alerter: alerting.NewAlerter(context.Background(), conn),
CdcCache: make(map[string]activities.CdcCacheEntry),
OtelManager: otelManager,
})

return c, w, nil
return &workerSetupResponse{
Client: c,
Worker: w,
Cleanup: func() {
if otelManager != nil {
err := otelManager.MetricsProvider.Shutdown(context.Background())
if err != nil {
slog.Error("Failed to shutdown metrics provider", slog.Any("error", err))
}
}
c.Close()
},
}, nil
}
5 changes: 4 additions & 1 deletion flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
)

var ErrUnsupportedFunctionality = errors.New("requested connector does not support functionality")
Expand Down Expand Up @@ -73,7 +74,9 @@ type CDCPullConnectorCore interface {
PullFlowCleanup(ctx context.Context, jobName string) error

// HandleSlotInfo update monitoring info on slot size etc
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter, catalogPool *pgxpool.Pool, slotName string, peerName string) error
HandleSlotInfo(ctx context.Context, alerter *alerting.Alerter,
catalogPool *pgxpool.Pool, slotName string, peerName string,
slotLagGauge *otel_metrics.Float64Gauge, openConnectionsGauge *otel_metrics.Int64Gauge) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(ctx context.Context, slotName string) ([]*protos.SlotInfo, error)
Expand Down
12 changes: 12 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel/attribute"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand All @@ -26,6 +27,8 @@ import (
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -1109,6 +1112,8 @@ func (c *PostgresConnector) HandleSlotInfo(
catalogPool *pgxpool.Pool,
slotName string,
peerName string,
slotLagGauge *otel_metrics.Float64Gauge,
openConnectionsGauge *otel_metrics.Int64Gauge,
) error {
logger := logger.LoggerFromCtx(ctx)

Expand All @@ -1125,6 +1130,10 @@ func (c *PostgresConnector) HandleSlotInfo(

logger.Info(fmt.Sprintf("Checking %s lag for %s", slotName, peerName), slog.Float64("LagInMB", float64(slotInfo[0].LagInMb)))
alerter.AlertIfSlotLag(ctx, peerName, slotInfo[0])
slotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet(
attribute.String("peerName", peerName),
attribute.String("slotName", slotName),
attribute.String("deploymentUID", peerdbenv.PeerDBDeploymentUID())))

// Also handles alerts for PeerDB user connections exceeding a given limit here
res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User)
Expand All @@ -1133,6 +1142,9 @@ func (c *PostgresConnector) HandleSlotInfo(
return err
}
alerter.AlertIfOpenConnections(ctx, peerName, res)
openConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet(
attribute.String("peerName", peerName),
attribute.String("deploymentUID", peerdbenv.PeerDBDeploymentUID())))

return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0])
}
Expand Down
9 changes: 7 additions & 2 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ require (
github.com/urfave/cli/v3 v3.0.0-alpha9
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
github.com/yuin/gopher-lua v1.1.1
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/sdk v1.26.0
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.temporal.io/api v1.32.0
go.temporal.io/sdk v1.26.1
go.uber.org/automaxprocs v1.5.3
Expand Down Expand Up @@ -77,6 +82,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
Expand Down Expand Up @@ -110,9 +116,8 @@ require (
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/term v0.19.0 // indirect
)

Expand Down
12 changes: 10 additions & 2 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -427,12 +429,18 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0 h1:HGZWGmCVRCVyAs2GQaiHQPbDHo+ObFWeUEOd+zDnp64=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0/go.mod h1:SaH+v38LSCHddyk7RGlU9uZyQoRrKao6IBnJw6Kbn+c=
go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30=
go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8=
go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs=
go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y=
go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE=
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
go.temporal.io/api v1.32.0 h1:Jv0FieWDq0HJVqoHRE/kRHM+tIaRtR16RbXZZl+8Qb4=
go.temporal.io/api v1.32.0/go.mod h1:MClRjMCgXZTKmxyItEJPRR5NuJRBhSEpuF9wuh97N6U=
go.temporal.io/sdk v1.26.1 h1:ggmFBythnuuW3yQRp0VzOTrmbOf+Ddbe00TZl+CQ+6U=
Expand Down
14 changes: 11 additions & 3 deletions flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func main() {
Usage: "Enable profiling for the application",
Sources: cli.EnvVars("ENABLE_PROFILING"),
}
otelMetricsFlag := &cli.BoolFlag{
Name: "enable-otel-metrics",
Value: false, // Default is off
Usage: "Enable OpenTelemetry metrics for the application",
Sources: cli.EnvVars("ENABLE_OTEL_METRICS"),
}

pyroscopeServerFlag := &cli.StringFlag{
Name: "pyroscope-server-address",
Expand Down Expand Up @@ -83,9 +89,10 @@ func main() {
Name: "worker",
Action: func(ctx context.Context, clicmd *cli.Command) error {
temporalHostPort := clicmd.String("temporal-host-port")
c, w, err := cmd.WorkerMain(&cmd.WorkerOptions{
res, err := cmd.WorkerSetup(&cmd.WorkerSetupOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: clicmd.Bool("enable-profiling"),
EnableOtelMetrics: clicmd.Bool("enable-otel-metrics"),
PyroscopeServer: clicmd.String("pyroscope-server-address"),
TemporalNamespace: clicmd.String("temporal-namespace"),
TemporalCert: clicmd.String("temporal-cert"),
Expand All @@ -96,12 +103,13 @@ func main() {
if err != nil {
return err
}
defer c.Close()
return w.Run(worker.InterruptCh())
defer res.Cleanup()
return res.Worker.Run(worker.InterruptCh())
},
Flags: []cli.Flag{
temporalHostPortFlag,
profilingFlag,
otelMetricsFlag,
pyroscopeServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
Expand Down
Loading

0 comments on commit fca9652

Please sign in to comment.