diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index c6e4de2faa..3c955bef88 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -8,7 +8,14 @@ x-catalog-config: &catalog-config PEERDB_CATALOG_DATABASE: postgres x-flow-worker-env: &flow-worker-env + # For Temporal Cloud, this will look like: + # ..tmprl.cloud:7233 TEMPORAL_HOST_PORT: temporal:7233 + PEERDB_TEMPORAL_NAMESPACE: default + # For the below 2 cert and key variables, + # use yml multiline syntax with '|' + TEMPORAL_CLIENT_CERT: + TEMPORAL_CLIENT_KEY: # For GCS, these will be your HMAC keys instead # For more information: # https://cloud.google.com/storage/docs/authentication/managing-hmackeys @@ -109,8 +116,7 @@ services: - 8112:8112 - 8113:8113 environment: - <<: [*catalog-config] - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy @@ -122,7 +128,7 @@ services: dockerfile: stacks/flow.Dockerfile target: flow-snapshot-worker environment: - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/docker-compose.yml b/docker-compose.yml index d5d4dbbff6..0686913a8e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,9 @@ x-catalog-config: &catalog-config x-flow-worker-env: &flow-worker-env TEMPORAL_HOST_PORT: temporal:7233 + TEMPORAL_CLIENT_CERT: + TEMPORAL_CLIENT_KEY: + PEERDB_TEMPORAL_NAMESPACE: default # For GCS, these will be your HMAC keys instead # For more information: # https://cloud.google.com/storage/docs/authentication/managing-hmackeys @@ -21,9 +24,6 @@ x-flow-worker-env: &flow-worker-env # enables exporting of mirror metrics to Prometheus for visualization using Grafana ENABLE_METRICS: "true" -x-peerdb-temporal-namespace: &peerdb-temporal-namespace - PEERDB_TEMPORAL_NAMESPACE: default - services: catalog: container_name: catalog @@ -100,8 +100,7 @@ services: - 8112:8112 - 8113:8113 environment: - <<: [*catalog-config, *peerdb-temporal-namespace] - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy @@ -110,8 +109,7 @@ services: container_name: flow-snapshot-worker image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev environment: - <<: [*peerdb-temporal-namespace] - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy @@ -120,7 +118,7 @@ services: container_name: flow-worker1 image: ghcr.io/peerdb-io/flow-worker:latest-dev environment: - <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] + <<: [*catalog-config, *flow-worker-env] METRICS_SERVER: 0.0.0.0:6061 ports: - 6060:6060 @@ -133,7 +131,7 @@ services: container_name: flow-worker2 image: ghcr.io/peerdb-io/flow-worker:latest-dev environment: - <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] + <<: [*catalog-config, *flow-worker-env] METRICS_SERVER: 0.0.0.0:6063 ports: - 6062:6062 @@ -149,7 +147,7 @@ services: container_name: flow-worker3 image: ghcr.io/peerdb-io/flow-worker:latest-dev environment: - <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] + <<: [*catalog-config, *flow-worker-env] METRICS_SERVER: 0.0.0.0:6065 ports: - 6064:6064 diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 9126dc2c06..98bb798a63 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/tls" "fmt" "net" "net/http" @@ -26,6 +27,8 @@ type APIServerParams struct { GatewayPort uint TemporalHostPort string TemporalNamespace string + TemporalCert string + TemporalKey string } // setupGRPCGatewayServer sets up the grpc-gateway mux @@ -57,11 +60,23 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) { func APIMain(args *APIServerParams) error { ctx := args.ctx - - tc, err := client.Dial(client.Options{ + clientOptions := client.Options{ HostPort: args.TemporalHostPort, Namespace: args.TemporalNamespace, - }) + } + if args.TemporalCert != "" && args.TemporalKey != "" { + cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to obtain temporal key pair: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + clientOptions.ConnectionOptions = connOptions + } + + tc, err := client.Dial(clientOptions) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) } diff --git a/flow/cmd/main.go b/flow/cmd/main.go index 66e96996ce..74d2fd872e 100644 --- a/flow/cmd/main.go +++ b/flow/cmd/main.go @@ -30,6 +30,18 @@ func main() { EnvVars: []string{"TEMPORAL_HOST_PORT"}, } + temporalCertFlag := cli.StringFlag{ + Name: "temporal-cert", + Value: "", // default: no cert needed + EnvVars: []string{"TEMPORAL_CLIENT_CERT"}, + } + + temporalKeyFlag := cli.StringFlag{ + Name: "temporal-key", + Value: "", // default: no key needed + EnvVars: []string{"TEMPORAL_CLIENT_KEY"}, + } + profilingFlag := &cli.BoolFlag{ Name: "enable-profiling", Value: false, // Default is off @@ -79,6 +91,8 @@ func main() { PyroscopeServer: ctx.String("pyroscope-server-address"), MetricsServer: ctx.String("metrics-server"), TemporalNamespace: ctx.String("temporal-namespace"), + TemporalCert: ctx.String("temporal-cert"), + TemporalKey: ctx.String("temporal-key"), }) }, Flags: []cli.Flag{ @@ -88,6 +102,8 @@ func main() { pyroscopeServerFlag, metricsServerFlag, temporalNamespaceFlag, + &temporalCertFlag, + &temporalKeyFlag, }, }, { @@ -97,11 +113,15 @@ func main() { return SnapshotWorkerMain(&SnapshotWorkerOptions{ TemporalHostPort: temporalHostPort, TemporalNamespace: ctx.String("temporal-namespace"), + TemporalCert: ctx.String("temporal-cert"), + TemporalKey: ctx.String("temporal-key"), }) }, Flags: []cli.Flag{ temporalHostPortFlag, temporalNamespaceFlag, + &temporalCertFlag, + &temporalKeyFlag, }, }, { @@ -119,6 +139,8 @@ func main() { }, temporalHostPortFlag, temporalNamespaceFlag, + &temporalCertFlag, + &temporalKeyFlag, }, Action: func(ctx *cli.Context) error { temporalHostPort := ctx.String("temporal-host-port") @@ -129,6 +151,8 @@ func main() { TemporalHostPort: temporalHostPort, GatewayPort: ctx.Uint("gateway-port"), TemporalNamespace: ctx.String("temporal-namespace"), + TemporalCert: ctx.String("temporal-cert"), + TemporalKey: ctx.String("temporal-key"), }) }, }, diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 2b09bbc0f7..5b5ea65b3c 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -1,6 +1,7 @@ package main import ( + "crypto/tls" "fmt" "github.com/PeerDB-io/peer-flow/activities" @@ -14,6 +15,8 @@ import ( type SnapshotWorkerOptions struct { TemporalHostPort string TemporalNamespace string + TemporalCert string + TemporalKey string } func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { @@ -22,6 +25,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { Namespace: opts.TemporalNamespace, } + if opts.TemporalCert != "" && opts.TemporalKey != "" { + cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to obtain temporal key pair: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + clientOptions.ConnectionOptions = connOptions + } c, err := client.Dial(clientOptions) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 2e9746b127..55ca6c0efd 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -1,6 +1,7 @@ package main import ( + "crypto/tls" "fmt" "os" "os/signal" @@ -31,6 +32,8 @@ type WorkerOptions struct { PyroscopeServer string MetricsServer string TemporalNamespace string + TemporalCert string + TemporalKey string } func setupPyroscope(opts *WorkerOptions) { @@ -95,6 +98,19 @@ func WorkerMain(opts *WorkerOptions) error { HostPort: opts.TemporalHostPort, Namespace: opts.TemporalNamespace, } + + if opts.TemporalCert != "" && opts.TemporalKey != "" { + cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to obtain temporal key pair: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + clientOptions.ConnectionOptions = connOptions + } + if opts.EnableMetrics { clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope( prometheus.Configuration{ @@ -115,6 +131,7 @@ func WorkerMain(opts *WorkerOptions) error { if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) } + log.Info("Created temporal client") defer c.Close() w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})