Skip to content

Commit

Permalink
Merge branch 'main' into insert-soft-delete-same-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Nov 27, 2023
2 parents 0d24790 + 607ff04 commit 5b36068
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 11 deletions.
1 change: 1 addition & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ x-flow-worker-env: &flow-worker-env
TEMPORAL_HOST_PORT: temporal:7233
PEERDB_TEMPORAL_NAMESPACE: default
# For the below 2 cert and key variables,
# paste as base64 encoded strings.
# use yml multiline syntax with '|'
TEMPORAL_CLIENT_CERT:
TEMPORAL_CLIENT_KEY:
Expand Down
15 changes: 14 additions & 1 deletion flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package main
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"net"
"net/http"
"strings"
"time"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
Expand Down Expand Up @@ -90,7 +92,18 @@ func APIMain(args *APIServerParams) error {
Namespace: args.TemporalNamespace,
}
if args.TemporalCert != "" && args.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(args.TemporalCert), []byte(args.TemporalKey))
log.Info("Using temporal certificate/key for authentication")
certBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(args.TemporalCert))
if err != nil {
return fmt.Errorf("unable to decode temporal certificate: %w", err)
}

keyBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(args.TemporalKey))
if err != nil {
return fmt.Errorf("unable to decode temporal key: %w", err)
}

cert, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}
Expand Down
15 changes: 14 additions & 1 deletion flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"crypto/tls"
"encoding/base64"
"fmt"
"strings"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -26,7 +28,17 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
certBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(opts.TemporalCert))
if err != nil {
return fmt.Errorf("unable to decode temporal certificate: %w", err)
}

keyBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(opts.TemporalKey))
if err != nil {
return fmt.Errorf("unable to decode temporal key: %w", err)
}

cert, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}
Expand All @@ -36,6 +48,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}
clientOptions.ConnectionOptions = connOptions
}

c, err := client.Dial(clientOptions)
if err != nil {
return fmt.Errorf("unable to create Temporal client: %w", err)
Expand Down
14 changes: 13 additions & 1 deletion flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"crypto/tls"
"encoding/base64"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -93,7 +94,18 @@ func WorkerMain(opts *WorkerOptions) error {
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
cert, err := tls.X509KeyPair([]byte(opts.TemporalCert), []byte(opts.TemporalKey))
log.Info("Using temporal certificate/key for authentication")
certBytes, err := base64.StdEncoding.DecodeString(opts.TemporalCert)
if err != nil {
return fmt.Errorf("unable to decode temporal certificate: %w", err)
}

keyBytes, err := base64.StdEncoding.DecodeString(opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to decode temporal key: %w", err)
}

cert, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}
Expand Down
13 changes: 12 additions & 1 deletion ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ export const handleCreateCDC = async (
setLoading(false);
};

const quotedWatermarkTable = (watermarkTable: string): string => {
if (watermarkTable.includes('.')) {
const [schema, table] = watermarkTable.split('.');
return `"${schema}"."${table}"`;
} else {
return `"${watermarkTable}"`;
}
};

export const handleCreateQRep = async (
flowJobName: string,
query: string,
Expand All @@ -210,7 +219,9 @@ export const handleCreateQRep = async (

if (xmin == true) {
config.watermarkColumn = 'xmin';
config.query = `SELECT * FROM ${config.watermarkTable} WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}`;
config.query = `SELECT * FROM ${quotedWatermarkTable(
config.watermarkTable
)} WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}`;
query = config.query;
config.initialCopyOnly = false;
}
Expand Down
33 changes: 26 additions & 7 deletions ui/app/mirrors/create/qrep/qrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,17 @@ export default function QRepConfigForm({
) => {
if (val) {
if (setting.label === 'Table') {
setter((curr) => ({ ...curr, destinationTableIdentifier: val }));
if (mirrorConfig.destinationPeer?.type === DBType.BIGQUERY) {
setter((curr) => ({
...curr,
destinationTableIdentifier: val.split('.')[1],
}));
} else {
setter((curr) => ({
...curr,
destinationTableIdentifier: val,
}));
}
loadColumnOptions(val);
}
handleChange(val, setting);
Expand All @@ -127,6 +137,20 @@ export default function QRepConfigForm({
);
}, [mirrorConfig.sourcePeer]);

useEffect(() => {
if (mirrorConfig.destinationPeer?.type === DBType.BIGQUERY) {
setter((curr) => ({
...curr,
destinationTableIdentifier: mirrorConfig.watermarkTable?.split('.')[1],
}));
} else {
setter((curr) => ({
...curr,
destinationTableIdentifier: mirrorConfig.watermarkTable,
}));
}
}, [mirrorConfig.destinationPeer, mirrorConfig.watermarkTable, setter]);

useEffect(() => {
// set defaults
setter((curr) => ({ ...curr, ...blankQRepSetting }));
Expand Down Expand Up @@ -255,12 +279,7 @@ export default function QRepConfigForm({
type={setting.type}
defaultValue={
setting.label === 'Destination Table Name'
? mirrorConfig.destinationPeer?.type ===
DBType.BIGQUERY
? mirrorConfig.destinationTableIdentifier?.split(
'.'
)[1]
: mirrorConfig.destinationTableIdentifier
? mirrorConfig.destinationTableIdentifier
: setting.default
}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
Expand Down

0 comments on commit 5b36068

Please sign in to comment.