From 8aaa4c7021fac72f76f490692105354f882c8a74 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 8 Jan 2024 04:39:14 +0530 Subject: [PATCH] =?UTF-8?q?Revert=20"HeartbeatRoutine:=20return=20context.?= =?UTF-8?q?CancelFunc=20instead=20of=20chan=20s=E2=80=A6=20(#1020)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reverts the change made in #997 This reverts commit 081eb4065c551570498c193c96f2a10b890d8267. --- flow/activities/flowable.go | 29 +++++++++++++++---- flow/connectors/bigquery/qrep_avro_sync.go | 4 ++- flow/connectors/eventhub/eventhub.go | 4 ++- flow/connectors/postgres/cdc.go | 5 +++- flow/connectors/postgres/postgres.go | 4 ++- .../postgres/qrep_query_executor.go | 7 +++-- flow/connectors/snowflake/qrep_avro_sync.go | 5 +++- flow/connectors/utils/avro/avro_writer.go | 5 +++- flow/connectors/utils/heartbeat.go | 8 +++-- 9 files changed, 54 insertions(+), 17 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 25693f5ee1..6ee858c028 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -286,7 +286,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, jobName := input.FlowConnectionConfigs.FlowJobName return fmt.Sprintf("pushing records for job - %s", jobName) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() syncStartTime := time.Now() res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{ @@ -394,7 +397,9 @@ func (a *FlowableActivity) StartNormalize( shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string { return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName) }) - defer shutdown() + defer func() { + shutdown <- struct{}{} + }() slog.InfoContext(ctx, "initializing table schema...") err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping) @@ -489,7 +494,10 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string { return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() partitions, err := srcConn.GetQRepPartitions(config, last) if err != nil { @@ -627,7 +635,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream) if err != nil { @@ -673,7 +684,10 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string { return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() err = dstConn.ConsolidateQRepPartitions(config) if err != nil { @@ -982,7 +996,10 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string { return "syncing xmin." }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream) if err != nil { diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d6df8fdb6e..ceb3b38402 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -373,7 +373,9 @@ func (s *QRepAvroSyncMethod) writeToStage( objectFolder, stagingTable) }, ) - defer shutdown() + defer func() { + shutdown <- struct{}{} + }() var avroFile *avro.AvroFile ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema, diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 24a37eaf91..1bb7b00166 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -216,7 +216,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S numRecords, req.FlowJobName, ) }) - defer shutdown() + defer func() { + shutdown <- struct{}{} + }() numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism) if err != nil { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index af2b483de1..fcb3e64174 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -249,7 +249,10 @@ func (p *PostgresCDCSource) consumeStream( currRecords := cdcRecordsStorage.Len() return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() standbyMessageTimeout := req.IdleTimeout nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 7fef9d26c2..43a11a2e72 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -196,7 +196,9 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro // PullRecords pulls records from the source. func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error { - defer req.RecordStream.Close() + defer func() { + req.RecordStream.Close() + }() // Slotname would be the job name prefixed with "peerflow_slot_" slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName) diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 839845d284..bb07fbb98f 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -83,10 +83,13 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName) if !qe.testEnv { - shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string { + shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string { return fmt.Sprintf("running '%s'", q) }) - defer shutdown() + + defer func() { + shutdownCh <- struct{}{} + }() } rows, err := tx.Query(qe.ctx, q) diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 83521088d8..07eb791c5c 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -282,7 +282,10 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string { return fmt.Sprintf("putting file to stage %s", stage) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil { return fmt.Errorf("failed to put file to stage: %w", err) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 1e6f318713..90c016b404 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -136,7 +136,10 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( written := numRows.Load() return fmt.Sprintf("[avro] written %d rows to OCF", written) }) - defer shutdown() + + defer func() { + shutdown <- struct{}{} + }() } for qRecordOrErr := range p.stream.Records { diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 37f00bc72f..c1bc81f077 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -13,8 +13,8 @@ func HeartbeatRoutine( ctx context.Context, interval time.Duration, message func() string, -) context.CancelFunc { - ctx, cancel := context.WithCancel(ctx) +) chan<- struct{} { + shutdown := make(chan struct{}) go func() { counter := 0 for { @@ -22,13 +22,15 @@ func HeartbeatRoutine( msg := fmt.Sprintf("heartbeat #%d: %s", counter, message()) RecordHeartbeatWithRecover(ctx, msg) select { + case <-shutdown: + return case <-ctx.Done(): return case <-time.After(interval): } } }() - return cancel + return shutdown } // if the functions are being called outside the context of a Temporal workflow,