Skip to content

Commit

Permalink
flow: specify when channel is read-only or write-only (#987)
Browse files Browse the repository at this point in the history
Makes it clearer how a channel is intended to be used,
& is an extra layer of static type checking

Also have `HeartbeatRoutine` stop when `ctx.Done()`
  • Loading branch information
serprex authored Jan 4, 2024
1 parent db621e1 commit d155028
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
11 changes: 6 additions & 5 deletions flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
) chan struct{} {
counter := 1
) chan<- struct{} {
shutdown := make(chan struct{})
go func() {
counter := 0
for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
counter += 1
to := time.After(interval)
select {
case <-shutdown:
return
case <-to:
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}()
Expand Down
2 changes: 1 addition & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (r *CDCRecordStream) Close() {
r.lastCheckpointSet = true
}

func (r *CDCRecordStream) GetRecords() chan Record {
func (r *CDCRecordStream) GetRecords() <-chan Record {
return r.records
}

Expand Down
8 changes: 4 additions & 4 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ type QRecordStream struct {
}

type RecordsToStreamRequest struct {
records chan Record
records <-chan Record
TableMapping map[string]uint32
BatchID int64
}

func NewRecordsToStreamRequest(
records chan Record,
records <-chan Record,
tableMapping map[string]uint32,
batchID int64,
) *RecordsToStreamRequest {
Expand All @@ -37,7 +37,7 @@ func NewRecordsToStreamRequest(
}
}

func (r *RecordsToStreamRequest) GetRecords() chan Record {
func (r *RecordsToStreamRequest) GetRecords() <-chan Record {
return r.records
}

Expand Down Expand Up @@ -80,6 +80,6 @@ func (s *QRecordStream) IsSchemaSet() bool {
return s.schemaSet
}

func (s *QRecordStream) SchemaChan() chan QRecordSchemaOrError {
func (s *QRecordStream) SchemaChan() <-chan QRecordSchemaOrError {
return s.schema
}

0 comments on commit d155028

Please sign in to comment.