From 44a8b0fb28ef7059848b2ee400bab20b6c3a91ac Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 21 Nov 2023 23:51:20 +0530 Subject: [PATCH] Support Temporal Cloud (#692) Adds environment variables in docker compose for client certificate and client key to connect to Temporal Cloud server. If a user wants to wire PeerDB with Temporal Cloud instead of local Temporal which we spin up, they need to: 1. Set `TEMPORAL_HOST_PORT` to the Temporal Cloud endpoint 2. Set the certificate and key variables Flow worker, Flow API and Flow Snapshot Worker use these values. Mirrors run will now appear in Temporal Cloud UI --- docker-compose-dev.yml | 12 +++++++++--- docker-compose.yml | 18 ++++++++---------- flow/cmd/api.go | 21 ++++++++++++++++++--- flow/cmd/main.go | 24 ++++++++++++++++++++++++ flow/cmd/snapshot_worker.go | 14 ++++++++++++++ flow/cmd/worker.go | 17 +++++++++++++++++ 6 files changed, 90 insertions(+), 16 deletions(-) diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index c6e4de2faa..3c955bef88 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -8,7 +8,14 @@ x-catalog-config: &catalog-config PEERDB_CATALOG_DATABASE: postgres x-flow-worker-env: &flow-worker-env + # For Temporal Cloud, this will look like: + # ..tmprl.cloud:7233 TEMPORAL_HOST_PORT: temporal:7233 + PEERDB_TEMPORAL_NAMESPACE: default + # For the below 2 cert and key variables, + # use yml multiline syntax with '|' + TEMPORAL_CLIENT_CERT: + TEMPORAL_CLIENT_KEY: # For GCS, these will be your HMAC keys instead # For more information: # https://cloud.google.com/storage/docs/authentication/managing-hmackeys @@ -109,8 +116,7 @@ services: - 8112:8112 - 8113:8113 environment: - <<: [*catalog-config] - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy @@ -122,7 +128,7 @@ services: dockerfile: stacks/flow.Dockerfile target: flow-snapshot-worker environment: - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy diff --git a/docker-compose.yml b/docker-compose.yml index d5d4dbbff6..0686913a8e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,9 @@ x-catalog-config: &catalog-config x-flow-worker-env: &flow-worker-env TEMPORAL_HOST_PORT: temporal:7233 + TEMPORAL_CLIENT_CERT: + TEMPORAL_CLIENT_KEY: + PEERDB_TEMPORAL_NAMESPACE: default # For GCS, these will be your HMAC keys instead # For more information: # https://cloud.google.com/storage/docs/authentication/managing-hmackeys @@ -21,9 +24,6 @@ x-flow-worker-env: &flow-worker-env # enables exporting of mirror metrics to Prometheus for visualization using Grafana ENABLE_METRICS: "true" -x-peerdb-temporal-namespace: &peerdb-temporal-namespace - PEERDB_TEMPORAL_NAMESPACE: default - services: catalog: container_name: catalog @@ -100,8 +100,7 @@ services: - 8112:8112 - 8113:8113 environment: - <<: [*catalog-config, *peerdb-temporal-namespace] - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*catalog-config, *flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy @@ -110,8 +109,7 @@ services: container_name: flow-snapshot-worker image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev environment: - <<: [*peerdb-temporal-namespace] - TEMPORAL_HOST_PORT: temporal:7233 + <<: [*flow-worker-env] depends_on: temporal-admin-tools: condition: service_healthy @@ -120,7 +118,7 @@ services: container_name: flow-worker1 image: ghcr.io/peerdb-io/flow-worker:latest-dev environment: - <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] + <<: [*catalog-config, *flow-worker-env] METRICS_SERVER: 0.0.0.0:6061 ports: - 6060:6060 @@ -133,7 +131,7 @@ services: container_name: flow-worker2 image: ghcr.io/peerdb-io/flow-worker:latest-dev environment: - <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] + <<: [*catalog-config, *flow-worker-env] METRICS_SERVER: 0.0.0.0:6063 ports: - 6062:6062 @@ -149,7 +147,7 @@ services: container_name: flow-worker3 image: ghcr.io/peerdb-io/flow-worker:latest-dev environment: - <<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace] + <<: [*catalog-config, *flow-worker-env] METRICS_SERVER: 0.0.0.0:6065 ports: - 6064:6064 diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 9126dc2c06..98bb798a63 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/tls" "fmt" "net" "net/http" @@ -26,6 +27,8 @@ type APIServerParams struct { GatewayPort uint TemporalHostPort string TemporalNamespace string + TemporalCert string + TemporalKey string } // setupGRPCGatewayServer sets up the grpc-gateway mux @@ -57,11 +60,23 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) { func APIMain(args *APIServerParams) error { ctx := args.ctx - - tc, err := client.Dial(client.Options{ + clientOptions := client.Options{ HostPort: args.TemporalHostPort, Namespace: args.TemporalNamespace, - }) + } + if args.TemporalCert != "" && args.TemporalKey != "" { + cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to obtain temporal key pair: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + clientOptions.ConnectionOptions = connOptions + } + + tc, err := client.Dial(clientOptions) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) } diff --git a/flow/cmd/main.go b/flow/cmd/main.go index 66e96996ce..74d2fd872e 100644 --- a/flow/cmd/main.go +++ b/flow/cmd/main.go @@ -30,6 +30,18 @@ func main() { EnvVars: []string{"TEMPORAL_HOST_PORT"}, } + temporalCertFlag := cli.StringFlag{ + Name: "temporal-cert", + Value: "", // default: no cert needed + EnvVars: []string{"TEMPORAL_CLIENT_CERT"}, + } + + temporalKeyFlag := cli.StringFlag{ + Name: "temporal-key", + Value: "", // default: no key needed + EnvVars: []string{"TEMPORAL_CLIENT_KEY"}, + } + profilingFlag := &cli.BoolFlag{ Name: "enable-profiling", Value: false, // Default is off @@ -79,6 +91,8 @@ func main() { PyroscopeServer: ctx.String("pyroscope-server-address"), MetricsServer: ctx.String("metrics-server"), TemporalNamespace: ctx.String("temporal-namespace"), + TemporalCert: ctx.String("temporal-cert"), + TemporalKey: ctx.String("temporal-key"), }) }, Flags: []cli.Flag{ @@ -88,6 +102,8 @@ func main() { pyroscopeServerFlag, metricsServerFlag, temporalNamespaceFlag, + &temporalCertFlag, + &temporalKeyFlag, }, }, { @@ -97,11 +113,15 @@ func main() { return SnapshotWorkerMain(&SnapshotWorkerOptions{ TemporalHostPort: temporalHostPort, TemporalNamespace: ctx.String("temporal-namespace"), + TemporalCert: ctx.String("temporal-cert"), + TemporalKey: ctx.String("temporal-key"), }) }, Flags: []cli.Flag{ temporalHostPortFlag, temporalNamespaceFlag, + &temporalCertFlag, + &temporalKeyFlag, }, }, { @@ -119,6 +139,8 @@ func main() { }, temporalHostPortFlag, temporalNamespaceFlag, + &temporalCertFlag, + &temporalKeyFlag, }, Action: func(ctx *cli.Context) error { temporalHostPort := ctx.String("temporal-host-port") @@ -129,6 +151,8 @@ func main() { TemporalHostPort: temporalHostPort, GatewayPort: ctx.Uint("gateway-port"), TemporalNamespace: ctx.String("temporal-namespace"), + TemporalCert: ctx.String("temporal-cert"), + TemporalKey: ctx.String("temporal-key"), }) }, }, diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 2b09bbc0f7..5b5ea65b3c 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -1,6 +1,7 @@ package main import ( + "crypto/tls" "fmt" "github.com/PeerDB-io/peer-flow/activities" @@ -14,6 +15,8 @@ import ( type SnapshotWorkerOptions struct { TemporalHostPort string TemporalNamespace string + TemporalCert string + TemporalKey string } func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { @@ -22,6 +25,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { Namespace: opts.TemporalNamespace, } + if opts.TemporalCert != "" && opts.TemporalKey != "" { + cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to obtain temporal key pair: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + clientOptions.ConnectionOptions = connOptions + } c, err := client.Dial(clientOptions) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 2e9746b127..55ca6c0efd 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -1,6 +1,7 @@ package main import ( + "crypto/tls" "fmt" "os" "os/signal" @@ -31,6 +32,8 @@ type WorkerOptions struct { PyroscopeServer string MetricsServer string TemporalNamespace string + TemporalCert string + TemporalKey string } func setupPyroscope(opts *WorkerOptions) { @@ -95,6 +98,19 @@ func WorkerMain(opts *WorkerOptions) error { HostPort: opts.TemporalHostPort, Namespace: opts.TemporalNamespace, } + + if opts.TemporalCert != "" && opts.TemporalKey != "" { + cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to obtain temporal key pair: %w", err) + } + + connOptions := client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + clientOptions.ConnectionOptions = connOptions + } + if opts.EnableMetrics { clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope( prometheus.Configuration{ @@ -115,6 +131,7 @@ func WorkerMain(opts *WorkerOptions) error { if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) } + log.Info("Created temporal client") defer c.Close() w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})