Skip to content

Commit

Permalink
wires cert and key variables
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Nov 21, 2023
1 parent e2e4276 commit ac3fc4e
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 6 deletions.
12 changes: 9 additions & 3 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ x-catalog-config: &catalog-config
PEERDB_CATALOG_DATABASE: postgres

x-flow-worker-env: &flow-worker-env
# For Temporal Cloud, this will look like:
# <yournamespace>.<id>.tmprl.cloud:7233
TEMPORAL_HOST_PORT: temporal:7233
PEERDB_TEMPORAL_NAMESPACE: default
# For the below 2 cert and key variables,
# use yml multiline syntax with '|'
TEMPORAL_CLIENT_CERT:
TEMPORAL_CLIENT_KEY:
# For GCS, these will be your HMAC keys instead
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
Expand Down Expand Up @@ -109,8 +116,7 @@ services:
- 8112:8112
- 8113:8113
environment:
<<: [*catalog-config]
TEMPORAL_HOST_PORT: temporal:7233
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -122,7 +128,7 @@ services:
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
TEMPORAL_HOST_PORT: temporal:7233
<<: [*flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ x-catalog-config: &catalog-config

x-flow-worker-env: &flow-worker-env
TEMPORAL_HOST_PORT: temporal:7233
TEMPORAL_CLIENT_CERT:
TEMPORAL_CLIENT_KEY:
# For GCS, these will be your HMAC keys instead
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
Expand Down
21 changes: 18 additions & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
Expand All @@ -26,6 +27,8 @@ type APIServerParams struct {
GatewayPort uint
TemporalHostPort string
TemporalNamespace string
TemporalCert string
TemporalKey string
}

// setupGRPCGatewayServer sets up the grpc-gateway mux
Expand Down Expand Up @@ -57,11 +60,23 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {

func APIMain(args *APIServerParams) error {
ctx := args.ctx

tc, err := client.Dial(client.Options{
clientOptions := client.Options{
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
})
}
if args.TemporalCert != "" && args.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey))
if err != nil {
log.Fatalln("Unable to load cert and key pair.", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
}
clientOptions.ConnectionOptions = connOptions
}

tc, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
}
Expand Down
24 changes: 24 additions & 0 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ func main() {
EnvVars: []string{"TEMPORAL_HOST_PORT"},
}

temporalCertFlag := cli.StringFlag{
Name: "temporal-cert",
Value: "", // default: no cert needed
EnvVars: []string{"TEMPORAL_CLIENT_CERT"},
}

temporalKeyFlag := cli.StringFlag{
Name: "temporal-key",
Value: "", // default: no key needed
EnvVars: []string{"TEMPORAL_CLIENT_KEY"},
}

profilingFlag := &cli.BoolFlag{
Name: "enable-profiling",
Value: false, // Default is off
Expand Down Expand Up @@ -79,6 +91,8 @@ func main() {
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
Flags: []cli.Flag{
Expand All @@ -88,6 +102,8 @@ func main() {
pyroscopeServerFlag,
metricsServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
},
{
Expand All @@ -97,11 +113,15 @@ func main() {
return SnapshotWorkerMain(&SnapshotWorkerOptions{
TemporalHostPort: temporalHostPort,
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
Flags: []cli.Flag{
temporalHostPortFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
},
{
Expand All @@ -119,6 +139,8 @@ func main() {
},
temporalHostPortFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")
Expand All @@ -129,6 +151,8 @@ func main() {
TemporalHostPort: temporalHostPort,
GatewayPort: ctx.Uint("gateway-port"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
},
Expand Down
15 changes: 15 additions & 0 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main

import (
"crypto/tls"
"fmt"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
log "github.com/sirupsen/logrus"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
Expand All @@ -14,6 +16,8 @@ import (
type SnapshotWorkerOptions struct {
TemporalHostPort string
TemporalNamespace string
TemporalCert string
TemporalKey string
}

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
Expand All @@ -22,6 +26,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
Namespace: opts.TemporalNamespace,
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
if err != nil {
log.Fatalln("Unable to load cert and key pair.", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
}
clientOptions.ConnectionOptions = connOptions
}
c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
Expand Down
21 changes: 21 additions & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"crypto/tls"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -31,6 +32,8 @@ type WorkerOptions struct {
PyroscopeServer string
MetricsServer string
TemporalNamespace string
TemporalCert string
TemporalKey string
}

func setupPyroscope(opts *WorkerOptions) {
Expand Down Expand Up @@ -91,10 +94,27 @@ func WorkerMain(opts *WorkerOptions) error {
}
}()

log.Info("Temporal Host Port: ", opts.TemporalHostPort)
log.Info("Temporal Namespace: ", opts.TemporalNamespace)
log.Info("Temporal Cert: ", opts.TemporalCert)
log.Info("Temporal Key: ", opts.TemporalKey)
clientOptions := client.Options{
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
if err != nil {
log.Fatalln("Unable to load cert and key pair.", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
}
clientOptions.ConnectionOptions = connOptions
}

if opts.EnableMetrics {
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(
prometheus.Configuration{
Expand All @@ -115,6 +135,7 @@ func WorkerMain(opts *WorkerOptions) error {
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
}
log.Info("Created temporal client")
defer c.Close()

w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})
Expand Down

0 comments on commit ac3fc4e

Please sign in to comment.