From 19178f21554c150d4cf278855500dbf14d11d209 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Mon, 8 Jan 2024 04:07:46 +0530
Subject: [PATCH 1/4] fix update statements to split unchangedToastCols
 properly (#1016)

---
 flow/connectors/bigquery/merge_statement_generator.go | 2 +-
 flow/connectors/bigquery/merge_stmt_generator_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index 47dd2b3936..3582f5a2f6 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -204,7 +204,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
 	updateStmts := make([]string, 0, len(unchangedToastCols))
 
 	for _, cols := range unchangedToastCols {
-		unchangedColsArray := strings.Split(cols, ", ")
+		unchangedColsArray := strings.Split(cols, ",")
 		otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
 		tmpArray := make([]string, 0, len(otherCols))
 		for _, colName := range otherCols {
diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go
index 0818caed56..141b3999b7 100644
--- a/flow/connectors/bigquery/merge_stmt_generator_test.go
+++ b/flow/connectors/bigquery/merge_stmt_generator_test.go
@@ -17,7 +17,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
 		},
 	}
 	allCols := []string{"col1", "col2", "col3"}
-	unchangedToastCols := []string{"", "col2, col3", "col2", "col3"}
+	unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}
 
 	expected := []string{
 		"WHEN MATCHED AND _rt!=2 AND _ut=''" +

From 8aaa4c7021fac72f76f490692105354f882c8a74 Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Mon, 8 Jan 2024 04:39:14 +0530
Subject: [PATCH 2/4] Revert "HeartbeatRoutine: return context.CancelFunc
 instead of chan s… (#1020)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Reverts the change made in #997

This reverts commit 081eb4065c551570498c193c96f2a10b890d8267.
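The visible difference for callers is how the heartbeat goroutine gets
stopped. A minimal sketch of the two contracts (the call site below is
illustrative, not taken from any one file in this diff):

    // Contract restored by this revert: HeartbeatRoutine returns a
    // chan<- struct{}, and the caller stops the goroutine by sending on it.
    shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
        return "still working"
    })
    defer func() {
        shutdown <- struct{}{}
    }()

    // Contract being reverted (#997): HeartbeatRoutine returned a
    // context.CancelFunc, so the caller simply deferred it:
    //     defer shutdown()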
---
 flow/activities/flowable.go                 | 29 +++++++++++++++----
 flow/connectors/bigquery/qrep_avro_sync.go  |  4 ++-
 flow/connectors/eventhub/eventhub.go        |  4 ++-
 flow/connectors/postgres/cdc.go             |  5 +++-
 flow/connectors/postgres/postgres.go        |  4 ++-
 .../postgres/qrep_query_executor.go         |  7 +++--
 flow/connectors/snowflake/qrep_avro_sync.go |  5 +++-
 flow/connectors/utils/avro/avro_writer.go   |  5 +++-
 flow/connectors/utils/heartbeat.go          |  8 +++--
 9 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 25693f5ee1..6ee858c028 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -286,7 +286,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		jobName := input.FlowConnectionConfigs.FlowJobName
 		return fmt.Sprintf("pushing records for job - %s", jobName)
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	syncStartTime := time.Now()
 	res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
@@ -394,7 +397,9 @@ func (a *FlowableActivity) StartNormalize(
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
 	})
-	defer shutdown()
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	slog.InfoContext(ctx, "initializing table schema...")
 	err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
@@ -489,7 +494,10 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	partitions, err := srcConn.GetQRepPartitions(config, last)
 	if err != nil {
@@ -627,7 +635,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
 		return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
 	if err != nil {
@@ -673,7 +684,10 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	err = dstConn.ConsolidateQRepPartitions(config)
 	if err != nil {
@@ -982,7 +996,10 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 	shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
 		return "syncing xmin."
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
 	if err != nil {
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index d6df8fdb6e..ceb3b38402 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -373,7 +373,9 @@ func (s *QRepAvroSyncMethod) writeToStage(
 				objectFolder, stagingTable)
 		},
 	)
-	defer shutdown()
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	var avroFile *avro.AvroFile
 	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 24a37eaf91..1bb7b00166 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -216,7 +216,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			numRecords, req.FlowJobName,
 		)
 	})
-	defer shutdown()
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
 	if err != nil {
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index af2b483de1..fcb3e64174 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -249,7 +249,10 @@ func (p *PostgresCDCSource) consumeStream(
 		currRecords := cdcRecordsStorage.Len()
 		return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	standbyMessageTimeout := req.IdleTimeout
 	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index 7fef9d26c2..43a11a2e72 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -196,7 +196,9 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro
 
 // PullRecords pulls records from the source.
 func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
-	defer req.RecordStream.Close()
+	defer func() {
+		req.RecordStream.Close()
+	}()
 
 	// Slotname would be the job name prefixed with "peerflow_slot_"
 	slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index 839845d284..bb07fbb98f 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -83,10 +83,13 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
 	q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)
 
 	if !qe.testEnv {
-		shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
+		shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
 			return fmt.Sprintf("running '%s'", q)
 		})
-		defer shutdown()
+
+		defer func() {
+			shutdownCh <- struct{}{}
+		}()
 	}
 
 	rows, err := tx.Query(qe.ctx, q)
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 83521088d8..07eb791c5c 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -282,7 +282,10 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
 	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
 		return fmt.Sprintf("putting file to stage %s", stage)
 	})
-	defer shutdown()
+
+	defer func() {
+		shutdown <- struct{}{}
+	}()
 
 	if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
 		return fmt.Errorf("failed to put file to stage: %w", err)
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 1e6f318713..90c016b404 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -136,7 +136,10 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 			written := numRows.Load()
 			return fmt.Sprintf("[avro] written %d rows to OCF", written)
 		})
-		defer shutdown()
+
+		defer func() {
+			shutdown <- struct{}{}
+		}()
 	}
 
 	for qRecordOrErr := range p.stream.Records {
diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go
index 37f00bc72f..c1bc81f077 100644
--- a/flow/connectors/utils/heartbeat.go
+++ b/flow/connectors/utils/heartbeat.go
@@ -13,8 +13,8 @@ func HeartbeatRoutine(
 	ctx context.Context,
 	interval time.Duration,
 	message func() string,
-) context.CancelFunc {
-	ctx, cancel := context.WithCancel(ctx)
+) chan<- struct{} {
+	shutdown := make(chan struct{})
 	go func() {
 		counter := 0
 		for {
@@ -22,13 +22,15 @@ func HeartbeatRoutine(
 			msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
 			RecordHeartbeatWithRecover(ctx, msg)
 			select {
+			case <-shutdown:
+				return
 			case <-ctx.Done():
 				return
 			case <-time.After(interval):
 			}
 		}
 	}()
-	return cancel
+	return shutdown
 }
 
 // if the functions are being called outside the context of a Temporal workflow,

From ef2ed5709dfd066932aed2633627435f09466dd8 Mon Sep 17 00:00:00 2001
From: Philip Dubé
Date: Mon, 8 Jan 2024 14:17:42 +0000
Subject: [PATCH 3/4] HeartbeatRoutine: return cancel func instead of chan
 struct{} (#1021)

#1020 reverted #997

Reimplement so only func declaration moves into HeartbeatRoutine
---
 flow/activities/flowable.go                 | 29 ++++---------
 flow/connectors/bigquery/qrep_avro_sync.go  |  4 +--
 flow/connectors/eventhub/eventhub.go        |  4 +--
 flow/connectors/postgres/cdc.go             |  5 +---
 .../postgres/qrep_query_executor.go         |  7 ++---
 flow/connectors/snowflake/qrep_avro_sync.go |  5 +---
 flow/connectors/utils/avro/avro_writer.go   |  5 +---
 flow/connectors/utils/heartbeat.go          |  4 +--
 8 files changed, 15 insertions(+), 48 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 6ee858c028..25693f5ee1 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -286,10 +286,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		jobName := input.FlowConnectionConfigs.FlowJobName
 		return fmt.Sprintf("pushing records for job - %s", jobName)
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	syncStartTime := time.Now()
 	res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
@@ -397,9 +394,7 @@ func (a *FlowableActivity) StartNormalize(
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
 	})
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	slog.InfoContext(ctx, "initializing table schema...")
 	err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
@@ -494,10 +489,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	partitions, err := srcConn.GetQRepPartitions(config, last)
 	if err != nil {
@@ -635,10 +627,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
 		return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
 	if err != nil {
@@ -684,10 +673,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 	shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
 		return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	err = dstConn.ConsolidateQRepPartitions(config)
 	if err != nil {
@@ -996,10 +982,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 	shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
 		return "syncing xmin."
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
 	if err != nil {
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index ceb3b38402..d6df8fdb6e 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -373,9 +373,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
 				objectFolder, stagingTable)
 		},
 	)
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	var avroFile *avro.AvroFile
 	ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go
index 1bb7b00166..24a37eaf91 100644
--- a/flow/connectors/eventhub/eventhub.go
+++ b/flow/connectors/eventhub/eventhub.go
@@ -216,9 +216,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
 			numRecords, req.FlowJobName,
 		)
 	})
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
 	if err != nil {
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index fcb3e64174..af2b483de1 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -249,10 +249,7 @@ func (p *PostgresCDCSource) consumeStream(
 		currRecords := cdcRecordsStorage.Len()
 		return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	standbyMessageTimeout := req.IdleTimeout
 	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go
index bb07fbb98f..839845d284 100644
--- a/flow/connectors/postgres/qrep_query_executor.go
+++ b/flow/connectors/postgres/qrep_query_executor.go
@@ -83,13 +83,10 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
 	q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)
 
 	if !qe.testEnv {
-		shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
+		shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
 			return fmt.Sprintf("running '%s'", q)
 		})
-
-		defer func() {
-			shutdownCh <- struct{}{}
-		}()
+		defer shutdown()
 	}
 
 	rows, err := tx.Query(qe.ctx, q)
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index 07eb791c5c..83521088d8 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -282,10 +282,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
 	shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
 		return fmt.Sprintf("putting file to stage %s", stage)
 	})
-
-	defer func() {
-		shutdown <- struct{}{}
-	}()
+	defer shutdown()
 
 	if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
 		return fmt.Errorf("failed to put file to stage: %w", err)
diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index 90c016b404..1e6f318713 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -136,10 +136,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
 			written := numRows.Load()
 			return fmt.Sprintf("[avro] written %d rows to OCF", written)
 		})
-
-		defer func() {
-			shutdown <- struct{}{}
-		}()
+		defer shutdown()
 	}
 
 	for qRecordOrErr := range p.stream.Records {
diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go
index c1bc81f077..270680ded1 100644
--- a/flow/connectors/utils/heartbeat.go
+++ b/flow/connectors/utils/heartbeat.go
@@ -13,7 +13,7 @@ func HeartbeatRoutine(
 	ctx context.Context,
 	interval time.Duration,
 	message func() string,
-) chan<- struct{} {
+) func() {
 	shutdown := make(chan struct{})
 	go func() {
 		counter := 0
@@ -30,7 +30,7 @@ func HeartbeatRoutine(
 			}
 		}
 	}()
-	return shutdown
+	return func() { shutdown <- struct{}{} }
 }
 
 // if the functions are being called outside the context of a Temporal workflow,

From 311a65d5ebbd85e637d7cc288ab5100ef8ae42b9 Mon Sep 17 00:00:00 2001
From: Philip Dubé
Date: Mon, 8 Jan 2024 14:18:05 +0000
Subject: [PATCH 4/4] e2e/test_utils: don't use query state after error (#1018)

---
 flow/e2e/test_utils.go | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go
index 88f4fb3486..1a2be43b63 100644
--- a/flow/e2e/test_utils.go
+++ b/flow/e2e/test_utils.go
@@ -141,9 +141,7 @@ func SetupCDCFlowStatusQuery(env *testsuite.TestWorkflowEnvironment,
 			err = response.Get(&state)
 			if err != nil {
 				slog.Error(err.Error())
-			}
-
-			if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
+			} else if state.CurrentFlowState == protos.FlowStatus_STATUS_RUNNING {
 				break
 			}
 		} else {
@@ -171,9 +169,7 @@ func NormalizeFlowCountQuery(env *testsuite.TestWorkflowEnvironment,
 			err = response.Get(&state)
 			if err != nil {
 				slog.Error(err.Error())
-			}
-
-			if len(state.NormalizeFlowStatuses) >= minCount {
+			} else if len(state.NormalizeFlowStatuses) >= minCount {
 				break
 			}
 		} else {