diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 6b71e6b907..2fc36da694 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -266,7 +266,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() syncStartTime := time.Now() @@ -364,7 +364,7 @@ func (a *FlowableActivity) StartNormalize( return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName) }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() log.Info("initializing table schema...") @@ -443,7 +443,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() partitions, err := srcConn.GetQRepPartitions(config, last) @@ -574,7 +574,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() res, err := dstConn.SyncQRepRecords(config, partition, stream) @@ -618,7 +618,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() err = dstConn.ConsolidateQRepPartitions(config) @@ -919,7 +919,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context, }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() res, err := dstConn.SyncQRepRecords(config, partition, stream) diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index a490674273..98a0025d56 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -21,7 +21,7 @@ func (a *SnapshotActivity) CloseSlotKeepAlive(flowJobName string) error { } if s, ok := a.SnapshotConnections[flowJobName]; ok { - s.signal.CloneComplete <- true + s.signal.CloneComplete <- struct{}{} s.connector.Close() } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 5f395f3585..c31442ed2f 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -325,7 +325,7 @@ func (s *QRepAvroSyncMethod) writeToStage( }, ) defer func() { - shutdown <- true + shutdown <- struct{}{} }() var avroFile *avro.AvroFile diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 961fb979ff..896b7fd9a7 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -227,7 +227,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S ) }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() // if env var PEERDB_BETA_EVENTHUB_PUSH_ASYNC=true diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index fc0063c1ab..f0ef60f1b6 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -209,7 +209,7 @@ func (p *PostgresCDCSource) consumeStream( }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() standbyMessageTimeout := req.IdleTimeout diff --git a/flow/connectors/postgres/postgres_repl_test.go b/flow/connectors/postgres/postgres_repl_test.go index 7e10d9a154..d3831fee63 100644 --- a/flow/connectors/postgres/postgres_repl_test.go +++ b/flow/connectors/postgres/postgres_repl_test.go @@ -127,7 +127,7 @@ func (suite *PostgresReplicationSnapshotTestSuite) TestSimpleSlotCreation() { log.Infof("signaling clone complete for %s after waiting for 2 seconds", flowJobName) time.Sleep(2 * time.Second) - signal.CloneComplete <- true + signal.CloneComplete <- struct{}{} log.Infof("successfully setup replication for %s", flowJobName) } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 85c0fb2cce..c8ea1bcadd 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -83,7 +83,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc }) defer func() { - shutdownCh <- true + shutdownCh <- struct{}{} }() } diff --git a/flow/connectors/postgres/slot_signal.go b/flow/connectors/postgres/slot_signal.go index e557049831..9660575591 100644 --- a/flow/connectors/postgres/slot_signal.go +++ b/flow/connectors/postgres/slot_signal.go @@ -11,13 +11,13 @@ type SlotCreationResult struct { // 2. CloneComplete - which can be waited on to ensure that the clone has completed. type SlotSignal struct { SlotCreated chan *SlotCreationResult - CloneComplete chan bool + CloneComplete chan struct{} } // NewSlotSignal returns a new SlotSignal. func NewSlotSignal() *SlotSignal { return &SlotSignal{ SlotCreated: make(chan *SlotCreationResult, 1), - CloneComplete: make(chan bool, 1), + CloneComplete: make(chan struct{}, 1), } } diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 920e01d1ca..59259e3a58 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -318,7 +318,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() if _, err := s.connector.database.Exec(putCmd); err != nil { diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 3c7fa5830c..1bb65d4317 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -135,7 +135,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( }) defer func() { - shutdown <- true + shutdown <- struct{}{} }() } diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index b16735b6bb..164c64e3cb 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -13,9 +13,9 @@ func HeartbeatRoutine( ctx context.Context, interval time.Duration, message func() string, -) chan bool { +) chan struct{} { counter := 1 - shutdown := make(chan bool) + shutdown := make(chan struct{}) go func() { for { msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())