Skip to content

Commit

Permalink
Revert "HeartbeatRoutine: return context.CancelFunc instead of chan s…
Browse files Browse the repository at this point in the history
…truct{} (#997)"

This reverts commit 081eb40.
  • Loading branch information
Amogh-Bharadwaj committed Jan 7, 2024
1 parent 19178f2 commit 56dfcb4
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 17 deletions.
29 changes: 23 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("pushing records for job - %s", jobName)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Expand Down Expand Up @@ -394,7 +397,9 @@ func (a *FlowableActivity) StartNormalize(
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer shutdown()
defer func() {
shutdown <- struct{}{}
}()

slog.InfoContext(ctx, "initializing table schema...")
err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
Expand Down Expand Up @@ -489,7 +494,10 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

partitions, err := srcConn.GetQRepPartitions(config, last)
if err != nil {
Expand Down Expand Up @@ -627,7 +635,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
Expand Down Expand Up @@ -673,7 +684,10 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

err = dstConn.ConsolidateQRepPartitions(config)
if err != nil {
Expand Down Expand Up @@ -982,7 +996,10 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
return "syncing xmin."
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,9 @@ func (s *QRepAvroSyncMethod) writeToStage(
objectFolder, stagingTable)
},
)
defer shutdown()
defer func() {
shutdown <- struct{}{}
}()

var avroFile *avro.AvroFile
ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
numRecords, req.FlowJobName,
)
})
defer shutdown()
defer func() {
shutdown <- struct{}{}
}()

numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,10 @@ func (p *PostgresCDCSource) consumeStream(
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer req.RecordStream.Close()
defer func() {
req.RecordStream.Close()
}()

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
Expand Down
7 changes: 5 additions & 2 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)

if !qe.testEnv {
shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
return fmt.Sprintf("running '%s'", q)
})
defer shutdown()

defer func() {
shutdownCh <- struct{}{}
}()
}

rows, err := tx.Query(qe.ctx, q)
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,10 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()

if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
return fmt.Errorf("failed to put file to stage: %w", err)
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
written := numRows.Load()
return fmt.Sprintf("[avro] written %d rows to OCF", written)
})
defer shutdown()

defer func() {
shutdown <- struct{}{}
}()
}

for qRecordOrErr := range p.stream.Records {
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,24 @@ func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)
) chan<- struct{} {
shutdown := make(chan struct{})
go func() {
counter := 0
for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
select {
case <-shutdown:
return
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}()
return cancel
return shutdown
}

// if the functions are being called outside the context of a Temporal workflow,
Expand Down

0 comments on commit 56dfcb4

Please sign in to comment.