Skip to content

Commit 358eea8

Browse files
authored
Merge branch 'main' into unflake-test-pls
2 parents abb795e + 62e2e5c commit 358eea8

10 files changed

+167
-98
lines changed

docker-compose-dev.yml

+9-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ x-catalog-config: &catalog-config
88
PEERDB_CATALOG_DATABASE: postgres
99

1010
x-flow-worker-env: &flow-worker-env
11+
# For Temporal Cloud, this will look like:
12+
# <yournamespace>.<id>.tmprl.cloud:7233
1113
TEMPORAL_HOST_PORT: temporal:7233
14+
PEERDB_TEMPORAL_NAMESPACE: default
15+
# For the below 2 cert and key variables,
16+
# use yml multiline syntax with '|'
17+
TEMPORAL_CLIENT_CERT:
18+
TEMPORAL_CLIENT_KEY:
1219
# For GCS, these will be your HMAC keys instead
1320
# For more information:
1421
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
@@ -109,8 +116,7 @@ services:
109116
- 8112:8112
110117
- 8113:8113
111118
environment:
112-
<<: [*catalog-config]
113-
TEMPORAL_HOST_PORT: temporal:7233
119+
<<: [*catalog-config, *flow-worker-env]
114120
depends_on:
115121
temporal-admin-tools:
116122
condition: service_healthy
@@ -122,7 +128,7 @@ services:
122128
dockerfile: stacks/flow.Dockerfile
123129
target: flow-snapshot-worker
124130
environment:
125-
TEMPORAL_HOST_PORT: temporal:7233
131+
<<: [*flow-worker-env]
126132
depends_on:
127133
temporal-admin-tools:
128134
condition: service_healthy

docker-compose.yml

+8-10
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ x-catalog-config: &catalog-config
99

1010
x-flow-worker-env: &flow-worker-env
1111
TEMPORAL_HOST_PORT: temporal:7233
12+
TEMPORAL_CLIENT_CERT:
13+
TEMPORAL_CLIENT_KEY:
14+
PEERDB_TEMPORAL_NAMESPACE: default
1215
# For GCS, these will be your HMAC keys instead
1316
# For more information:
1417
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
@@ -21,9 +24,6 @@ x-flow-worker-env: &flow-worker-env
2124
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
2225
ENABLE_METRICS: "true"
2326

24-
x-peerdb-temporal-namespace: &peerdb-temporal-namespace
25-
PEERDB_TEMPORAL_NAMESPACE: default
26-
2727
services:
2828
catalog:
2929
container_name: catalog
@@ -100,8 +100,7 @@ services:
100100
- 8112:8112
101101
- 8113:8113
102102
environment:
103-
<<: [*catalog-config, *peerdb-temporal-namespace]
104-
TEMPORAL_HOST_PORT: temporal:7233
103+
<<: [*catalog-config, *flow-worker-env]
105104
depends_on:
106105
temporal-admin-tools:
107106
condition: service_healthy
@@ -110,8 +109,7 @@ services:
110109
container_name: flow-snapshot-worker
111110
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
112111
environment:
113-
<<: [*peerdb-temporal-namespace]
114-
TEMPORAL_HOST_PORT: temporal:7233
112+
<<: [*flow-worker-env]
115113
depends_on:
116114
temporal-admin-tools:
117115
condition: service_healthy
@@ -120,7 +118,7 @@ services:
120118
container_name: flow-worker1
121119
image: ghcr.io/peerdb-io/flow-worker:latest-dev
122120
environment:
123-
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
121+
<<: [*catalog-config, *flow-worker-env]
124122
METRICS_SERVER: 0.0.0.0:6061
125123
ports:
126124
- 6060:6060
@@ -133,7 +131,7 @@ services:
133131
container_name: flow-worker2
134132
image: ghcr.io/peerdb-io/flow-worker:latest-dev
135133
environment:
136-
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
134+
<<: [*catalog-config, *flow-worker-env]
137135
METRICS_SERVER: 0.0.0.0:6063
138136
ports:
139137
- 6062:6062
@@ -149,7 +147,7 @@ services:
149147
container_name: flow-worker3
150148
image: ghcr.io/peerdb-io/flow-worker:latest-dev
151149
environment:
152-
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
150+
<<: [*catalog-config, *flow-worker-env]
153151
METRICS_SERVER: 0.0.0.0:6065
154152
ports:
155153
- 6064:6064

flow/cmd/api.go

+18-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"crypto/tls"
56
"fmt"
67
"net"
78
"net/http"
@@ -26,6 +27,8 @@ type APIServerParams struct {
2627
GatewayPort uint
2728
TemporalHostPort string
2829
TemporalNamespace string
30+
TemporalCert string
31+
TemporalKey string
2932
}
3033

3134
// setupGRPCGatewayServer sets up the grpc-gateway mux
@@ -57,11 +60,23 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
5760

5861
func APIMain(args *APIServerParams) error {
5962
ctx := args.ctx
60-
61-
tc, err := client.Dial(client.Options{
63+
clientOptions := client.Options{
6264
HostPort: args.TemporalHostPort,
6365
Namespace: args.TemporalNamespace,
64-
})
66+
}
67+
if args.TemporalCert != "" && args.TemporalKey != "" {
68+
cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey))
69+
if err != nil {
70+
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
71+
}
72+
73+
connOptions := client.ConnectionOptions{
74+
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
75+
}
76+
clientOptions.ConnectionOptions = connOptions
77+
}
78+
79+
tc, err := client.Dial(clientOptions)
6580
if err != nil {
6681
return fmt.Errorf("unable to create Temporal client: %w", err)
6782
}

flow/cmd/main.go

+24
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,18 @@ func main() {
3030
EnvVars: []string{"TEMPORAL_HOST_PORT"},
3131
}
3232

33+
temporalCertFlag := cli.StringFlag{
34+
Name: "temporal-cert",
35+
Value: "", // default: no cert needed
36+
EnvVars: []string{"TEMPORAL_CLIENT_CERT"},
37+
}
38+
39+
temporalKeyFlag := cli.StringFlag{
40+
Name: "temporal-key",
41+
Value: "", // default: no key needed
42+
EnvVars: []string{"TEMPORAL_CLIENT_KEY"},
43+
}
44+
3345
profilingFlag := &cli.BoolFlag{
3446
Name: "enable-profiling",
3547
Value: false, // Default is off
@@ -79,6 +91,8 @@ func main() {
7991
PyroscopeServer: ctx.String("pyroscope-server-address"),
8092
MetricsServer: ctx.String("metrics-server"),
8193
TemporalNamespace: ctx.String("temporal-namespace"),
94+
TemporalCert: ctx.String("temporal-cert"),
95+
TemporalKey: ctx.String("temporal-key"),
8296
})
8397
},
8498
Flags: []cli.Flag{
@@ -88,6 +102,8 @@ func main() {
88102
pyroscopeServerFlag,
89103
metricsServerFlag,
90104
temporalNamespaceFlag,
105+
&temporalCertFlag,
106+
&temporalKeyFlag,
91107
},
92108
},
93109
{
@@ -97,11 +113,15 @@ func main() {
97113
return SnapshotWorkerMain(&SnapshotWorkerOptions{
98114
TemporalHostPort: temporalHostPort,
99115
TemporalNamespace: ctx.String("temporal-namespace"),
116+
TemporalCert: ctx.String("temporal-cert"),
117+
TemporalKey: ctx.String("temporal-key"),
100118
})
101119
},
102120
Flags: []cli.Flag{
103121
temporalHostPortFlag,
104122
temporalNamespaceFlag,
123+
&temporalCertFlag,
124+
&temporalKeyFlag,
105125
},
106126
},
107127
{
@@ -119,6 +139,8 @@ func main() {
119139
},
120140
temporalHostPortFlag,
121141
temporalNamespaceFlag,
142+
&temporalCertFlag,
143+
&temporalKeyFlag,
122144
},
123145
Action: func(ctx *cli.Context) error {
124146
temporalHostPort := ctx.String("temporal-host-port")
@@ -129,6 +151,8 @@ func main() {
129151
TemporalHostPort: temporalHostPort,
130152
GatewayPort: ctx.Uint("gateway-port"),
131153
TemporalNamespace: ctx.String("temporal-namespace"),
154+
TemporalCert: ctx.String("temporal-cert"),
155+
TemporalKey: ctx.String("temporal-key"),
132156
})
133157
},
134158
},

flow/cmd/snapshot_worker.go

+14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"crypto/tls"
45
"fmt"
56

67
"github.com/PeerDB-io/peer-flow/activities"
@@ -14,6 +15,8 @@ import (
1415
type SnapshotWorkerOptions struct {
1516
TemporalHostPort string
1617
TemporalNamespace string
18+
TemporalCert string
19+
TemporalKey string
1720
}
1821

1922
func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
@@ -22,6 +25,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
2225
Namespace: opts.TemporalNamespace,
2326
}
2427

28+
if opts.TemporalCert != "" && opts.TemporalKey != "" {
29+
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
30+
if err != nil {
31+
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
32+
}
33+
34+
connOptions := client.ConnectionOptions{
35+
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
36+
}
37+
clientOptions.ConnectionOptions = connOptions
38+
}
2539
c, err := client.Dial(clientOptions)
2640
if err != nil {
2741
return fmt.Errorf("unable to create Temporal client: %w", err)

flow/cmd/worker.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"crypto/tls"
45
"fmt"
56
"os"
67
"os/signal"
@@ -31,6 +32,8 @@ type WorkerOptions struct {
3132
PyroscopeServer string
3233
MetricsServer string
3334
TemporalNamespace string
35+
TemporalCert string
36+
TemporalKey string
3437
}
3538

3639
func setupPyroscope(opts *WorkerOptions) {
@@ -95,6 +98,19 @@ func WorkerMain(opts *WorkerOptions) error {
9598
HostPort: opts.TemporalHostPort,
9699
Namespace: opts.TemporalNamespace,
97100
}
101+
102+
if opts.TemporalCert != "" && opts.TemporalKey != "" {
103+
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
104+
if err != nil {
105+
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
106+
}
107+
108+
connOptions := client.ConnectionOptions{
109+
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
110+
}
111+
clientOptions.ConnectionOptions = connOptions
112+
}
113+
98114
if opts.EnableMetrics {
99115
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(
100116
prometheus.Configuration{
@@ -115,6 +131,7 @@ func WorkerMain(opts *WorkerOptions) error {
115131
if err != nil {
116132
return fmt.Errorf("unable to create Temporal client: %w", err)
117133
}
134+
log.Info("Created temporal client")
118135
defer c.Close()
119136

120137
w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})

nexus/Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nexus/analyzer/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ anyhow = "1.0"
1010
async-trait = "0.1"
1111
catalog = { path = "../catalog" }
1212
flow-rs = { path = "../flow-rs" }
13-
lazy_static = "1.4"
1413
pem = "3.0"
1514
pt = { path = "../pt" }
1615
sqlparser = { path = "../sqlparser-rs", features = ["visitor"] }

0 commit comments

Comments
 (0)