Commit

Merge branch 'main' into full-table-partition-patches
iskakaushik authored Nov 23, 2023
2 parents baf5cfe + 4b1447b commit ecffa96
Showing 54 changed files with 1,286 additions and 3,093 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -120,4 +120,4 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 3
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
10 changes: 0 additions & 10 deletions README.md
@@ -66,16 +66,6 @@ You can use Postgres’ eco-system to manage your ETL —

We support multiple target connectors to move data from Postgres and a couple of source connectors to move data into Postgres. Check the status of connectors [here](https://docs.peerdb.io/sql/commands/supported-connectors)

#### Metrics for MIRROR

Both types of MIRRORs export some crucial metrics with regards to the health of the MIRROR. By default, our development Docker stack does not capture or visualize these metrics. They are available in a Docker Compose profile called `metrics`, which can be enabled by:

```bash
# add --profile metrics like this in front of any docker compose command being used.
docker compose --profile metrics up --build
```

This sets up both a Prometheus instance on port 9090 that scrapes the metrics from the flow workers, and also a Grafana instance on port 3000 that reads and visualizes the metrics from mirrors in a preconfigured dashboard. To view the dashboard, access the Grafana instance on `localhost:3000` with the user `admin` and the password `peerdb`.

## License

46 changes: 1 addition & 45 deletions docker-compose-dev.yml
@@ -27,8 +27,6 @@ x-flow-worker-env: &flow-worker-env
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables worker profiling using Grafana Pyroscope
ENABLE_PROFILING: "true"
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"
PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040

services:
@@ -141,10 +139,6 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
- 6061:6061
depends_on:
temporal-admin-tools:
condition: service_healthy
@@ -157,13 +151,8 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
- 6063:6063
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
@@ -176,13 +165,8 @@ services:
target: flow-worker
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
- 6065:6065
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
@@ -205,45 +189,17 @@ services:
catalog:
condition: service_healthy

peerdb-prometheus:
container_name: peerdb-prometheus
build:
context: .
dockerfile: stacks/prometheus.Dockerfile
volumes:
- prometheusdata:/prometheus
ports:
- 9090:9090
profiles:
- multi-metrics
- metrics

peerdb-grafana:
container_name: peerdb-grafana
build:
context: .
dockerfile: stacks/grafana.Dockerfile
ports:
- 3000:3000
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: peerdb
profiles:
- multi-metrics
- metrics

peerdb-ui:
container_name: peerdb-ui
build:
context: .
dockerfile: stacks/peerdb-ui.Dockerfile
ports:
- 3001:3000
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
PEERDB_FLOW_SERVER_HTTP: http://flow_api:8113

volumes:
pgdata:
prometheusdata:
45 changes: 1 addition & 44 deletions docker-compose.yml
@@ -21,8 +21,6 @@ x-flow-worker-env: &flow-worker-env
AWS_REGION: ${AWS_REGION:-}
# For GCS, set this as: https://storage.googleapis.com
AWS_ENDPOINT: ${AWS_ENDPOINT:-}
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"

services:
catalog:
@@ -119,10 +117,6 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
- 6061:6061
depends_on:
temporal-admin-tools:
condition: service_healthy
@@ -132,13 +126,8 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
- 6063:6063
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
@@ -148,13 +137,8 @@ services:
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
- 6065:6065
profiles:
- multi
- multi-metrics
depends_on:
temporal-admin-tools:
condition: service_healthy
@@ -175,38 +159,11 @@ services:
catalog:
condition: service_healthy

peerdb-prometheus:
container_name: peerdb-prometheus
build:
context: .
dockerfile: stacks/prometheus.Dockerfile
volumes:
- prometheusdata:/prometheus
ports:
- 9090:9090
profiles:
- multi-metrics
- metrics

peerdb-grafana:
container_name: peerdb-grafana
build:
context: .
dockerfile: stacks/grafana.Dockerfile
ports:
- 3000:3000
environment:
GF_SECURITY_ADMIN_USER: admin
GF_SECURITY_ADMIN_PASSWORD: peerdb
profiles:
- multi-metrics
- metrics

peerdb-ui:
container_name: peerdb-ui
image: ghcr.io/peerdb-io/peerdb-ui:latest-dev
ports:
- 3001:3000
- 3000:3000
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
5 changes: 0 additions & 5 deletions flow/activities/flowable.go
@@ -35,7 +35,6 @@ type SlotSnapshotSignal struct {
}

type FlowableActivity struct {
EnableMetrics bool
CatalogMirrorMonitor *monitoring.CatalogMirrorMonitor
}

@@ -163,7 +162,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
activity.RecordHeartbeat(ctx, "starting flow...")
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
ctx = context.WithValue(ctx, shared.CDCMirrorMonitorKey, a.CatalogMirrorMonitor)

dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
@@ -338,7 +336,6 @@ func (a *FlowableActivity) StartNormalize(
) (*model.NormalizeResponse, error) {
conn := input.FlowConnectionConfigs

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetCDCNormalizeConnector(ctx, conn.Destination)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
@@ -505,7 +502,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
@@ -606,7 +602,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string) error {
ctx = context.WithValue(ctx, shared.EnableMetricsKey, a.EnableMetrics)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return a.CatalogMirrorMonitor.UpdateEndTimeForQRepRun(ctx, runUUID)
1 change: 0 additions & 1 deletion flow/activities/snapshot_activity.go
@@ -11,7 +11,6 @@ import (
)

type SnapshotActivity struct {
EnableMetrics bool
SnapshotConnections map[string]*SlotSnapshotSignal
}

18 changes: 0 additions & 18 deletions flow/cmd/main.go
@@ -49,27 +49,13 @@ func main() {
EnvVars: []string{"ENABLE_PROFILING"},
}

metricsFlag := &cli.BoolFlag{
Name: "enable-metrics",
Value: false, // Default is off
Usage: "Enable metrics collection for the application",
EnvVars: []string{"ENABLE_METRICS"},
}

pyroscopeServerFlag := &cli.StringFlag{
Name: "pyroscope-server-address",
Value: "http://pyroscope:4040",
Usage: "HTTP server address for pyroscope",
EnvVars: []string{"PYROSCOPE_SERVER_ADDRESS"},
}

metricsServerFlag := &cli.StringFlag{
Name: "metrics-server",
Value: "localhost:6061", // Default is localhost:6061
Usage: "HTTP server address for metrics collection",
EnvVars: []string{"METRICS_SERVER"},
}

temporalNamespaceFlag := &cli.StringFlag{
Name: "temporal-namespace",
Value: "default",
@@ -87,9 +73,7 @@ func main() {
return WorkerMain(&WorkerOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: ctx.Bool("enable-profiling"),
EnableMetrics: ctx.Bool("enable-metrics"),
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
@@ -98,9 +82,7 @@ func main() {
Flags: []cli.Flag{
temporalHostPortFlag,
profilingFlag,
metricsFlag,
pyroscopeServerFlag,
metricsServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
42 changes: 37 additions & 5 deletions flow/cmd/peer_data.go
@@ -135,9 +135,40 @@ func (h *FlowRequestHandler) GetColumns(
}

defer peerPool.Close()
rows, err := peerPool.Query(ctx, "SELECT column_name, data_type"+
" FROM information_schema.columns"+
" WHERE table_schema = $1 AND table_name = $2;", req.SchemaName, req.TableName)
rows, err := peerPool.Query(ctx, `
SELECT
cols.column_name,
cols.data_type,
CASE
WHEN constraint_type = 'PRIMARY KEY' THEN true
ELSE false
END AS is_primary_key
FROM
information_schema.columns cols
LEFT JOIN
(
SELECT
kcu.column_name,
tc.constraint_type
FROM
information_schema.key_column_usage kcu
JOIN
information_schema.table_constraints tc
ON
kcu.constraint_name = tc.constraint_name
AND kcu.constraint_schema = tc.constraint_schema
AND kcu.constraint_name = tc.constraint_name
WHERE
tc.constraint_type = 'PRIMARY KEY'
AND kcu.table_schema = $1
AND kcu.table_name = $2
) AS pk
ON
cols.column_name = pk.column_name
WHERE
cols.table_schema = $3
AND cols.table_name = $4;
`, req.SchemaName, req.TableName, req.SchemaName, req.TableName)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}
@@ -147,11 +178,12 @@ func (h *FlowRequestHandler) GetColumns(
for rows.Next() {
var columnName string
var datatype string
err := rows.Scan(&columnName, &datatype)
var isPkey bool
err := rows.Scan(&columnName, &datatype, &isPkey)
if err != nil {
return &protos.TableColumnsResponse{Columns: nil}, err
}
column := fmt.Sprintf("%s:%s", columnName, datatype)
column := fmt.Sprintf("%s:%s:%v", columnName, datatype, isPkey)
columns = append(columns, column)
}
return &protos.TableColumnsResponse{Columns: columns}, nil