diff --git a/docker-compose.yml b/docker-compose.yml index 315b7b01d0..9085191e34 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,9 @@ x-flow-worker-env: &flow-worker-env ENABLE_METRICS: "true" PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040 +x-peerdb-temporal-namespace: &peerdb-temporal-namespace + PEERDB_TEMPORAL_NAMESPACE: default + services: catalog: container_name: catalog @@ -108,7 +111,7 @@ services: - 8112:8112 - 8113:8113 environment: - <<: [*catalog-config] + <<: [*catalog-config, *peerdb-temporal-namespace] TEMPORAL_HOST_PORT: temporal:7233 depends_on: temporal-admin-tools: @@ -120,6 +123,7 @@ services: context: . dockerfile: stacks/flow-snapshot-worker.Dockerfile environment: + <<: [*peerdb-temporal-namespace] TEMPORAL_HOST_PORT: temporal:7233 depends_on: temporal-admin-tools: @@ -131,7 +135,7 @@ services: context: . dockerfile: stacks/flow-worker.Dockerfile environment: - <<: [*catalog-config, *flow-worker-env] + <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] METRICS_SERVER: 0.0.0.0:6061 ports: - 6060:6060 @@ -146,7 +150,7 @@ services: context: . dockerfile: stacks/flow-worker.Dockerfile environment: - <<: [*catalog-config, *flow-worker-env] + <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] METRICS_SERVER: 0.0.0.0:6063 ports: - 6062:6062 @@ -164,7 +168,7 @@ services: context: . dockerfile: stacks/flow-worker.Dockerfile environment: - <<: [*catalog-config, *flow-worker-env] + <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] METRICS_SERVER: 0.0.0.0:6065 ports: - 6064:6064 diff --git a/flow/cmd/api.go b/flow/cmd/api.go index a3401826d1..9126dc2c06 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -21,10 +21,11 @@ import ( ) type APIServerParams struct { - ctx context.Context - Port uint - GatewayPort uint - TemporalHostPort string + ctx context.Context + Port uint + GatewayPort uint + TemporalHostPort string + TemporalNamespace string } // setupGRPCGatewayServer sets up the grpc-gateway mux @@ -58,7 +59,8 @@ func APIMain(args *APIServerParams) error { ctx := args.ctx tc, err := client.Dial(client.Options{ - HostPort: args.TemporalHostPort, + HostPort: args.TemporalHostPort, + Namespace: args.TemporalNamespace, }) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) diff --git a/flow/cmd/main.go b/flow/cmd/main.go index 68b01c58b8..66e96996ce 100644 --- a/flow/cmd/main.go +++ b/flow/cmd/main.go @@ -58,6 +58,13 @@ func main() { EnvVars: []string{"METRICS_SERVER"}, } + temporalNamespaceFlag := &cli.StringFlag{ + Name: "temporal-namespace", + Value: "default", + Usage: "Temporal namespace to use for workflow orchestration", + EnvVars: []string{"PEERDB_TEMPORAL_NAMESPACE"}, + } + app := &cli.App{ Name: "PeerDB Flows CLI", Commands: []*cli.Command{ @@ -66,11 +73,12 @@ func main() { Action: func(ctx *cli.Context) error { temporalHostPort := ctx.String("temporal-host-port") return WorkerMain(&WorkerOptions{ - TemporalHostPort: temporalHostPort, - EnableProfiling: ctx.Bool("enable-profiling"), - EnableMetrics: ctx.Bool("enable-metrics"), - PyroscopeServer: ctx.String("pyroscope-server-address"), - MetricsServer: ctx.String("metrics-server"), + TemporalHostPort: temporalHostPort, + EnableProfiling: ctx.Bool("enable-profiling"), + EnableMetrics: ctx.Bool("enable-metrics"), + PyroscopeServer: ctx.String("pyroscope-server-address"), + MetricsServer: ctx.String("metrics-server"), + TemporalNamespace: ctx.String("temporal-namespace"), }) }, Flags: []cli.Flag{ @@ -79,6 +87,7 @@ func main() { metricsFlag, pyroscopeServerFlag, metricsServerFlag, + temporalNamespaceFlag, }, }, { @@ -86,11 +95,13 @@ func main() { Action: func(ctx *cli.Context) error { temporalHostPort := ctx.String("temporal-host-port") return SnapshotWorkerMain(&SnapshotWorkerOptions{ - TemporalHostPort: temporalHostPort, + TemporalHostPort: temporalHostPort, + TemporalNamespace: ctx.String("temporal-namespace"), }) }, Flags: []cli.Flag{ temporalHostPortFlag, + temporalNamespaceFlag, }, }, { @@ -107,15 +118,17 @@ func main() { Value: 8111, }, temporalHostPortFlag, + temporalNamespaceFlag, }, Action: func(ctx *cli.Context) error { temporalHostPort := ctx.String("temporal-host-port") return APIMain(&APIServerParams{ - ctx: appCtx, - Port: ctx.Uint("port"), - TemporalHostPort: temporalHostPort, - GatewayPort: ctx.Uint("gateway-port"), + ctx: appCtx, + Port: ctx.Uint("port"), + TemporalHostPort: temporalHostPort, + GatewayPort: ctx.Uint("gateway-port"), + TemporalNamespace: ctx.String("temporal-namespace"), }) }, }, diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 043d5069cd..2b09bbc0f7 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -12,12 +12,14 @@ import ( ) type SnapshotWorkerOptions struct { - TemporalHostPort string + TemporalHostPort string + TemporalNamespace string } func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { clientOptions := client.Options{ - HostPort: opts.TemporalHostPort, + HostPort: opts.TemporalHostPort, + Namespace: opts.TemporalNamespace, } c, err := client.Dial(clientOptions) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 4ffe2b7b95..2e9746b127 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -25,11 +25,12 @@ import ( ) type WorkerOptions struct { - TemporalHostPort string - EnableProfiling bool - EnableMetrics bool - PyroscopeServer string - MetricsServer string + TemporalHostPort string + EnableProfiling bool + EnableMetrics bool + PyroscopeServer string + MetricsServer string + TemporalNamespace string } func setupPyroscope(opts *WorkerOptions) { @@ -90,21 +91,17 @@ func WorkerMain(opts *WorkerOptions) error { } }() - var clientOptions client.Options + clientOptions := client.Options{ + HostPort: opts.TemporalHostPort, + Namespace: opts.TemporalNamespace, + } if opts.EnableMetrics { - clientOptions = client.Options{ - HostPort: opts.TemporalHostPort, - MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope( - prometheus.Configuration{ - ListenAddress: opts.MetricsServer, - TimerType: "histogram", - }, - )), - } - } else { - clientOptions = client.Options{ - HostPort: opts.TemporalHostPort, - } + clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope( + prometheus.Configuration{ + ListenAddress: opts.MetricsServer, + TimerType: "histogram", + }, + )) } conn, err := utils.GetCatalogConnectionPoolFromEnv()