Skip to content

Commit

Permalink
Remove forced upper case for peerdb columns (#1706)
Browse files Browse the repository at this point in the history
Removes upper casing of user-provided peerdb column names in handler.go
for create cdc and qrep endpoints
Restrict custom names for these columns to just bigquery in UI
Functionally tested with soft delete
Added a custom name for these columns in a BQ test
  • Loading branch information
Amogh-Bharadwaj authored May 9, 2024
1 parent e2be383 commit e551e73
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 13 deletions.
10 changes: 1 addition & 9 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,10 @@ func (h *FlowRequestHandler) CreateCDCFlow(

if req.ConnectionConfigs.SoftDeleteColName == "" {
req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED"
} else {
// make them all uppercase
req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName)
}

if req.ConnectionConfigs.SyncedAtColName == "" {
req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT"
} else {
// make them all uppercase
req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName)
}

err := h.createCdcJobEntry(ctx, req, workflowID)
Expand Down Expand Up @@ -256,10 +250,8 @@ func (h *FlowRequestHandler) CreateQRepFlow(

if req.QrepConfig.SyncedAtColName == "" {
cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
} else {
// make them all uppercase
cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName)
}

_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state)
if err != nil {
slog.Error("unable to start QRepFlow workflow",
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,8 +1110,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
Source: e2e.GeneratePostgresPeer(),
CdcStagingPath: connectionGen.CdcStagingPath,
SoftDelete: true,
SoftDeleteColName: "_PEERDB_IS_DELETED",
SyncedAtColName: "_PEERDB_SYNCED_AT",
SoftDeleteColName: "_custom_deleted",
SyncedAtColName: "_custom_synced",
MaxBatchSize: 100,
}

Expand Down Expand Up @@ -1141,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_softdel_iud", "id,c1,c2,t")
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool {
newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
"SELECT COUNT(*) FROM `%s.%s` WHERE _custom_deleted",
s.bqHelper.Config.DatasetId, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
e2e.EnvNoError(s.t, env, err)
Expand Down
1 change: 0 additions & 1 deletion ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ export default function CDCConfigForm({
mirrorConfig.destination?.type !== DBType.POSTGRES) &&
label.includes('type system')) ||
(mirrorConfig.destination?.type !== DBType.BIGQUERY &&
mirrorConfig.destination?.type !== DBType.SNOWFLAKE &&
label.includes('column name'))
) {
return false;
Expand Down

0 comments on commit e551e73

Please sign in to comment.