diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index cfa17fa64..c1bc81f07 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -13,19 +13,20 @@ func HeartbeatRoutine( ctx context.Context, interval time.Duration, message func() string, -) chan struct{} { - counter := 1 +) chan<- struct{} { shutdown := make(chan struct{}) go func() { + counter := 0 for { + counter += 1 msg := fmt.Sprintf("heartbeat #%d: %s", counter, message()) RecordHeartbeatWithRecover(ctx, msg) - counter += 1 - to := time.After(interval) select { case <-shutdown: return - case <-to: + case <-ctx.Done(): + return + case <-time.After(interval): } } }() diff --git a/flow/model/model.go b/flow/model/model.go index fce324658..10c04d599 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -412,7 +412,7 @@ func (r *CDCRecordStream) Close() { r.lastCheckpointSet = true } -func (r *CDCRecordStream) GetRecords() chan Record { +func (r *CDCRecordStream) GetRecords() <-chan Record { return r.records } diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index af8346d63..a293e66eb 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -20,13 +20,13 @@ type QRecordStream struct { } type RecordsToStreamRequest struct { - records chan Record + records <-chan Record TableMapping map[string]uint32 BatchID int64 } func NewRecordsToStreamRequest( - records chan Record, + records <-chan Record, tableMapping map[string]uint32, batchID int64, ) *RecordsToStreamRequest { @@ -37,7 +37,7 @@ func NewRecordsToStreamRequest( } } -func (r *RecordsToStreamRequest) GetRecords() chan Record { +func (r *RecordsToStreamRequest) GetRecords() <-chan Record { return r.records } @@ -80,6 +80,6 @@ func (s *QRecordStream) IsSchemaSet() bool { return s.schemaSet } -func (s *QRecordStream) SchemaChan() chan QRecordSchemaOrError { +func (s *QRecordStream) SchemaChan() <-chan QRecordSchemaOrError { return s.schema }