Skip to content

Commit

Permalink
Merge branch 'main' into wait-for-at-least-one
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 23, 2023
2 parents 41fd15d + 4b1447b commit e80b7c0
Show file tree
Hide file tree
Showing 65 changed files with 1,341 additions and 3,168 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
10 changes: 0 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,6 @@ You can use Postgres’ eco-system to manage your ETL —

We support multiple target connectors to move data from Postgres and a couple of source connectors to move data into Postgres. Check the status of connectors [here](https://docs.peerdb.io/sql/commands/supported-connectors)

#### Metrics for MIRROR

Both types of MIRRORs export some crucial metrics with regards to the health of the MIRROR. By default, our development Docker stack does not capture or visualize these metrics. They are available in a Docker Compose profile called `metrics`, which can be enabled by:

```bash
# add --profile metrics like this in front of any docker compose command being used.
docker compose --profile metrics up --build
```

This sets up both a Prometheus instance on port 9090 that scrapes the metrics from the flow workers, and also a Grafana instance on port 3000 that reads and visualizes the metrics from mirrors in a preconfigured dashboard. To view the dashboard, access the Grafana instance on `localhost:3000` with the user `admin` and the password `peerdb`.

## License

Expand Down
2 changes: 1 addition & 1 deletion dev-peerdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ then
exit 1
fi

docker compose -f docker-compose-dev.yml up --build\
docker compose -f docker-compose-dev.yml up --build \
--no-attach temporal --no-attach pyroscope --no-attach temporal-ui
52 changes: 4 additions & 48 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ x-flow-worker-env: &flow-worker-env
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables worker profiling using Grafana Pyroscope
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"
PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040

services:
Expand Down Expand Up @@ -64,7 +62,7 @@ services:
- POSTGRES_PWD=postgres
- POSTGRES_SEEDS=catalog
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
image: temporalio/auto-setup:1.22.1
image: temporalio/auto-setup:1.22
ports:
- 7233:7233
volumes:
Expand All @@ -85,7 +83,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:1.22.1
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
healthcheck:
Expand All @@ -102,7 +100,7 @@ services:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
- TEMPORAL_CSRF_COOKIE_INSECURE=true
image: temporalio/ui:2.17.2
image: temporalio/ui:2.21.3
ports:
- 8085:8080

Expand Down Expand Up @@ -141,10 +139,6 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
- 6061:6061
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -157,13 +151,8 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
- 6063:6063
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -176,13 +165,8 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
- 6065:6065
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -205,45 +189,17 @@ services:
catalog:
condition: service_healthy

peerdb-prometheus:
container_name: peerdb-prometheus
build:
context: .
dockerfile: stacks/prometheus.Dockerfile
volumes:
- prometheusdata:/prometheus
ports:
- 9090:9090
profiles:
- multi-metrics
- metrics

peerdb-grafana:
container_name: peerdb-grafana
build:
context: .
dockerfile: stacks/grafana.Dockerfile
ports:
- 3000:3000
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: peerdb
profiles:
- multi-metrics
- metrics

peerdb-ui:
container_name: peerdb-ui
build:
context: .
dockerfile: stacks/peerdb-ui.Dockerfile
ports:
- 3001:3000
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113

volumes:
pgdata:
prometheusdata:
51 changes: 4 additions & 47 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ x-flow-worker-env: &flow-worker-env
AWS_REGION: ${AWS_REGION:-}
# For GCS, set this as: https://storage.googleapis.com
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"

services:
catalog:
Expand Down Expand Up @@ -57,7 +55,7 @@ services:
- POSTGRES_PWD=postgres
- POSTGRES_SEEDS=catalog
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
image: temporalio/auto-setup:1.22.1
image: temporalio/auto-setup:1.22
ports:
- 7233:7233
volumes:
Expand All @@ -72,7 +70,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:1.22.1
image: temporalio/admin-tools:1.22
stdin_open: true
tty: true
healthcheck:
Expand All @@ -89,7 +87,7 @@ services:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CORS_ORIGINS=http://localhost:3000
- TEMPORAL_CSRF_COOKIE_INSECURE=true
image: temporalio/ui:2.17.2
image: temporalio/ui:2.21.3
ports:
- 8085:8080

Expand Down Expand Up @@ -119,10 +117,6 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
- 6061:6061
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -132,13 +126,8 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
- 6063:6063
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -148,13 +137,8 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
- 6065:6065
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -175,38 +159,11 @@ services:
catalog:
condition: service_healthy

peerdb-prometheus:
container_name: peerdb-prometheus
build:
context: .
dockerfile: stacks/prometheus.Dockerfile
volumes:
- prometheusdata:/prometheus
ports:
- 9090:9090
profiles:
- multi-metrics
- metrics

peerdb-grafana:
container_name: peerdb-grafana
build:
context: .
dockerfile: stacks/grafana.Dockerfile
ports:
- 3000:3000
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: peerdb
profiles:
- multi-metrics
- metrics

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:latest-dev
ports:
- 3001:3000
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
Expand Down
5 changes: 0 additions & 5 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type SlotSnapshotSignal struct {
}

type FlowableActivity struct {
EnableMetrics bool
CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor
}

Expand Down Expand Up @@ -163,7 +162,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -338,7 +336,6 @@ func (a *FlowableActivity) StartNormalize(
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
Expand Down Expand Up @@ -505,7 +502,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -606,7 +602,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string) error {
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
Expand Down
1 change: 0 additions & 1 deletion flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
)

type SnapshotActivity struct {
EnableMetrics bool
SnapshotConnections map[string]*SlotSnapshotSignal
}

Expand Down
18 changes: 0 additions & 18 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,13 @@ func main() {
EnvVars: []string{"ENABLE_PROFILING"},
}

metricsFlag := &cli.BoolFlag{
Name: "enable-metrics",
Value: false, // Default is off
Usage: "Enable metrics collection for the application",
EnvVars: []string{"ENABLE_METRICS"},
}

pyroscopeServerFlag := &cli.StringFlag{
Name: "pyroscope-server-address",
Value: "http://pyroscope:4040",
Usage: "HTTP server address for pyroscope",
EnvVars: []string{"PYROSCOPE_SERVER_ADDRESS"},
}

metricsServerFlag := &cli.StringFlag{
Name: "metrics-server",
Value: "localhost:6061", // Default is localhost:6061
Usage: "HTTP server address for metrics collection",
EnvVars: []string{"METRICS_SERVER"},
}

temporalNamespaceFlag := &cli.StringFlag{
Name: "temporal-namespace",
Value: "default",
Expand All @@ -87,9 +73,7 @@ func main() {
return WorkerMain(&WorkerOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: ctx.Bool("enable-profiling"),
EnableMetrics: ctx.Bool("enable-metrics"),
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
Expand All @@ -98,9 +82,7 @@ func main() {
Flags: []cli.Flag{
temporalHostPortFlag,
profilingFlag,
metricsFlag,
pyroscopeServerFlag,
metricsServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
Expand Down
Loading

0 comments on commit e80b7c0

Please sign in to comment.