Revert "HeartbeatRoutine: return context.CancelFunc instead of chan s… #1020

Merged
merged 1 commit into from
Jan 7, 2024
Merged
flow/activities/flowable.go: 29 changes (23 additions, 6 deletions)
@@ -286,7 +286,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
         jobName := input.FlowConnectionConfigs.FlowJobName
         return fmt.Sprintf("pushing records for job - %s", jobName)
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     syncStartTime := time.Now()
     res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
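The same mechanical change repeats through the rest of the diff: with the revert, utils.HeartbeatRoutine once again returns a send-only channel instead of a context.CancelFunc, so each `defer shutdown()` becomes a deferred send on that channel. A minimal sketch of the two call-site shapes, using hypothetical startWithCancel/startWithChan helpers in place of the real routine:

package main

import (
	"fmt"
	"sync"
)

// Pre-revert shape: the routine handed back a cancel-style function.
func startWithCancel() func() {
	return func() { fmt.Println("stopped via CancelFunc") }
}

// Reverted shape: the routine returns a send-only channel; its
// goroutine exits when a value arrives on that channel.
func startWithChan(wg *sync.WaitGroup) chan<- struct{} {
	ch := make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ch
		fmt.Println("stopped via channel send")
	}()
	return ch
}

func main() {
	var wg sync.WaitGroup
	defer wg.Wait() // registered first, runs last: let the goroutine finish

	cancel := startWithCancel()
	defer cancel() // the pre-revert call-site shape

	shutdown := startWithChan(&wg)
	defer func() {
		shutdown <- struct{}{} // the reverted call-site shape used in this PR
	}()
}

The send is unbuffered, so it hands off synchronously to the heartbeat goroutine's `case <-shutdown` branch (see the flow/connectors/utils/heartbeat.go hunk at the end of this diff).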
@@ -394,7 +397,9 @@ func (a *FlowableActivity) StartNormalize(
     shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
         return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
     })
-    defer shutdown()
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     slog.InfoContext(ctx, "initializing table schema...")
     err = dstConn.InitializeTableSchema(input.FlowConnectionConfigs.TableNameSchemaMapping)
@@ -489,7 +494,10 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
     shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
         return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     partitions, err := srcConn.GetQRepPartitions(config, last)
     if err != nil {
@@ -627,7 +635,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
     shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
         return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
     if err != nil {
@@ -673,7 +684,10 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
     shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
         return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     err = dstConn.ConsolidateQRepPartitions(config)
     if err != nil {
@@ -982,7 +996,10 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
     shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
         return "syncing xmin."
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
     if err != nil {
flow/connectors/bigquery/qrep_avro_sync.go: 4 changes (3 additions, 1 deletion)
@@ -373,7 +373,9 @@ func (s *QRepAvroSyncMethod) writeToStage(
                 objectFolder, stagingTable)
         },
     )
-    defer shutdown()
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     var avroFile *avro.AvroFile
     ocfWriter := avro.NewPeerDBOCFWriter(s.connector.ctx, stream, avroSchema,
flow/connectors/eventhub/eventhub.go: 4 changes (3 additions, 1 deletion)
@@ -216,7 +216,9 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
             numRecords, req.FlowJobName,
         )
     })
-    defer shutdown()
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     numRecords, err = c.processBatch(req.FlowJobName, batch, maxParallelism)
     if err != nil {
flow/connectors/postgres/cdc.go: 5 changes (4 additions, 1 deletion)
@@ -249,7 +249,10 @@ func (p *PostgresCDCSource) consumeStream(
         currRecords := cdcRecordsStorage.Len()
         return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     standbyMessageTimeout := req.IdleTimeout
     nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)
flow/connectors/postgres/postgres.go: 4 changes (3 additions, 1 deletion)
@@ -196,7 +196,9 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro
 
 // PullRecords pulls records from the source.
 func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
-    defer req.RecordStream.Close()
+    defer func() {
+        req.RecordStream.Close()
+    }()
 
     // Slotname would be the job name prefixed with "peerflow_slot_"
     slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
flow/connectors/postgres/qrep_query_executor.go: 7 changes (5 additions, 2 deletions)
@@ -83,10 +83,13 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
     q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)
 
     if !qe.testEnv {
-        shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
+        shutdownCh := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
             return fmt.Sprintf("running '%s'", q)
         })
-        defer shutdown()
+
+        defer func() {
+            shutdownCh <- struct{}{}
+        }()
     }
 
     rows, err := tx.Query(qe.ctx, q)
flow/connectors/snowflake/qrep_avro_sync.go: 5 changes (4 additions, 1 deletion)
@@ -282,7 +282,10 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
     shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
         return fmt.Sprintf("putting file to stage %s", stage)
     })
-    defer shutdown()
+
+    defer func() {
+        shutdown <- struct{}{}
+    }()
 
     if _, err := s.connector.database.ExecContext(s.connector.ctx, putCmd); err != nil {
         return fmt.Errorf("failed to put file to stage: %w", err)
flow/connectors/utils/avro/avro_writer.go: 5 changes (4 additions, 1 deletion)
@@ -136,7 +136,10 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
             written := numRows.Load()
             return fmt.Sprintf("[avro] written %d rows to OCF", written)
         })
-        defer shutdown()
+
+        defer func() {
+            shutdown <- struct{}{}
+        }()
     }
 
     for qRecordOrErr := range p.stream.Records {
flow/connectors/utils/heartbeat.go: 8 changes (5 additions, 3 deletions)
@@ -13,22 +13,24 @@ func HeartbeatRoutine(
     ctx context.Context,
     interval time.Duration,
     message func() string,
-) context.CancelFunc {
-    ctx, cancel := context.WithCancel(ctx)
+) chan<- struct{} {
+    shutdown := make(chan struct{})
     go func() {
         counter := 0
         for {
             counter += 1
             msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
             RecordHeartbeatWithRecover(ctx, msg)
             select {
+            case <-shutdown:
+                return
             case <-ctx.Done():
                 return
             case <-time.After(interval):
             }
         }
     }()
-    return cancel
+    return shutdown
 }
 
 // if the functions are being called outside the context of a Temporal workflow,
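For reference, a self-contained sketch of the reverted pattern end to end, assuming the heartbeat.go hunk above; recordHeartbeat is a stand-in for this package's RecordHeartbeatWithRecover, and main is illustrative only:

package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-in for utils.RecordHeartbeatWithRecover, which records a
// Temporal activity heartbeat and recovers if called outside one.
func recordHeartbeat(ctx context.Context, msg string) {
	fmt.Println(msg)
}

// Mirrors the reverted utils.HeartbeatRoutine: callers stop the
// goroutine with a single send on the returned channel.
func heartbeatRoutine(ctx context.Context, interval time.Duration, message func() string) chan<- struct{} {
	shutdown := make(chan struct{})
	go func() {
		counter := 0
		for {
			counter += 1
			recordHeartbeat(ctx, fmt.Sprintf("heartbeat #%d: %s", counter, message()))
			select {
			case <-shutdown:
				return
			case <-ctx.Done():
				return
			case <-time.After(interval):
			}
		}
	}()
	return shutdown
}

func main() {
	ctx := context.Background()
	shutdown := heartbeatRoutine(ctx, 100*time.Millisecond, func() string {
		return "copying rows"
	})
	defer func() {
		shutdown <- struct{}{} // the same deferred send as the call sites above
	}()

	time.Sleep(350 * time.Millisecond) // stand-in for the activity's real work
}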