From d15502855e3c5747b749082b9009d28bf27ee2e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 4 Jan 2024 16:52:27 +0000 Subject: [PATCH] flow: specify when channel is read-only or write-only (#987) Makes it clearer how a channel is intended to be used, & is an extra layer of static type checking Also have `HeartbeatRoutine` stop when `ctx.Done()` --- flow/connectors/utils/heartbeat.go | 11 ++++++----- flow/model/model.go | 2 +- flow/model/qrecord_stream.go | 8 ++++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index cfa17fa646..c1bc81f077 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -13,19 +13,20 @@ func HeartbeatRoutine( ctx context.Context, interval time.Duration, message func() string, -) chan struct{} { - counter := 1 +) chan<- struct{} { shutdown := make(chan struct{}) go func() { + counter := 0 for { + counter += 1 msg := fmt.Sprintf("heartbeat #%d: %s", counter, message()) RecordHeartbeatWithRecover(ctx, msg) - counter += 1 - to := time.After(interval) select { case <-shutdown: return - case <-to: + case <-ctx.Done(): + return + case <-time.After(interval): } } }() diff --git a/flow/model/model.go b/flow/model/model.go index fce3246585..10c04d599c 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -412,7 +412,7 @@ func (r *CDCRecordStream) Close() { r.lastCheckpointSet = true } -func (r *CDCRecordStream) GetRecords() chan Record { +func (r *CDCRecordStream) GetRecords() <-chan Record { return r.records } diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index af8346d633..a293e66ebb 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -20,13 +20,13 @@ type QRecordStream struct { } type RecordsToStreamRequest struct { - records chan Record + records <-chan Record TableMapping map[string]uint32 BatchID int64 } func NewRecordsToStreamRequest( - records chan Record, + records <-chan Record, tableMapping map[string]uint32, batchID int64, ) *RecordsToStreamRequest { @@ -37,7 +37,7 @@ func NewRecordsToStreamRequest( } } -func (r *RecordsToStreamRequest) GetRecords() chan Record { +func (r *RecordsToStreamRequest) GetRecords() <-chan Record { return r.records } @@ -80,6 +80,6 @@ func (s *QRecordStream) IsSchemaSet() bool { return s.schemaSet } -func (s *QRecordStream) SchemaChan() chan QRecordSchemaOrError { +func (s *QRecordStream) SchemaChan() <-chan QRecordSchemaOrError { return s.schema }