HeartbeatRoutine: return context.CancelFunc instead of chan struct{} #997

Merged · 1 commit · Jan 6, 2024
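Every hunk below is the same mechanical change at a call site: a deferred send on an unbuffered channel becomes a plain `defer shutdown()` on the returned `context.CancelFunc`. Judging from the old implementation of `HeartbeatRoutine` (the last file in this diff), the channel pattern could hang: when the parent context was cancelled first, the heartbeat goroutine exited via `ctx.Done()`, leaving the deferred send with no receiver. A self-contained sketch of that hazard, assuming the pre-PR shape (`oldHeartbeatRoutine` is an illustrative stand-in, not PeerDB code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// oldHeartbeatRoutine reproduces the pre-PR shape: the caller stops it by
// sending on the returned channel, which pairs with `case <-shutdown:`.
func oldHeartbeatRoutine(ctx context.Context, interval time.Duration) chan<- struct{} {
	shutdown := make(chan struct{})
	go func() {
		for {
			fmt.Println("heartbeat")
			select {
			case <-shutdown:
				return
			case <-ctx.Done():
				return // after this, nobody receives the caller's shutdown send
			case <-time.After(interval):
			}
		}
	}()
	return shutdown
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	shutdown := oldHeartbeatRoutine(ctx, time.Second)

	cancel()                          // parent context cancelled first...
	time.Sleep(50 * time.Millisecond) // ...so the goroutine exits via ctx.Done()

	select {
	case shutdown <- struct{}{}: // the old deferred `shutdown <- struct{}{}` path
		fmt.Println("shutdown delivered")
	case <-time.After(time.Second):
		fmt.Println("send blocked: this is the hang the PR removes")
	}
}
```

A `context.CancelFunc`, by contrast, never blocks and may be called any number of times, which is exactly what a `defer` wants.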
29 changes: 6 additions & 23 deletions flow/activities/flowable.go
@@ -286,10 +286,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("pushing records for job - %s", jobName)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
@@ -397,9 +394,7 @@ func (a *FlowableActivity) StartNormalize(
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
@@ -494,10 +489,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
@@ -635,10 +627,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
@@ -684,10 +673,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

err = dstConn.ConsolidateQRepPartitions(config)
if err != nil {
@@ -996,10 +982,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return "syncing xmin."
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
4 changes: 1 addition & 3 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -373,9 +373,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
objectFolder, stagingTable)
},
)
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

var avroFile *avro.AvroFile
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
4 changes: 1 addition & 3 deletions flow/connectors/eventhub/eventhub.go
@@ -216,9 +216,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
numRecords, req.FlowJobName,
)
})
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
5 changes: 1 addition & 4 deletions flow/connectors/postgres/cdc.go
@@ -249,10 +249,7 @@ func (p *PostgresCDCSource) consumeStream(
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
4 changes: 1 addition & 3 deletions flow/connectors/postgres/postgres.go
@@ -196,9 +196,7 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
-defer func() {
-	req.RecordStream.Close()
-}()
+defer req.RecordStream.Close()

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
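The `PullRecords` hunk above is a drive-by cleanup rather than part of the heartbeat change: wrapping a single call in a closure is unnecessary here. In general the two forms differ only in when the receiver expression is evaluated, which is irrelevant in this case because `req.RecordStream` is never reassigned. A toy illustration of that difference (types and names invented for the example):

```go
package main

import "fmt"

type stream struct{ name string }

func (s *stream) Close() { fmt.Println("closing", s.name) }

func main() {
	s := &stream{name: "a"}
	defer s.Close()              // receiver evaluated now: closes "a"
	defer func() { s.Close() }() // s read when the defer runs: closes "b"
	s = &stream{name: "b"}
}

// Output (defers run last-in, first-out):
// closing b
// closing a
```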
7 changes: 2 additions & 5 deletions flow/connectors/postgres/qrep_query_executor.go
Expand Up @@ -83,13 +83,10 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)

if !qe.testEnv {
-shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
+shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
return fmt.Sprintf("running '%s'", q)
})
-
-defer func() {
-	shutdownCh <- struct{}{}
-}()
+defer shutdown()
}

rows, err := tx.Query(qe.ctx, q)
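One detail in the hunk above: the new `defer shutdown()` sits inside the `if !qe.testEnv` block, but Go defers are function-scoped, not block-scoped, so the heartbeat keeps running for the whole query and is cancelled only when `executeQueryInTx` returns. A toy illustration:

```go
package main

import "fmt"

func run(enabled bool) {
	if enabled {
		defer fmt.Println("deferred: runs at function exit, not at the closing brace")
	}
	fmt.Println("still inside run, after the if block")
}

func main() {
	run(true)
	// Output:
	// still inside run, after the if block
	// deferred: runs at function exit, not at the closing brace
}
```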
5 changes: 1 addition & 4 deletions flow/connectors/snowflake/qrep_avro_sync.go
@@ -282,10 +282,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()

if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
return fmt.Errorf("failed to put file to stage: %w", err)
5 changes: 1 addition & 4 deletions flow/connectors/utils/avro/avro_writer.go
@@ -136,10 +136,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
written := numRows.Load()
return fmt.Sprintf("[avro] written %d rows to OCF", written)
})
-
-defer func() {
-	shutdown <- struct{}{}
-}()
+defer shutdown()
}

for qRecordOrErr := range p.stream.Records {
8 changes: 3 additions & 5 deletions flow/connectors/utils/heartbeat.go
@@ -13,24 +13,22 @@ func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
-) chan<- struct{} {
-	shutdown := make(chan struct{})
+) context.CancelFunc {
+	ctx, cancel := context.WithCancel(ctx)
go func() {
counter := 0
for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
select {
-case <-shutdown:
-	return
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}()
-return shutdown
+return cancel
}

// if the functions are being called outside the context of a Temporal workflow,
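For reference, the new contract in one runnable sketch: `newHeartbeatRoutine` copies the post-PR shape above, with `fmt.Printf` standing in for the Temporal-aware `RecordHeartbeatWithRecover`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// newHeartbeatRoutine mirrors the merged implementation: it derives a child
// context and hands the CancelFunc back to the caller.
func newHeartbeatRoutine(ctx context.Context, interval time.Duration, message func() string) context.CancelFunc {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		counter := 0
		for {
			counter += 1
			fmt.Printf("heartbeat #%d: %s\n", counter, message())
			select {
			case <-ctx.Done():
				return
			case <-time.After(interval):
			}
		}
	}()
	return cancel
}

func main() {
	shutdown := newHeartbeatRoutine(context.Background(), 100*time.Millisecond, func() string {
		return "demo"
	})
	defer shutdown() // never blocks, and is safe even if the parent ctx is already done

	time.Sleep(350 * time.Millisecond)
}
```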