From 825dc3d0a0523e9558f663bcd8f514263375e8ae Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 12 Dec 2023 23:41:26 +0530 Subject: [PATCH 1/2] adding IF NOT EXISTS for pg and bq (#808) --- flow/connectors/bigquery/bigquery.go | 3 ++- flow/connectors/postgres/postgres.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index f1ad468c5b..aa2e7750b5 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -237,7 +237,8 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(flowJobName string, } for _, addedColumn := range schemaDelta.AddedColumns { - _, err := c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN `%s` %s", c.datasetID, + _, err := c.client.Query(fmt.Sprintf( + "ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS `%s` %s", c.datasetID, schemaDelta.DstTableName, addedColumn.ColumnName, qValueKindToBigQueryType(addedColumn.ColumnType))).Read(c.ctx) if err != nil { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 2bf7fb7d04..2858841a0d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -712,7 +712,8 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string, } for _, addedColumn := range schemaDelta.AddedColumns { - _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN \"%s\" %s", + _, err = tableSchemaModifyTx.Exec(c.ctx, fmt.Sprintf( + "ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s", schemaDelta.DstTableName, addedColumn.ColumnName, qValueKindToPostgresType(addedColumn.ColumnType))) if err != nil { From f29deb480e9187c36aec3ab3fa8e868287aeea43 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Tue, 12 Dec 2023 13:33:09 -0500 Subject: [PATCH 2/2] Fetch clone status always (#809) --- flow/cmd/mirror_status.go | 31 +++++++++---------- .../utils/cdc_records/cdc_records_storage.go | 4 +-- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 73b2b444a4..82d00185c1 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -64,26 +64,25 @@ func (h *FlowRequestHandler) CDCFlowStatus( } var initialCopyStatus *protos.SnapshotStatus - if config.DoInitialCopy { - cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName) + + cloneJobNames, err := h.getCloneTableFlowNames(ctx, req.FlowJobName) + if err != nil { + return nil, err + } + + cloneStatuses := []*protos.QRepMirrorStatus{} + for _, cloneJobName := range cloneJobNames { + cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{ + FlowJobName: cloneJobName, + }) if err != nil { return nil, err } + cloneStatuses = append(cloneStatuses, cloneStatus) + } - cloneStatuses := []*protos.QRepMirrorStatus{} - for _, cloneJobName := range cloneJobNames { - cloneStatus, err := h.QRepFlowStatus(ctx, &protos.MirrorStatusRequest{ - FlowJobName: cloneJobName, - }) - if err != nil { - return nil, err - } - cloneStatuses = append(cloneStatuses, cloneStatus) - } - - initialCopyStatus = &protos.SnapshotStatus{ - Clones: cloneStatuses, - } + initialCopyStatus = &protos.SnapshotStatus{ + Clones: cloneStatuses, } return &protos.CDCMirrorStatus{ diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go index 7c87f2a99a..458c5707e0 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go @@ -18,9 +18,9 @@ import ( const ( /** begin with in-memory store, and then switch to Pebble DB - when the number of stored records crosses 1M + when the number of stored records crosses 100k **/ - defaultNumRecordsSwitchThreshold = 1_000_000 + defaultNumRecordsSwitchThreshold = 1_00_000 ) func encVal(val any) ([]byte, error) {