Skip to content

Commit

Permalink
Merge branch 'main' into tree-ui
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 23, 2023
2 parents 01b9536 + 7e84b31 commit 6883f14
Show file tree
Hide file tree
Showing 19 changed files with 23 additions and 1,483 deletions.
10 changes: 0 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,6 @@ You can use Postgres’ eco-system to manage your ETL —

We support multiple target connectors to move data from Postgres and a couple of source connectors to move data into Postgres. Check the status of connectors [here](https://docs.peerdb.io/sql/commands/supported-connectors)

#### Metrics for MIRROR

Both types of MIRRORs export some crucial metrics with regards to the health of the MIRROR. By default, our development Docker stack does not capture or visualize these metrics. They are available in a Docker Compose profile called `metrics`, which can be enabled by:

```bash
# add --profile metrics like this in front of any docker compose command being used.
docker compose --profile metrics up --build
```

This sets up both a Prometheus instance on port 9090 that scrapes the metrics from the flow workers, and also a Grafana instance on port 3000 that reads and visualizes the metrics from mirrors in a preconfigured dashboard. To view the dashboard, access the Grafana instance on `localhost:3000` with the user `admin` and the password `peerdb`.

## License

Expand Down
46 changes: 1 addition & 45 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ x-flow-worker-env: &flow-worker-env
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables worker profiling using Grafana Pyroscope
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"
PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040

services:
Expand Down Expand Up @@ -141,10 +139,6 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
- 6061:6061
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -157,13 +151,8 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
- 6063:6063
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -176,13 +165,8 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
- 6065:6065
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -205,45 +189,17 @@ services:
catalog:
condition: service_healthy

peerdb-prometheus:
container_name: peerdb-prometheus
build:
context: .
dockerfile: stacks/prometheus.Dockerfile
volumes:
- prometheusdata:/prometheus
ports:
- 9090:9090
profiles:
- multi-metrics
- metrics

peerdb-grafana:
container_name: peerdb-grafana
build:
context: .
dockerfile: stacks/grafana.Dockerfile
ports:
- 3000:3000
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: peerdb
profiles:
- multi-metrics
- metrics

peerdb-ui:
container_name: peerdb-ui
build:
context: .
dockerfile: stacks/peerdb-ui.Dockerfile
ports:
- 3001:3000
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113

volumes:
pgdata:
prometheusdata:
45 changes: 1 addition & 44 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ x-flow-worker-env: &flow-worker-env
AWS_REGION: ${AWS_REGION:-}
# For GCS, set this as: https://storage.googleapis.com
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"

services:
catalog:
Expand Down Expand Up @@ -119,10 +117,6 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
- 6061:6061
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -132,13 +126,8 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
- 6063:6063
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -148,13 +137,8 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
- 6065:6065
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -175,38 +159,11 @@ services:
catalog:
condition: service_healthy

peerdb-prometheus:
container_name: peerdb-prometheus
build:
context: .
dockerfile: stacks/prometheus.Dockerfile
volumes:
- prometheusdata:/prometheus
ports:
- 9090:9090
profiles:
- multi-metrics
- metrics

peerdb-grafana:
container_name: peerdb-grafana
build:
context: .
dockerfile: stacks/grafana.Dockerfile
ports:
- 3000:3000
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: peerdb
profiles:
- multi-metrics
- metrics

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:latest-dev
ports:
- 3001:3000
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
Expand Down
5 changes: 0 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type SlotSnapshotSignal struct {
}

type FlowableActivity struct {
EnableMetrics bool
CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor
}

Expand Down Expand Up @@ -163,7 +162,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -338,7 +336,6 @@ func (a *FlowableActivity) StartNormalize(
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -505,7 +502,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -606,7 +602,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string) error {
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
Expand Down
1 change: 0 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

type SnapshotActivity struct {
EnableMetrics bool
SnapshotConnections map[string]*SlotSnapshotSignal
}

Expand Down
18 changes: 0 additions & 18 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,13 @@ func main() {
EnvVars: []string{"ENABLE_PROFILING"},
}

metricsFlag := &cli.BoolFlag{
Name: "enable-metrics",
Value: false, // Default is off
Usage: "Enable metrics collection for the application",
EnvVars: []string{"ENABLE_METRICS"},
}

pyroscopeServerFlag := &cli.StringFlag{
Name: "pyroscope-server-address",
Value: "http://pyroscope:4040",
Usage: "HTTP server address for pyroscope",
EnvVars: []string{"PYROSCOPE_SERVER_ADDRESS"},
}

metricsServerFlag := &cli.StringFlag{
Name: "metrics-server",
Value: "localhost:6061", // Default is localhost:6061
Usage: "HTTP server address for metrics collection",
EnvVars: []string{"METRICS_SERVER"},
}

temporalNamespaceFlag := &cli.StringFlag{
Name: "temporal-namespace",
Value: "default",
Expand All @@ -87,9 +73,7 @@ func main() {
return WorkerMain(&WorkerOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: ctx.Bool("enable-profiling"),
EnableMetrics: ctx.Bool("enable-metrics"),
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
Expand All @@ -98,9 +82,7 @@ func main() {
Flags: []cli.Flag{
temporalHostPortFlag,
profilingFlag,
metricsFlag,
pyroscopeServerFlag,
metricsServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
Expand Down
42 changes: 0 additions & 42 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,23 @@ import (
"os/signal"
"runtime"
"syscall"
"time"

"github.com/PeerDB-io/peer-flow/activities"
utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
"github.com/uber-go/tally/v4"
"github.com/uber-go/tally/v4/prometheus"

"github.com/grafana/pyroscope-go"
prom "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.temporal.io/sdk/client"
sdktally "go.temporal.io/sdk/contrib/tally"
"go.temporal.io/sdk/worker"
)

type WorkerOptions struct {
TemporalHostPort string
EnableProfiling bool
EnableMetrics bool
PyroscopeServer string
MetricsServer string
TemporalNamespace string
TemporalCert string
TemporalKey string
Expand Down Expand Up @@ -111,15 +104,6 @@ func WorkerMain(opts *WorkerOptions) error {
clientOptions.ConnectionOptions = connOptions
}

if opts.EnableMetrics {
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(
prometheus.Configuration{
ListenAddress: opts.MetricsServer,
TimerType: "histogram",
},
))
}

conn, err := utils.GetCatalogConnectionPoolFromEnv()
if err != nil {
return fmt.Errorf("unable to create catalog connection pool: %w", err)
Expand All @@ -143,7 +127,6 @@ func WorkerMain(opts *WorkerOptions) error {
w.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterActivity(&activities.FlowableActivity{
EnableMetrics: opts.EnableMetrics,
CatalogMirrorMonitor: catalogMirrorMonitor,
})

Expand All @@ -154,28 +137,3 @@ func WorkerMain(opts *WorkerOptions) error {

return nil
}

func newPrometheusScope(c prometheus.Configuration) tally.Scope {
reporter, err := c.NewReporter(
prometheus.ConfigurationOptions{
Registry: prom.NewRegistry(),
OnError: func(err error) {
log.Println("error in prometheus reporter", err)
},
},
)
if err != nil {
log.Fatalln("error creating prometheus reporter", err)
}
scopeOpts := tally.ScopeOptions{
CachedReporter: reporter,
Separator: prometheus.DefaultSeparator,
SanitizeOptions: &sdktally.PrometheusSanitizeOptions,
Prefix: "flow_worker",
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
scope = sdktally.NewPrometheusNamingScope(scope)

log.Println("prometheus metrics scope created")
return scope
}
Loading

0 comments on commit 6883f14

Please sign in to comment.