Skip to content

Commit

Permalink
Merge branch 'main' into fix-walheartbeat-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 27, 2023
2 parents 8830d37 + 30152d0 commit 5d28b49
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 50 deletions.
17 changes: 3 additions & 14 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package main
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"net"
"net/http"
"strings"
"time"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
Expand Down Expand Up @@ -93,23 +91,14 @@ func APIMain(args *APIServerParams) error {
}
if args.TemporalCert != "" && args.TemporalKey != "" {
log.Info("Using temporal certificate/key for authentication")
certBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(args.TemporalCert))
if err != nil {
return fmt.Errorf("unable to decode temporal certificate: %w", err)
}

keyBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(args.TemporalKey))
if err != nil {
return fmt.Errorf("unable to decode temporal key: %w", err)
}

cert, err := tls.X509KeyPair(certBytes, keyBytes)
certs, err := Base64DecodeCertAndKey(args.TemporalCert, args.TemporalKey)
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
return fmt.Errorf("unable to process certificate and key: %w", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
TLS: &tls.Config{Certificates: certs},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
29 changes: 29 additions & 0 deletions flow/cmd/cert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"crypto/tls"
"encoding/base64"
"fmt"
"strings"
)

func Base64DecodeCertAndKey(cert string, key string) ([]tls.Certificate, error) {
temporalCert := strings.TrimSpace(cert)
certBytes, err := base64.StdEncoding.DecodeString(temporalCert)
if err != nil {
return nil, fmt.Errorf("unable to decode temporal certificate: %w", err)
}

temporalKey := strings.TrimSpace(key)
keyBytes, err := base64.StdEncoding.DecodeString(temporalKey)
if err != nil {
return nil, fmt.Errorf("unable to decode temporal key: %w", err)
}

keyPair, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return nil, fmt.Errorf("unable to obtain temporal key pair: %w", err)
}

return []tls.Certificate{keyPair}, nil
}
18 changes: 3 additions & 15 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package main

import (
"crypto/tls"
"encoding/base64"
"fmt"
"strings"

"github.com/PeerDB-io/peer-flow/activities"
"github.com/PeerDB-io/peer-flow/shared"
Expand All @@ -28,23 +26,13 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) error {
}

if opts.TemporalCert != "" && opts.TemporalKey != "" {
certBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(opts.TemporalCert))
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to decode temporal certificate: %w", err)
}

keyBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(opts.TemporalKey))
if err != nil {
return fmt.Errorf("unable to decode temporal key: %w", err)
}

cert, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
return fmt.Errorf("unable to process certificate and key: %w", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
TLS: &tls.Config{Certificates: certs},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
18 changes: 3 additions & 15 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"crypto/tls"
"encoding/base64"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -95,23 +94,12 @@ func WorkerMain(opts *WorkerOptions) error {

if opts.TemporalCert != "" && opts.TemporalKey != "" {
log.Info("Using temporal certificate/key for authentication")
certBytes, err := base64.StdEncoding.DecodeString(opts.TemporalCert)
certs, err := Base64DecodeCertAndKey(opts.TemporalCert, opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to decode temporal certificate: %w", err)
return fmt.Errorf("unable to process certificate and key: %w", err)
}

keyBytes, err := base64.StdEncoding.DecodeString(opts.TemporalKey)
if err != nil {
return fmt.Errorf("unable to decode temporal key: %w", err)
}

cert, err := tls.X509KeyPair(certBytes, keyBytes)
if err != nil {
return fmt.Errorf("unable to obtain temporal key pair: %w", err)
}

connOptions := client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{cert}},
TLS: &tls.Config{Certificates: certs},
}
clientOptions.ConnectionOptions = connOptions
}
Expand Down
18 changes: 12 additions & 6 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,27 +488,33 @@ func (c *PostgresConnector) generateFallbackStatements(destinationTableIdentifie
for columnName, genericColumnType := range normalizedTableSchema.Columns {
columnNames = append(columnNames, fmt.Sprintf("\"%s\"", columnName))
pgType := qValueKindToPostgresType(genericColumnType)
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"",
columnName, pgType, columnName))
if strings.Contains(genericColumnType, "array") {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("ARRAY(SELECT * FROM JSON_ARRAY_ELEMENTS_TEXT((_peerdb_data->>'%s')::JSON))::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
} else {
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("(_peerdb_data->>'%s')::%s AS \"%s\"",
strings.Trim(columnName, "\""), pgType, columnName))
}
if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, columnName) {
primaryKeyColumnCasts[columnName] = fmt.Sprintf("(_peerdb_data->>'%s')::%s", columnName, pgType)
}
}
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")
parsedDstTable, _ := utils.ParseSchemaTable(destinationTableIdentifier)

insertColumnsSQL := strings.TrimSuffix(strings.Join(columnNames, ","), ",")
updateColumnsSQLArray := make([]string, 0, len(normalizedTableSchema.Columns))
for columnName := range normalizedTableSchema.Columns {
updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf("%s=EXCLUDED.%s", columnName, columnName))
updateColumnsSQLArray = append(updateColumnsSQLArray, fmt.Sprintf(`"%s"=EXCLUDED."%s"`, columnName, columnName))
}
updateColumnsSQL := strings.TrimSuffix(strings.Join(updateColumnsSQLArray, ","), ",")
deleteWhereClauseArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
for columnName, columnCast := range primaryKeyColumnCasts {
deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf("%s.%s=%s AND ",
destinationTableIdentifier, columnName, columnCast))
deleteWhereClauseArray = append(deleteWhereClauseArray, fmt.Sprintf(`%s."%s"=%s AND `,
parsedDstTable.String(), columnName, columnCast))
}
deleteWhereClauseSQL := strings.TrimSuffix(strings.Join(deleteWhereClauseArray, ""), "AND ")
parsedDstTable, _ := utils.ParseSchemaTable(destinationTableIdentifier)

fallbackUpsertStatement := fmt.Sprintf(fallbackUpsertStatementSQL,
strings.TrimSuffix(strings.Join(maps.Values(primaryKeyColumnCasts), ","), ","), c.metadataSchema,
Expand Down

0 comments on commit 5d28b49

Please sign in to comment.