EventHub processBatch: handle ctx.Done()
Based on #1151. The ticker.Reset call doesn't need to change, since the ticker isn't shared between goroutines.
serprex committed Jan 25, 2024
1 parent 090b8fb commit 1caa2ca
Showing 4 changed files with 26 additions and 17 deletions.
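For orientation before the diffs: processBatch drains incoming records in a select loop that also wakes on a flush ticker, and this commit adds a case for context cancellation so the loop can exit promptly. The sketch below shows that general shape only; the function, channel, and timeout names are simplified stand-ins, not the connector's actual API.

package main

import (
	"context"
	"fmt"
	"time"
)

// processLoop mimics the shape of processBatch: consume records, flush on a
// ticker, and bail out promptly when the context is cancelled.
func processLoop(ctx context.Context, records <-chan string) (int, error) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	count := 0
	for {
		select {
		case rec, ok := <-records:
			if !ok {
				return count, nil // producer closed the channel
			}
			_ = rec // accumulate into the current batch here
			count++

		case <-ctx.Done():
			// The pattern this commit adds: without this case the loop
			// could block forever once the producer stops sending.
			return 0, fmt.Errorf("context cancelled: %w", ctx.Err())

		case <-ticker.C:
			// Periodic flush would go here. The ticker is confined to this
			// goroutine, so a ticker.Reset here needs no extra synchronization.
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	n, err := processLoop(ctx, make(chan string))
	fmt.Println(n, err)
}
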
27 changes: 13 additions & 14 deletions flow/activities/flowable.go
@@ -315,9 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)

- slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n",
- numRecords, int(syncDuration.Seconds())),
- )
+ slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
if err != nil {
@@ -426,7 +424,7 @@ func (a *FlowableActivity) StartNormalize(

// log the number of batches normalized
if res != nil {
- slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d\n",
+ slog.InfoContext(ctx, fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))
}

@@ -505,11 +503,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,

numPartitions := len(partitions.Partitions)

- slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d\n",
+ slog.InfoContext(ctx, fmt.Sprintf("replicating partitions for batch %d - size: %d",
partitions.BatchId, numPartitions),
)
for i, p := range partitions.Partitions {
- slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s\n", partitions.BatchId, p.PartitionId))
+ slog.InfoContext(ctx, fmt.Sprintf("batch-%d - replicating partition - %s", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -551,7 +549,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

- slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s\n", partition.PartitionId))
+ slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s", partition.PartitionId))
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
@@ -590,7 +588,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull qrep records: %w", err)
}
- slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records)))
+ slog.InfoContext(ctx, fmt.Sprintf("pulled %d records", len(recordBatch.Records)))

err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, int64(len(recordBatch.Records)))
if err != nil {
@@ -611,7 +609,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
- slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s\n", partition.PartitionId))
+ slog.InfoContext(ctx, fmt.Sprintf("no records to push for partition %s", partition.PartitionId))
pullCancel()
} else {
wg.Wait()
@@ -625,7 +623,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

- slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
+ slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
@@ -732,6 +730,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
sendTimeout := 10 * time.Minute
ticker := time.NewTicker(sendTimeout)
defer ticker.Stop()
+
activity.RecordHeartbeat(ctx, "sending walheartbeat every 10 minutes")
for {
select {
@@ -798,7 +797,7 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)
pgSrcConn := srcConn.(*connpostgres.PostgresConnector)
- slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v\n", last))
+ slog.InfoContext(ctx, fmt.Sprintf("current last partition value is %v", last))
attemptCount := 1
for {
activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount))
@@ -914,7 +913,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}
defer connectors.CloseConnector(dstConn)

- slog.InfoContext(ctx, "replicating xmin\n")
+ slog.InfoContext(ctx, "replicating xmin")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)
@@ -980,7 +979,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}

if rowsSynced == 0 {
- slog.InfoContext(ctx, "no records to push for xmin\n")
+ slog.InfoContext(ctx, "no records to push for xmin")
} else {
err := errGroup.Wait()
if err != nil {
@@ -993,7 +992,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return 0, err
}

- slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
+ slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
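All of the flowable.go hunks above make the same mechanical change: dropping the trailing \n from messages passed to slog. slog already emits each record on its own line, so an embedded newline only adds a stray blank line (or an escaped \n under a JSON handler) to the output. A small illustration with a made-up message and count:

package main

import (
	"context"
	"fmt"
	"log/slog"
)

func main() {
	ctx := context.Background()

	// Before: the trailing \n ends up inside the record's msg field.
	slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", 42))

	// After: slog already terminates the record, so no newline is needed.
	slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", 42))

	// Where the values matter for filtering, structured attributes are an
	// alternative to Sprintf (not what this commit does, just an option).
	slog.InfoContext(ctx, "pushed records", slog.Int("count", 42))
}
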
2 changes: 1 addition & 1 deletion flow/activities/slot.go
@@ -77,8 +77,8 @@ func (a *FlowableActivity) recordSlotSizePeriodically(

timeout := 5 * time.Minute
ticker := time.NewTicker(timeout)
-
defer ticker.Stop()
+
for {
select {
case <-ticker.C:
5 changes: 4 additions & 1 deletion flow/connectors/eventhub/eventhub.go
@@ -202,6 +202,9 @@ func (c *EventHubConnector) processBatch(
c.logger.Info("processBatch", slog.Int("number of records processed for sending", int(curNumRecords)))
}

+ case <-c.ctx.Done():
+ return 0, fmt.Errorf("[eventhub] context cancelled %w", c.ctx.Err())
+
case <-ticker.C:
err := batchPerTopic.flushAllBatches(ctx, flowJobName)
if err != nil {
@@ -213,7 +216,7 @@
lastUpdatedOffset = lastSeenLSN
c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeenLSN))
if err != nil {
- return 0, fmt.Errorf("failed to update last offset: %v", err)
+ return 0, fmt.Errorf("failed to update last offset: %w", err)
}
}

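Alongside the new ctx.Done() case, this hunk also switches the offset-update error from %v to %w. Wrapping with %w keeps the underlying error in the chain so callers can still match it with errors.Is or errors.As, while %v flattens it to text. A standalone illustration; the sentinel error below is invented for the example and is not a real PeerDB error:

package main

import (
	"errors"
	"fmt"
)

// errOffsetStore is a stand-in sentinel, not a real PeerDB error.
var errOffsetStore = errors.New("offset store unavailable")

func main() {
	flattened := fmt.Errorf("failed to update last offset: %v", errOffsetStore)
	wrapped := fmt.Errorf("failed to update last offset: %w", errOffsetStore)

	fmt.Println(errors.Is(flattened, errOffsetStore)) // false: the chain is lost
	fmt.Println(errors.Is(wrapped, errOffsetStore))   // true: %w preserves it
}
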
9 changes: 8 additions & 1 deletion flow/connectors/eventhub/hubmanager.go
@@ -7,6 +7,7 @@ import (
"log/slog"
"strings"
"sync"
+ "sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -103,8 +104,13 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
}

func (m *EventHubManager) Close(ctx context.Context) error {
- var allErrors error
+ numHubsClosed := atomic.Uint32{}
+ shutdown := utils.HeartbeatRoutine(ctx, func() string {
+ return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
+ })
+ defer shutdown()
+
+ var allErrors error
m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
hub := value.(*azeventhubs.ProducerClient)
@@ -113,6 +119,7 @@ func (m *EventHubManager) Close(ctx context.Context) error {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
+ numHubsClosed.Add(1)
return true
})

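The hubmanager.go change wraps Close in a heartbeat so that closing many producer clients keeps reporting progress instead of looking hung, counting closures with an atomic.Uint32 and collecting failures with errors.Join. A rough sketch of that pattern under simplified assumptions: clients live in a sync.Map, and a plain ticker goroutine stands in for utils.HeartbeatRoutine.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type fakeClient struct{ name string }

func (c *fakeClient) Close(ctx context.Context) error { return nil }

func closeAll(ctx context.Context, hubs *sync.Map) error {
	var closed atomic.Uint32

	// Stand-in for utils.HeartbeatRoutine: report progress until shutdown.
	hbCtx, stopHeartbeat := context.WithCancel(ctx)
	defer stopHeartbeat()
	go func() {
		ticker := time.NewTicker(15 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-hbCtx.Done():
				return
			case <-ticker.C:
				fmt.Printf("closed %d eventhub clients\n", closed.Load())
			}
		}
	}()

	var allErrors error
	hubs.Range(func(key, value any) bool {
		client := value.(*fakeClient)
		if err := client.Close(ctx); err != nil {
			allErrors = errors.Join(allErrors, err)
		}
		closed.Add(1)
		return true // keep iterating even if one client failed to close
	})
	return allErrors
}

func main() {
	var hubs sync.Map
	hubs.Store("topic-a", &fakeClient{name: "topic-a"})
	hubs.Store("topic-b", &fakeClient{name: "topic-b"})
	fmt.Println(closeAll(context.Background(), &hubs))
}
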
