From 91f7687d84b72b74f15fb8c09e4b00d664c65f18 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 27 Nov 2023 23:18:35 +0530 Subject: [PATCH 1/2] Quote watermark table in QRep UI (#723) Co-authored-by: Kaushik Iska --- ui/app/mirrors/create/handlers.ts | 13 +++++++++++- ui/app/mirrors/create/qrep/qrep.tsx | 33 +++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 3f90b37297..f65ecc8fd8 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -187,6 +187,15 @@ export const handleCreateCDC = async ( setLoading(false); }; +const quotedWatermarkTable = (watermarkTable: string): string => { + if (watermarkTable.includes('.')) { + const [schema, table] = watermarkTable.split('.'); + return `"${schema}"."${table}"`; + } else { + return `"${watermarkTable}"`; + } +}; + export const handleCreateQRep = async ( flowJobName: string, query: string, @@ -210,7 +219,9 @@ export const handleCreateQRep = async ( if (xmin == true) { config.watermarkColumn = 'xmin'; - config.query = `SELECT * FROM ${config.watermarkTable} WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}`; + config.query = `SELECT * FROM ${quotedWatermarkTable( + config.watermarkTable + )} WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}`; query = config.query; config.initialCopyOnly = false; } diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index 993e7bc1aa..6b5b7b4b35 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -114,7 +114,17 @@ export default function QRepConfigForm({ ) => { if (val) { if (setting.label === 'Table') { - setter((curr) => ({ ...curr, destinationTableIdentifier: val })); + if (mirrorConfig.destinationPeer?.type === DBType.BIGQUERY) { + setter((curr) => ({ + ...curr, + destinationTableIdentifier: val.split('.')[1], + })); + } else { + setter((curr) => ({ + ...curr, + destinationTableIdentifier: val, + })); + } loadColumnOptions(val); } handleChange(val, setting); @@ -127,6 +137,20 @@ export default function QRepConfigForm({ ); }, [mirrorConfig.sourcePeer]); + useEffect(() => { + if (mirrorConfig.destinationPeer?.type === DBType.BIGQUERY) { + setter((curr) => ({ + ...curr, + destinationTableIdentifier: mirrorConfig.watermarkTable?.split('.')[1], + })); + } else { + setter((curr) => ({ + ...curr, + destinationTableIdentifier: mirrorConfig.watermarkTable, + })); + } + }, [mirrorConfig.destinationPeer, mirrorConfig.watermarkTable, setter]); + useEffect(() => { // set defaults setter((curr) => ({ ...curr, ...blankQRepSetting })); @@ -255,12 +279,7 @@ export default function QRepConfigForm({ type={setting.type} defaultValue={ setting.label === 'Destination Table Name' - ? mirrorConfig.destinationPeer?.type === - DBType.BIGQUERY - ? mirrorConfig.destinationTableIdentifier?.split( - '.' - )[1] - : mirrorConfig.destinationTableIdentifier + ? mirrorConfig.destinationTableIdentifier : setting.default } onChange={(e: React.ChangeEvent) => From 607ff045af09a1705905d64f90893ef8deee3477 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 27 Nov 2023 23:48:54 +0530 Subject: [PATCH 2/2] Reads cert and key as base64 for Temporal Cloud (#725) --- docker-compose-dev.yml | 1 + flow/cmd/api.go | 15 ++++++++++++++- flow/cmd/snapshot_worker.go | 15 ++++++++++++++- flow/cmd/worker.go | 14 +++++++++++++- 4 files changed, 42 insertions(+), 3 deletions(-) diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index da114713c7..2963b3ca57 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -13,6 +13,7 @@ x-flow-worker-env: &flow-worker-env TEMPORAL_HOST_PORT: temporal:7233 PEERDB_TEMPORAL_NAMESPACE: default # For the below 2 cert and key variables, + # paste as base64 encoded strings. # use yml multiline syntax with '|' TEMPORAL_CLIENT_CERT: TEMPORAL_CLIENT_KEY: diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 67273be8a7..21ad34f27b 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -3,9 +3,11 @@ package main import ( "context" "crypto/tls" + "encoding/base64" "fmt" "net" "net/http" + "strings" "time" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" @@ -90,7 +92,18 @@ func APIMain(args *APIServerParams) error { Namespace: args.TemporalNamespace, } if args.TemporalCert != "" && args.TemporalKey != "" { - cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey)) + log.Info("Using temporal certificate/key for authentication") + certBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(args.TemporalCert)) + if err != nil { + return fmt.Errorf("unable to decode temporal certificate: %w", err) + } + + keyBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(args.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to decode temporal key: %w", err) + } + + cert, err := tls.X509KeyPair(certBytes, keyBytes) if err != nil { return fmt.Errorf("unable to obtain temporal key pair: %w", err) } diff --git a/flow/cmd/snapshot_worker.go b/flow/cmd/snapshot_worker.go index 5b5ea65b3c..ebd7609c05 100644 --- a/flow/cmd/snapshot_worker.go +++ b/flow/cmd/snapshot_worker.go @@ -2,7 +2,9 @@ package main import ( "crypto/tls" + "encoding/base64" "fmt" + "strings" "github.com/PeerDB-io/peer-flow/activities" "github.com/PeerDB-io/peer-flow/shared" @@ -26,7 +28,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { } if opts.TemporalCert != "" && opts.TemporalKey != "" { - cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey)) + certBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(opts.TemporalCert)) + if err != nil { + return fmt.Errorf("unable to decode temporal certificate: %w", err) + } + + keyBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(opts.TemporalKey)) + if err != nil { + return fmt.Errorf("unable to decode temporal key: %w", err) + } + + cert, err := tls.X509KeyPair(certBytes, keyBytes) if err != nil { return fmt.Errorf("unable to obtain temporal key pair: %w", err) } @@ -36,6 +48,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error { } clientOptions.ConnectionOptions = connOptions } + c, err := client.Dial(clientOptions) if err != nil { return fmt.Errorf("unable to create Temporal client: %w", err) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 240af31387..da6c03f122 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -2,6 +2,7 @@ package main import ( "crypto/tls" + "encoding/base64" "fmt" "os" "os/signal" @@ -93,7 +94,18 @@ func WorkerMain(opts *WorkerOptions) error { } if opts.TemporalCert != "" && opts.TemporalKey != "" { - cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey)) + log.Info("Using temporal certificate/key for authentication") + certBytes, err := base64.StdEncoding.DecodeString(opts.TemporalCert) + if err != nil { + return fmt.Errorf("unable to decode temporal certificate: %w", err) + } + + keyBytes, err := base64.StdEncoding.DecodeString(opts.TemporalKey) + if err != nil { + return fmt.Errorf("unable to decode temporal key: %w", err) + } + + cert, err := tls.X509KeyPair(certBytes, keyBytes) if err != nil { return fmt.Errorf("unable to obtain temporal key pair: %w", err) }