From 342433e8725d91cfdae1e54bcec286851ea3f9da Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 9 May 2024 16:44:33 +0530 Subject: [PATCH 1/2] remove forced upper case for peerdb columns --- flow/cmd/handler.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 22865042d6..41dd04b4e7 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -142,16 +142,10 @@ func (h *FlowRequestHandler) CreateCDCFlow( if req.ConnectionConfigs.SoftDeleteColName == "" { req.ConnectionConfigs.SoftDeleteColName = "_PEERDB_IS_DELETED" - } else { - // make them all uppercase - req.ConnectionConfigs.SoftDeleteColName = strings.ToUpper(req.ConnectionConfigs.SoftDeleteColName) } if req.ConnectionConfigs.SyncedAtColName == "" { req.ConnectionConfigs.SyncedAtColName = "_PEERDB_SYNCED_AT" - } else { - // make them all uppercase - req.ConnectionConfigs.SyncedAtColName = strings.ToUpper(req.ConnectionConfigs.SyncedAtColName) } err := h.createCdcJobEntry(ctx, req, workflowID) @@ -256,10 +250,8 @@ func (h *FlowRequestHandler) CreateQRepFlow( if req.QrepConfig.SyncedAtColName == "" { cfg.SyncedAtColName = "_PEERDB_SYNCED_AT" - } else { - // make them all uppercase - cfg.SyncedAtColName = strings.ToUpper(req.QrepConfig.SyncedAtColName) } + _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, state) if err != nil { slog.Error("unable to start QRepFlow workflow", From 69202915874759c2d05c9f761e77420390445c96 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Thu, 9 May 2024 17:21:57 +0530 Subject: [PATCH 2/2] add test for bq, restrict feature to bq --- flow/e2e/bigquery/peer_flow_bq_test.go | 6 +++--- ui/app/mirrors/create/cdc/cdc.tsx | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index da4f036b0a..303fa0c8ea 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1110,8 +1110,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { Source: e2e.GeneratePostgresPeer(), CdcStagingPath: connectionGen.CdcStagingPath, SoftDelete: true, - SoftDeleteColName: "_PEERDB_IS_DELETED", - SyncedAtColName: "_PEERDB_SYNCED_AT", + SoftDeleteColName: "_custom_deleted", + SyncedAtColName: "_custom_synced", MaxBatchSize: 100, } @@ -1141,7 +1141,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { e2e.EnvWaitForEqualTables(env, s, "normalizing tx", "test_softdel_iud", "id,c1,c2,t") e2e.EnvWaitFor(s.t, env, 3*time.Minute, "checking soft delete", func() bool { newerSyncedAtQuery := fmt.Sprintf( - "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + "SELECT COUNT(*) FROM `%s.%s` WHERE _custom_deleted", s.bqHelper.Config.DatasetId, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) e2e.EnvNoError(s.t, env, err) diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 2ef76be5be..353d988d19 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -92,7 +92,6 @@ export default function CDCConfigForm({ mirrorConfig.destination?.type !== DBType.POSTGRES) && label.includes('type system')) || (mirrorConfig.destination?.type !== DBType.BIGQUERY && - mirrorConfig.destination?.type !== DBType.SNOWFLAKE && label.includes('column name')) ) { return false;