Skip to content

Commit

Permalink
Merge branch 'wait-for-at-least-one' into customer-mirage
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 21, 2023
2 parents 07ef226 + 39bdc9d commit cd390e1
Show file tree
Hide file tree
Showing 20 changed files with 714 additions and 733 deletions.
12 changes: 9 additions & 3 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ x-catalog-config: &catalog-config
PEERDB_CATALOG_DATABASE: postgres

x-flow-worker-env: &flow-worker-env
# For Temporal Cloud, this will look like:
# <yournamespace>.<id>.tmprl.cloud:7233
TEMPORAL_HOST_PORT: temporal:7233
PEERDB_TEMPORAL_NAMESPACE: default
# For the two cert and key variables below,
# use YAML multiline block syntax with '|'
TEMPORAL_CLIENT_CERT:
TEMPORAL_CLIENT_KEY:
# For GCS, these will be your HMAC keys instead
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
Expand Down Expand Up @@ -109,8 +116,7 @@ services:
- 8112:8112
- 8113:8113
environment:
<<: [*catalog-config]
TEMPORAL_HOST_PORT: temporal:7233
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -122,7 +128,7 @@ services:
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
TEMPORAL_HOST_PORT: temporal:7233
<<: [*flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
18 changes: 8 additions & 10 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ x-catalog-config: &catalog-config

x-flow-worker-env: &flow-worker-env
TEMPORAL_HOST_PORT: temporal:7233
TEMPORAL_CLIENT_CERT:
TEMPORAL_CLIENT_KEY:
PEERDB_TEMPORAL_NAMESPACE: default
# For GCS, these will be your HMAC keys instead
# For more information:
# https://cloud.google.com/storage/docs/authentication/managing-hmackeys
Expand All @@ -21,9 +24,6 @@ x-flow-worker-env: &flow-worker-env
# enables exporting of mirror metrics to Prometheus for visualization using Grafana
ENABLE_METRICS: "true"

x-peerdb-temporal-namespace: &peerdb-temporal-namespace
PEERDB_TEMPORAL_NAMESPACE: default

services:
catalog:
container_name: catalog
Expand Down Expand Up @@ -100,8 +100,7 @@ services:
- 8112:8112
- 8113:8113
environment:
<<: [*catalog-config, *peerdb-temporal-namespace]
TEMPORAL_HOST_PORT: temporal:7233
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -110,8 +109,7 @@ services:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*peerdb-temporal-namespace]
TEMPORAL_HOST_PORT: temporal:7233
<<: [*flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand All @@ -120,7 +118,7 @@ services:
container_name: flow-worker1
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6061
ports:
- 6060:6060
Expand All @@ -133,7 +131,7 @@ services:
container_name: flow-worker2
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6063
ports:
- 6062:6062
Expand All @@ -149,7 +147,7 @@ services:
container_name: flow-worker3
image: ghcr.io/peerdb-io/flow-worker:latest-dev
environment:
<<: [*catalog-config, *flow-worker-env, *peerdb-temporal-namespace]
<<: [*catalog-config, *flow-worker-env]
METRICS_SERVER: 0.0.0.0:6065
ports:
- 6064:6064
Expand Down
21 changes: 18 additions & 3 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
Expand All @@ -26,6 +27,8 @@ type APIServerParams struct {
GatewayPort uint
TemporalHostPort string
TemporalNamespace string
TemporalCert string
TemporalKey string
}

// setupGRPCGatewayServer sets up the grpc-gateway mux
Expand Down Expand Up @@ -57,11 +60,23 @@ func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {

func APIMain(args *APIServerParams) error {
ctx := args.ctx

tc, err := client.Dial(client.Options{
clientOptions := client.Options{
HostPort: args.TemporalHostPort,
Namespace: args.TemporalNamespace,
})
}
if args.TemporalCert != "" && args.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey))
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
}
clientOptions.ConnectionOptions = connOptions
}

tc, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
}
Expand Down
24 changes: 24 additions & 0 deletions flow/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ func main() {
EnvVars: []string{"TEMPORAL_HOST_PORT"},
}

temporalCertFlag := cli.StringFlag{
Name: "temporal-cert",
Value: "", // default: no cert needed
EnvVars: []string{"TEMPORAL_CLIENT_CERT"},
}

temporalKeyFlag := cli.StringFlag{
Name: "temporal-key",
Value: "", // default: no key needed
EnvVars: []string{"TEMPORAL_CLIENT_KEY"},
}

profilingFlag := &cli.BoolFlag{
Name: "enable-profiling",
Value: false, // Default is off
Expand Down Expand Up @@ -79,6 +91,8 @@ func main() {
PyroscopeServer: ctx.String("pyroscope-server-address"),
MetricsServer: ctx.String("metrics-server"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
Flags: []cli.Flag{
Expand All @@ -88,6 +102,8 @@ func main() {
pyroscopeServerFlag,
metricsServerFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
},
{
Expand All @@ -97,11 +113,15 @@ func main() {
return SnapshotWorkerMain(&SnapshotWorkerOptions{
TemporalHostPort: temporalHostPort,
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
Flags: []cli.Flag{
temporalHostPortFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
},
{
Expand All @@ -119,6 +139,8 @@ func main() {
},
temporalHostPortFlag,
temporalNamespaceFlag,
&temporalCertFlag,
&temporalKeyFlag,
},
Action: func(ctx *cli.Context) error {
temporalHostPort := ctx.String("temporal-host-port")
Expand All @@ -129,6 +151,8 @@ func main() {
TemporalHostPort: temporalHostPort,
GatewayPort: ctx.Uint("gateway-port"),
TemporalNamespace: ctx.String("temporal-namespace"),
TemporalCert: ctx.String("temporal-cert"),
TemporalKey: ctx.String("temporal-key"),
})
},
},
Expand Down
14 changes: 14 additions & 0 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"crypto/tls"
"fmt"

"github.com/PeerDB-io/peer-flow/activities"
Expand All @@ -14,6 +15,8 @@ import (
type SnapshotWorkerOptions struct {
TemporalHostPort string
TemporalNamespace string
TemporalCert string
TemporalKey string
}

func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
Expand All @@ -22,6 +25,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
Namespace: opts.TemporalNamespace,
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
}
clientOptions.ConnectionOptions = connOptions
}
c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
Expand Down
17 changes: 17 additions & 0 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"crypto/tls"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -31,6 +32,8 @@ type WorkerOptions struct {
PyroscopeServer string
MetricsServer string
TemporalNamespace string
TemporalCert string
TemporalKey string
}

func setupPyroscope(opts *WorkerOptions) {
Expand Down Expand Up @@ -95,6 +98,19 @@ func WorkerMain(opts *WorkerOptions) error {
HostPort: opts.TemporalHostPort,
Namespace: opts.TemporalNamespace,
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
}
clientOptions.ConnectionOptions = connOptions
}

if opts.EnableMetrics {
clientOptions.MetricsHandler = sdktally.NewMetricsHandler(newPrometheusScope(
prometheus.Configuration{
Expand All @@ -115,6 +131,7 @@ func WorkerMain(opts *WorkerOptions) error {
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
}
log.Info("Created temporal client")
defer c.Close()

w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,6 @@ func (c *EventHubConnector) processBatch(
}

func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf("syncing records to eventhub with"+
" push parallelism %d and push batch size %d",
req.PushParallelism, req.PushBatchSize)
})
defer func() {
shutdown <- true
}()

maxParallelism := req.PushParallelism
if maxParallelism <= 0 {
maxParallelism = 10
Expand All @@ -211,6 +202,16 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
batch := req.Records
var numRecords uint32

shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords, req.FlowJobName,
)
})
defer func() {
shutdown <- true
}()

// if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true
// we kick off processBatch in a goroutine and return immediately.
// otherwise, we block until processBatch is done.
Expand Down
32 changes: 19 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ func (p *PostgresCDCSource) consumeStream(
consumedXLogPos := pglogrepl.LSN(0)
if clientXLogPos > 0 {
consumedXLogPos = clientXLogPos - 1

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}
}

var standByLastLogged time.Time
Expand Down Expand Up @@ -220,12 +226,6 @@ func (p *PostgresCDCSource) consumeStream(
pkmRequiresResponse := false
waitingForCommit := false

err := pglogrepl.SendStandbyStatusUpdate(p.ctx, conn,
pglogrepl.StandbyStatusUpdate{WALWritePosition: consumedXLogPos})
if err != nil {
return fmt.Errorf("[initial-flush] SendStandbyStatusUpdate failed: %w", err)
}

for {
if pkmRequiresResponse {
// Update XLogPos to the last processed position, we can only confirm
Expand Down Expand Up @@ -260,18 +260,24 @@ func (p *PostgresCDCSource) consumeStream(

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !p.commitLock && len(localRecords) > 0 {
log.Infof(
"[%s] Stand-by deadline exceeded, returning currently accumulated records - %d",
if len(localRecords) > 0 {
log.Infof("[%s] standby deadline reached, have %d records, will return at next commit",
req.FlowJobName,
len(localRecords),
)
return nil

if !p.commitLock {
// immediate return if we are not waiting for a commit
return nil
}

waitingForCommit = true
} else {
// we need to wait for next commit.
waitingForCommit = p.commitLock
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
log.Infof("[%s] standby deadline reached, no records accumulated, continuing to wait",
req.FlowJobName,
)
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}

var ctx context.Context
Expand Down
Loading

0 comments on commit cd390e1

Please sign in to comment.