Skip to content

Commit

Permalink
patch hubmanager close, heartbeat function
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 19, 2024
1 parent 8a87191 commit 79dcc3d
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 32 deletions.
12 changes: 6 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
Expand Down Expand Up @@ -383,7 +383,7 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer shutdown()
Expand Down Expand Up @@ -452,7 +452,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})
defer shutdown()
Expand Down Expand Up @@ -590,7 +590,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
}

shutdown := utils.HeartbeatRoutine(ctx, 1*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()
Expand Down Expand Up @@ -633,7 +633,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
return err
}

shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
})
defer shutdown()
Expand Down Expand Up @@ -942,7 +942,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return nil
})

shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
})
defer shutdown()
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
stream *model.QRecordStream,
flowName string,
) (int, error) {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
shutdown := utils.HeartbeatRoutine(s.connector.ctx,
func() string {
return fmt.Sprintf("writing to avro stage for objectFolder %s and staging table %s",
objectFolder, stagingTable)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *EventHubConnector) Close() error {
allErrors = errors.Join(allErrors, err)
}

err = c.hubManager.Close(context.Background())
err = c.hubManager.Close(c.ctx)
if err != nil {
c.logger.Error("failed to close event hub manager", slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (c *EventHubConnector) processBatch(
lastUpdatedOffset := int64(0)

numRecords := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(c.ctx, func() string {
return fmt.Sprintf(
"processed %d records for flow %s",
numRecords.Load(), flowJobName,
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/eventhub/hubmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand Down Expand Up @@ -127,7 +128,11 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu

func (m *EventHubManager) Close(ctx context.Context) error {
var allErrors error

numHubsClosed := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
})
defer shutdown()
m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
hub := value.(*azeventhubs.ProducerClient)
Expand All @@ -136,6 +141,8 @@ func (m *EventHubManager) Close(ctx context.Context) error {
slog.Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}

numHubsClosed.Add(1)
return true
})

Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (c *PostgresConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
c.logger.Info(fmt.Sprintf("fetched schema for table %s", tableName))
}

Expand Down Expand Up @@ -695,7 +695,7 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab

tableExistsMapping[tableIdentifier] = false
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
}

err = createNormalizedTablesTx.Commit(c.ctx)
Expand Down Expand Up @@ -800,7 +800,7 @@ func (c *PostgresConnector) EnsurePullability(
},
},
}
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}

return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/geo"
Expand Down Expand Up @@ -84,7 +83,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)

if !qe.testEnv {
shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
shutdown := utils.HeartbeatRoutine(qe.ctx, func() string {
qe.logger.Info(fmt.Sprintf("still running '%s'...", q))
return fmt.Sprintf("running '%s'", q)
})
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(avroFile *avro.AvroFile, stage
activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)

shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(s.connector.ctx, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})
defer shutdown()
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *SnowflakeConnector) GetTableSchema(
return nil, err
}
res[tableName] = tableSchema
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
utils.RecordHeartbeat(c.ctx, fmt.Sprintf("fetched schema for table %s", tableName))
}

return &protos.GetTableSchemaBatchOutput{
Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"log/slog"
"os"
"sync/atomic"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -131,7 +130,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
numRows := atomic.Uint32{}

if p.ctx != nil {
shutdown := utils.HeartbeatRoutine(p.ctx, 30*time.Second, func() string {
shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
written := numRows.Load()
return fmt.Sprintf("[avro] written %d rows to OCF", written)
})
Expand Down
21 changes: 9 additions & 12 deletions flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,31 @@ package utils
import (
"context"
"fmt"
"log/slog"
"time"

"go.temporal.io/sdk/activity"
)

func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
) func() {
shutdown := make(chan struct{})
go func() {
counter := 0
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()

for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
RecordHeartbeat(ctx, msg)
select {
case <-shutdown:
return
case <-ctx.Done():
return
case <-time.After(interval):
case <-ticker.C:
}
}
}()
Expand All @@ -35,12 +36,8 @@ func HeartbeatRoutine(

// activity.RecordHeartbeat panics when it is called with a context that does not
// belong to a Temporal activity; this wrapper is a band-aid for that.
func RecordHeartbeatWithRecover(ctx context.Context, details ...interface{}) {
defer func() {
if r := recover(); r != nil {
slog.Warn("ignoring panic from activity.RecordHeartbeat")
slog.Warn("this can happen when function is invoked outside of a Temporal workflow")
}
}()
activity.RecordHeartbeat(ctx, details...)
func RecordHeartbeat(ctx context.Context, details ...interface{}) {
if activity.IsActivity(ctx) {
activity.RecordHeartbeat(ctx, details...)
}
}

0 comments on commit 79dcc3d

Please sign in to comment.