Skip to content

Commit

Permalink
Cleanup snapshot flow, fixing two issues (#1146)
Browse files Browse the repository at this point in the history
1. identifier quoting wasn't using QuoteIdentifier
2. mapping.Exclude wasn't being used to actually remove excluded columns
  • Loading branch information
serprex authored Jan 24, 2024
1 parent 4010599 commit e88f2ef
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log/slog"
"regexp"
"slices"
"strings"
"time"

Expand All @@ -13,6 +14,7 @@ import (
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peer-flow/concurrency"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -141,11 +143,13 @@ func (s *SnapshotFlowExecution) cloneTable(
if len(mapping.Exclude) != 0 {
for _, v := range s.tableNameSchemaMapping {
if v.TableIdentifier == srcName {
colNames := utils.TableSchemaColumnNames(v)
for i, colName := range colNames {
colNames[i] = fmt.Sprintf(`"%s"`, colName)
quotedColumns := make([]string, 0, len(v.ColumnNames))
for _, colName := range v.ColumnNames {
if !slices.Contains(mapping.Exclude, colName) {
quotedColumns = append(quotedColumns, connpostgres.QuoteIdentifier(colName))
}
}
from = strings.Join(colNames, ",")
from = strings.Join(quotedColumns, ",")
break
}
}
Expand Down Expand Up @@ -254,20 +258,15 @@ func SnapshotFlowWorkflow(ctx workflow.Context, config *protos.FlowConnectionCon
logger: logger,
}

numTablesInParallel := int(config.SnapshotNumTablesInParallel)
if numTablesInParallel <= 0 {
numTablesInParallel = 1
}

replCtx := ctx
numTablesInParallel := int(max(config.SnapshotNumTablesInParallel, 1))

if !config.DoInitialSnapshot {
_, err := se.setupReplication(replCtx)
_, err := se.setupReplication(ctx)
if err != nil {
return fmt.Errorf("failed to setup replication: %w", err)
}

if err := se.closeSlotKeepAlive(replCtx); err != nil {
if err := se.closeSlotKeepAlive(ctx); err != nil {
return fmt.Errorf("failed to close slot keep alive: %w", err)
}

Expand Down

0 comments on commit e88f2ef

Please sign in to comment.