From e88f2efb34f79d3353fa47554a52967fc4c221f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 24 Jan 2024 20:20:09 +0000 Subject: [PATCH] Cleanup snapshot flow, fixing two issues (#1146) 1. identifier quoting wasn't using QuoteIdentifier 2. mapping.Exclude wasn't being used to actually remove excluded columns --- flow/workflows/snapshot_flow.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index d38801b599..5b1035eb1c 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -4,6 +4,7 @@ import ( "fmt" "log/slog" "regexp" + "slices" "strings" "time" @@ -13,6 +14,7 @@ import ( "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/concurrency" + connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/shared" @@ -141,11 +143,13 @@ func (s *SnapshotFlowExecution) cloneTable( if len(mapping.Exclude) != 0 { for _, v := range s.tableNameSchemaMapping { if v.TableIdentifier == srcName { - colNames := utils.TableSchemaColumnNames(v) - for i, colName := range colNames { - colNames[i] = fmt.Sprintf(`"%s"`, colName) + quotedColumns := make([]string, 0, len(v.ColumnNames)) + for _, colName := range v.ColumnNames { + if !slices.Contains(mapping.Exclude, colName) { + quotedColumns = append(quotedColumns, connpostgres.QuoteIdentifier(colName)) + } } - from = strings.Join(colNames, ",") + from = strings.Join(quotedColumns, ",") break } } @@ -254,20 +258,15 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon logger: logger, } - numTablesInParallel := int(config.SnapshotNumTablesInParallel) - if numTablesInParallel <= 0 { - numTablesInParallel = 1 - } - - replCtx := ctx + numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1)) if !config.DoInitialSnapshot { - _, err := se.setupReplication(replCtx) + _, err := se.setupReplication(ctx) if err != nil { return fmt.Errorf("failed to setup replication: %w", err) } - if err := se.closeSlotKeepAlive(replCtx); err != nil { + if err := se.closeSlotKeepAlive(ctx); err != nil { return fmt.Errorf("failed to close slot keep alive: %w", err) }