connectors: don't emit heartbeats (#1742)
The activity layer is responsible for heartbeats.
Connectors should log info if they want to expose progress.
serprex authored May 27, 2024
1 parent f4d566b commit 56a68c0
Showing 16 changed files with 107 additions and 169 deletions.
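
In short, the activity layer now owns heartbeating by wrapping connector calls in the new heartbeatRoutine helper, while connectors report progress only through their logger. Below is a condensed sketch of that split, using standalone stand-ins for the Temporal SDK's activity.RecordHeartbeat and for PeerDB's shared.Interval helper (whose implementation is not shown in this diff); it is an illustration, not the exact PeerDB code.

// Sketch only: the real heartbeatRoutine (added in flow/activities/flowable_core.go)
// delegates to shared.Interval and activity.RecordHeartbeat; a plain ticker and a
// Printf stand in here so the example runs on its own.
package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"
)

func heartbeatRoutine(ctx context.Context, message func() string) func() {
	ticker := time.NewTicker(15 * time.Second)
	done := make(chan struct{})
	go func() {
		defer ticker.Stop()
		counter := 0
		for {
			select {
			case <-ctx.Done():
				return
			case <-done:
				return
			case <-ticker.C:
				counter++
				// Real code: activity.RecordHeartbeat(ctx, fmt.Sprintf("heartbeat #%d: %s", counter, message()))
				fmt.Printf("heartbeat #%d: %s\n", counter, message())
			}
		}
	}()
	return func() { close(done) }
}

// Activity layer: owns the heartbeat around the connector call.
func getTableSchemaActivity(ctx context.Context) error {
	shutdown := heartbeatRoutine(ctx, func() string { return "getting table schema" })
	defer shutdown()
	return connectorGetTableSchema(ctx)
}

// Connector layer: logs progress, never records activity heartbeats.
func connectorGetTableSchema(ctx context.Context) error {
	slog.Info("fetching table schema")
	return nil
}

func main() {
	if err := getTableSchemaActivity(context.Background()); err != nil {
		slog.Error("activity failed", slog.Any("error", err))
	}
}
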
58 changes: 30 additions & 28 deletions flow/activities/flowable.go
@@ -23,7 +23,6 @@ import (
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
@@ -139,10 +138,14 @@ func (a *FlowableActivity) GetTableSchema(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
srcConn, err := connectors.GetAs[connectors.GetTableSchemaConnector](ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get CDCPullPgConnector: %w", err)
return nil, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

heartbeatRoutine(ctx, func() string {
return "getting table schema"
})

return srcConn.GetTableSchema(ctx, config)
}

@@ -171,7 +174,7 @@ func (a *FlowableActivity) CreateNormalizedTable(

numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(config.TableNameSchemaMapping))
shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
@@ -308,7 +311,7 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job"
})
defer shutdown()
@@ -380,7 +383,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "getting partitions for job"
})
defer shutdown()
@@ -449,7 +452,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "consolidating partitions for job"
})
defer shutdown()
@@ -661,7 +664,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "scanning for new rows"
})
defer shutdown()
@@ -687,7 +690,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
}
defer connectors.CloseConnector(ctx, conn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "renaming tables for job"
})
defer shutdown()
@@ -731,6 +734,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

logger.Info("replicating xmin")
shutdown := heartbeatRoutine(ctx, func() string {
return "syncing xmin"
})
defer shutdown()

startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
@@ -744,14 +753,13 @@
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("replicating xmin")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)

stream := model.NewQRecordStream(bufferSize)

var currentSnapshotXmin int64
var rowsSynced int
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
var pullErr error
@@ -798,32 +806,26 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return nil
})

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return context.Canceled
})
defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
if err := errGroup.Wait(); err != nil && err != context.Canceled {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, fmt.Errorf("failed to sync records: %w", err)
return 0, err
}

if rowsSynced == 0 {
logger.Info("no records to push for xmin")
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if rowsSynced > 0 {
logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
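
Beyond dropping heartbeats, ReplicateXminPartition above now runs the destination sync inside the same errgroup as the source pull and uses context.Canceled as a benign sentinel: when the sync goroutine finishes, it returns context.Canceled, which cancels the group context (stopping the puller), and Wait filters that sentinel out. A stripped-down sketch of the same pattern with a hypothetical producer/consumer pair (not the PeerDB connectors):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	group, gctx := errgroup.WithContext(context.Background())
	records := make(chan int)

	// Producer: keeps pulling until the group context is cancelled.
	group.Go(func() error {
		defer close(records)
		for i := 0; ; i++ {
			select {
			case <-gctx.Done():
				return nil // cancellation here is expected, not an error
			case records <- i:
			}
		}
	})

	// Consumer: reads a fixed amount, then returns context.Canceled so the
	// group context is cancelled and the producer shuts down.
	group.Go(func() error {
		synced := 0
		for range records {
			synced++
			if synced == 5 {
				return context.Canceled
			}
		}
		return nil
	})

	// Wait treats the sentinel as success, exactly like the activity code above.
	if err := group.Wait(); err != nil && err != context.Canceled {
		fmt.Println("sync failed:", err)
		return
	}
	fmt.Println("sync finished")
}
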
41 changes: 26 additions & 15 deletions flow/activities/flowable_core.go
@@ -28,6 +28,21 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

func heartbeatRoutine(
ctx context.Context,
message func() string,
) func() {
counter := 0
return shared.Interval(
ctx,
15*time.Second,
func() {
counter += 1
activity.RecordHeartbeat(ctx, fmt.Sprintf("heartbeat #%d: %s", counter, message()))
},
)
}

func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, a *FlowableActivity, sessionID string) (TPull, error) {
var none TPull
logger := activity.GetLogger(ctx)
@@ -66,7 +81,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "starting flow...")
shutdown := heartbeatRoutine(ctx, func() string {
return "transferring records for job"
})
defer shutdown()

dstConn, err := connectors.GetAs[TSync](ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
@@ -86,11 +105,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "transferring records for job"
})
defer shutdown()

batchSize := options.BatchSize
if batchSize == 0 {
batchSize = 1_000_000
@@ -286,12 +300,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
msg := fmt.Sprintf("replicating partition - %s: %d of %d total.", partition.PartitionId, idx, total)
activity.RecordHeartbeat(ctx, msg)

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

logger.Info("replicating partition " + partition.PartitionId)
shutdown := heartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -326,12 +343,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()

bufferSize := shared.FetchAndChannelSize
stream := model.NewQRecordStream(bufferSize)
outstream := stream
14 changes: 0 additions & 14 deletions flow/connectors/bigquery/bigquery.go
@@ -12,7 +12,6 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"google.golang.org/api/iterator"

@@ -740,9 +739,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

// if source table does not exist, log and continue.
dataset := c.client.DatasetInProject(c.projectID, srcDatasetTable.dataset)
_, err := dataset.Table(srcDatasetTable.table).Metadata(ctx)
@@ -776,8 +772,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

pkeyOnClauseBuilder := strings.Builder{}
ljWhereClauseBuilder := strings.Builder{}
for idx, col := range pkeyCols {
@@ -818,12 +812,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
if req.SyncedAtColName != nil {
c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("setting synced at column for table '%s'...",
srcDatasetTable.string()))

c.logger.Info(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
query := c.client.Query(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
@@ -877,8 +865,6 @@ func (c *BigQueryConnector) CreateTablesFromExisting(
existingDatasetTable, _ := c.convertToDatasetTable(existingTable)
c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

activity.RecordHeartbeat(ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

// rename the src table to dst
query := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string()))
16 changes: 2 additions & 14 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -11,7 +11,6 @@ import (
"time"

"cloud.google.com/go/bigquery"
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -48,6 +47,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
s.connector.logger.Info(
fmt.Sprintf("Obtaining Avro schema for destination table %s and sync batch ID %d",
rawTableName, syncBatchID))

// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(rawTableName, dstTableMetadata, "", "")
if err != nil {
@@ -70,12 +70,6 @@
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;",
rawTableName, stagingTable)

activity.RecordHeartbeat(ctx,
fmt.Sprintf("Flow job %s: performing insert and update transaction"+
" for destination table %s and sync batch ID %d",
req.FlowJobName, rawTableName, syncBatchID),
)

query := bqClient.Query(insertStmt)
query.DefaultDatasetID = s.connector.datasetID
query.DefaultProjectID = s.connector.projectID
@@ -164,8 +158,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
s.connector.logger.Info("Obtained Avro schema for destination table", flowLog)
s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog)
s.connector.logger.Info("Obtained Avro schema for destination table", flowLog, slog.Any("avroSchema", avroSchema))
// create a staging table name with partitionID replace hyphens with underscores
dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName)
stagingDatasetTable := &datasetTable{
@@ -179,11 +172,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %w", err)
}
activity.RecordHeartbeat(ctx, fmt.Sprintf(
"Flow job %s: running insert-into-select transaction for"+
" destination table %s and partition ID %s",
flowJobName, dstTableName, partition.PartitionId),
)
bqClient := s.connector.client

insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
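
Aside from removing RecordHeartbeat calls, the two Avro-schema Info lines above are folded into one structured log entry. With slog-style attributes the schema travels as a field instead of being formatted into the message; a tiny standalone illustration using the standard library logger (the connector itself uses PeerDB's slog-backed wrapper, not this exact call):

package main

import "log/slog"

func main() {
	avroSchema := `{"type":"record","name":"example","fields":[]}`

	// Before: two separate messages, schema baked into the string.
	slog.Info("Obtained Avro schema for destination table")
	slog.Info("Avro schema: " + avroSchema)

	// After: one message, schema attached as a structured attribute.
	slog.Info("Obtained Avro schema for destination table",
		slog.Any("avroSchema", avroSchema))
}
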
4 changes: 0 additions & 4 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -8,8 +8,6 @@ import (
"strings"
"time"

"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -163,8 +161,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
return -1, err
}

activity.RecordHeartbeat(ctx, "finished syncing records")

return avroFile.NumRecords, nil
}

8 changes: 0 additions & 8 deletions flow/connectors/eventhub/hubmanager.go
@@ -7,7 +7,6 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -106,12 +105,6 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
}

func (m *EventHubManager) Close(ctx context.Context) error {
numHubsClosed := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
})
defer shutdown()

var allErrors error
m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
@@ -121,7 +114,6 @@ func (m *EventHubManager) Close(ctx context.Context) error {
logger.LoggerFromCtx(ctx).Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
numHubsClosed.Add(1)
return true
})

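
With the heartbeat routine and atomic counter removed, EventHubManager.Close above reduces to the plain range-and-join idiom: walk the sync.Map of producer clients and accumulate close failures with errors.Join. A minimal standalone version of that idiom, using hypothetical stand-in types rather than the EventHub client:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type closer interface{ Close() error }

type failing struct{ name string }

func (f failing) Close() error { return fmt.Errorf("%s: close failed", f.name) }

func closeAll(clients *sync.Map) error {
	var allErrors error
	clients.Range(func(key, value any) bool {
		if err := value.(closer).Close(); err != nil {
			allErrors = errors.Join(allErrors, err)
		}
		return true // keep iterating even after a failure
	})
	return allErrors
}

func main() {
	var clients sync.Map
	clients.Store("hub-a", failing{"hub-a"})
	clients.Store("hub-b", failing{"hub-b"})
	fmt.Println(closeAll(&clients))
}
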