Skip to content

Commit

Permalink
HeartbeatRoutine: return context.CancelFunc instead of chan struct{}
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 5, 2024
1 parent 2d6a68a commit 3c40596
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 54 deletions.
29 changes: 6 additions & 23 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("pushing records for job - %s", jobName)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Expand Down Expand Up @@ -397,9 +394,7 @@ func (a *FlowableActivity) StartNormalize(
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
Expand Down Expand Up @@ -494,10 +489,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
Expand Down Expand Up @@ -635,10 +627,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
Expand Down Expand Up @@ -684,10 +673,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

err = dstConn.ConsolidateQRepPartitions(config)
if err != nil {
Expand Down Expand Up @@ -996,10 +982,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return "syncing xmin."
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
objectFolder, stagingTable)
},
)
defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

var avroFile *avro.AvroFile
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
Expand Down
4 changes: 1 addition & 3 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
numRecords, req.FlowJobName,
)
})
defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ func (p *PostgresCDCSource) consumeStream(
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
Expand Down
4 changes: 1 addition & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer func() {
req.RecordStream.Close()
}()
defer req.RecordStream.Close()

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
Expand Down
7 changes: 2 additions & 5 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,10 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)

if !qe.testEnv {
shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
return fmt.Sprintf("running '%s'", q)
})

defer func() {
shutdownCh <- struct{}{}
}()
defer shutdown()
}

rows, err := tx.Query(qe.ctx, q)
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()

if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
return fmt.Errorf("failed to put file to stage: %w", err)
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
written := numRows.Load()
return fmt.Sprintf("[avro] written %d rows to OCF", written)
})

defer func() {
shutdown <- struct{}{}
}()
defer shutdown()
}

for qRecordOrErr := range p.stream.Records {
Expand Down
8 changes: 3 additions & 5 deletions flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,22 @@ func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
) chan<- struct{} {
shutdown := make(chan struct{})
) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)
go func() {
counter := 0
for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
select {
case <-shutdown:
return
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}()
return shutdown
return cancel
}

// if the functions are being called outside the context of a Temporal workflow,
Expand Down

0 comments on commit 3c40596

Please sign in to comment.