Skip to content

Commit

Permalink
added env var for configuring Temporal namespace (#533)
Browse files Browse the repository at this point in the history
Closes #497
  • Loading branch information
heavycrystal authored Oct 18, 2023
1 parent 47078b8 commit 6207e23
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 40 deletions.
12 changes: 8 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ x-flow-worker-env: &flow-worker-env
ENABLE_METRICS: "true"
PYROSCOPE_SERVER_ADDRESS: http://pyroscope:4040

x-peerdb-temporal-namespace: &peerdb-temporal-namespace
PEERDB_TEMPORAL_NAMESPACE: default

services:
catalog:
container_name: catalog
Expand Down Expand Up @@ -108,7 +111,7 @@ services:
- 8112:8112
- 8113:8113
environment:
<<: [*catalog-config]
<<: [*catalog-config, *peerdb-temporal-namespace]
TEMPORAL_HOST_PORT: temporal:7233
depends_on:
temporal-admin-tools:
Expand All @@ -120,6 +123,7 @@ services:
context: .
dockerfile: stacks/flow-snapshot-worker.Dockerfile
environment:
<<: [*peerdb-temporal-namespace]
TEMPORAL_HOST_PORT: temporal:7233
depends_on:
temporal-admin-tools:
Expand All @@ -131,7 +135,7 @@ services:
context: .
dockerfile: stacks/flow-worker.Dockerfile
environment:
<<: [*catalog-config, *flow-worker-env]
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
Expand All @@ -146,7 +150,7 @@ services:
context: .
dockerfile: stacks/flow-worker.Dockerfile
environment:
<<: [*catalog-config, *flow-worker-env]
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
Expand All @@ -164,7 +168,7 @@ services:
context: .
dockerfile: stacks/flow-worker.Dockerfile
environment:
<<: [*catalog-config, *flow-worker-env]
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
Expand Down
12 changes: 7 additions & 5 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

type APIServerParams struct {
ctx context.Context
Port uint
GatewayPort uint
TemporalHostPort string
ctx context.Context
Port uint
GatewayPort uint
TemporalHostPort string
TemporalNamespace string
}

// setupGRPCGatewayServer sets up the grpc-gateway mux
Expand Down Expand Up @@ -58,7 +59,8 @@ func APIMain(args *APIServerParams) error {
ctx := args.ctx

tc, err := client.Dial(client.Options{
HostPort: args.TemporalHostPort,
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
})
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
Expand Down
33 changes: 23 additions & 10 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ func main() {
EnvVars: []string{"METRICS_SERVER"},
}

temporalNamespaceFlag := &cli.StringFlag{
Name: "temporal-namespace",
Value: "default",
Usage: "Temporal namespace to use for workflow orchestration",
EnvVars: []string{"PEERDB_TEMPORAL_NAMESPACE"},
}

app := &cli.App{
Name: "PeerDB Flows CLI",
Commands: []*cli.Command{
Expand All @@ -66,11 +73,12 @@ func main() {
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")
return WorkerMain(&WorkerOptions{
TemporalHostPort: temporalHostPort,
EnableProfiling: ctx.Bool("enable-profiling"),
EnableMetrics: ctx.Bool("enable-metrics"),
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalHostPort: temporalHostPort,
EnableProfiling: ctx.Bool("enable-profiling"),
EnableMetrics: ctx.Bool("enable-metrics"),
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalNamespace: ctx.String("temporal-namespace"),
})
},
Flags: []cli.Flag{
Expand All @@ -79,18 +87,21 @@ func main() {
metricsFlag,
pyroscopeServerFlag,
metricsServerFlag,
temporalNamespaceFlag,
},
},
{
Name: "snapshot-worker",
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")
return SnapshotWorkerMain(&SnapshotWorkerOptions{
TemporalHostPort: temporalHostPort,
TemporalHostPort: temporalHostPort,
TemporalNamespace: ctx.String("temporal-namespace"),
})
},
Flags: []cli.Flag{
temporalHostPortFlag,
temporalNamespaceFlag,
},
},
{
Expand All @@ -107,15 +118,17 @@ func main() {
Value: 8111,
},
temporalHostPortFlag,
temporalNamespaceFlag,
},
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")

return APIMain(&APIServerParams{
ctx: appCtx,
Port: ctx.Uint("port"),
TemporalHostPort: temporalHostPort,
GatewayPort: ctx.Uint("gateway-port"),
ctx: appCtx,
Port: ctx.Uint("port"),
TemporalHostPort: temporalHostPort,
GatewayPort: ctx.Uint("gateway-port"),
TemporalNamespace: ctx.String("temporal-namespace"),
})
},
},
Expand Down
6 changes: 4 additions & 2 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
)

type SnapshotWorkerOptions struct {
TemporalHostPort string
TemporalHostPort string
TemporalNamespace string
}

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
}

c, err := client.Dial(clientOptions)
Expand Down
35 changes: 16 additions & 19 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
)

type WorkerOptions struct {
TemporalHostPort string
EnableProfiling bool
EnableMetrics bool
PyroscopeServer string
MetricsServer string
TemporalHostPort string
EnableProfiling bool
EnableMetrics bool
PyroscopeServer string
MetricsServer string
TemporalNamespace string
}

func setupPyroscope(opts *WorkerOptions) {
Expand Down Expand Up @@ -90,21 +91,17 @@ func WorkerMain(opts *WorkerOptions) error {
}
}()

var clientOptions client.Options
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
}
if opts.EnableMetrics {
clientOptions = client.Options{
HostPort: opts.TemporalHostPort,
MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope(
prometheus.Configuration{
ListenAddress: opts.MetricsServer,
TimerType: "histogram",
},
)),
}
} else {
clientOptions = client.Options{
HostPort: opts.TemporalHostPort,
}
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(
prometheus.Configuration{
ListenAddress: opts.MetricsServer,
TimerType: "histogram",
},
))
}

conn, err := utils.GetCatalogConnectionPoolFromEnv()
Expand Down

0 comments on commit 6207e23

Please sign in to comment.