Skip to content

Commit

Permalink
fix closing hubs
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jan 25, 2024
1 parent ca68b64 commit b919a1a
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
6 changes: 2 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
numRecords := res.NumRecordsSynced
syncDuration := time.Since(syncStartTime)

slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds\n",
numRecords, int(syncDuration.Seconds())),
)
slog.InfoContext(ctx, fmt.Sprintf("pushed %d records in %d seconds", numRecords, int(syncDuration.Seconds())))
activity.RecordHeartbeat(ctx, fmt.Sprintf("pushed %d records", numRecords))

lastCheckpoint, err := recordBatch.GetLastCheckpoint()
Expand Down Expand Up @@ -632,7 +630,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return err
}

slog.InfoContext(ctx, fmt.Sprintf("pushed %d records\n", rowsSynced))
slog.InfoContext(ctx, fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *EventHubConnector) Close() error {
allErrors = errors.Join(allErrors, err)
}

err = c.hubManager.Close(context.Background())
err = c.hubManager.Close(c.ctx)
if err != nil {
c.logger.Error("failed to close event hub manager", slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand Down Expand Up @@ -126,6 +127,12 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
}

func (m *EventHubManager) Close(ctx context.Context) error {
numHubsClosed := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
})
defer shutdown()

var allErrors error

m.hubs.Range(func(key any, value any) bool {
Expand All @@ -136,6 +143,7 @@ func (m *EventHubManager) Close(ctx context.Context) error {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
numHubsClosed.Add(1)
return true
})

Expand Down

0 comments on commit b919a1a

Please sign in to comment.