Skip to content

Commit

Permalink
tests for mixed case + fixes/cleanup (#1212)
Browse files Browse the repository at this point in the history
1) Added test for mixed case column/pkey/table names in PG
2) BatchSize is now taken directly from config instead of from limits
which is derived from config, Limits is now empty so removed.
3) table name in addpub should be using `SchemaTable.String()` instead
of `QuoteIdentifier`

dynamic properties test isn't working in CI, will debug and raise it
separately
  • Loading branch information
heavycrystal authored Feb 7, 2024
1 parent 202ca2b commit 6309d62
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 302 deletions.
7 changes: 6 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
defer shutdown()

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
}

// start a goroutine to pull records from the source
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
Expand All @@ -235,7 +240,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: input.SyncFlowOptions.BatchSize,
MaxBatchSize: batchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.SyncFlowOptions.IdleTimeoutSeconds),
),
Expand Down
10 changes: 0 additions & 10 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
},
}

maxBatchSize := cfg.MaxBatchSize
if maxBatchSize == 0 {
maxBatchSize = 1_000_000
}

limits := &peerflow.CDCFlowLimits{
MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
} else {
Expand Down Expand Up @@ -178,7 +169,6 @@ func (h *FlowRequestHandler) CreateCDCFlow(
workflowOptions, // workflow start options
peerflow.CDCFlowWorkflowWithConfig, // workflow function
cfg, // workflow input
limits, // workflow limits
nil, // workflow state
)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,9 +1014,13 @@ func (c *PostgresConnector) AddTablesToPublication(req *protos.AddTablesToPublic
}
} else {
for _, additionalSrcTable := range additionalSrcTables {
_, err := c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
schemaTable, err := utils.ParseSchemaTable(additionalSrcTable)
if err != nil {
return err
}
_, err = c.conn.Exec(c.ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
utils.QuoteIdentifier(c.getDefaultPublicationName(req.FlowJobName)),
utils.QuoteIdentifier(additionalSrcTable)))
schemaTable.String()))
// don't error out if table is already added to our publication
if err != nil && !strings.Contains(err.Error(), "SQLSTATE 42710") {
return fmt.Errorf("failed to alter publication: %w", err)
Expand Down
Loading

0 comments on commit 6309d62

Please sign in to comment.