connectors: don't emit heartbeats #1742

Merged
merged 5 commits on May 27, 2024
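Summary, inferred from the diff below (the original PR description is not part of this page): connectors stop calling `activity.RecordHeartbeat` directly, the old `utils.HeartbeatRoutine` helper is dropped, and each activity instead starts a local `heartbeatRoutine` (added in `flow/activities/flowable_core.go`) around its work. A minimal sketch of the pattern the activities converge on; the method name and message are hypothetical stand-ins:

```go
// Sketch of the activity-level heartbeat pattern after this PR.
// DoSomeWork is a hypothetical FlowableActivity method; heartbeatRoutine
// is the helper added in flow/activities/flowable_core.go in this diff.
func (a *FlowableActivity) DoSomeWork(ctx context.Context) error {
	shutdown := heartbeatRoutine(ctx, func() string {
		return "describing current progress for the job"
	})
	defer shutdown()

	// ... do the actual work; connectors no longer record heartbeats themselves ...
	return nil
}
```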
58 changes: 30 additions & 28 deletions flow/activities/flowable.go
@@ -23,7 +23,6 @@ import (
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
@@ -139,10 +138,14 @@ func (a *FlowableActivity) GetTableSchema(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
srcConn, err := connectors.GetAs[connectors.GetTableSchemaConnector](ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get CDCPullPgConnector: %w", err)
return nil, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

heartbeatRoutine(ctx, func() string {
return "getting table schema"
})

return srcConn.GetTableSchema(ctx, config)
}

@@ -171,7 +174,7 @@ func (a *FlowableActivity) CreateNormalizedTable(

numTablesSetup := atomic.Uint32{}
totalTables := uint32(len(config.TableNameSchemaMapping))
shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return fmt.Sprintf("setting up normalized tables - %d of %d done",
numTablesSetup.Load(), totalTables)
})
@@ -308,7 +311,7 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "normalizing records from batch for job"
})
defer shutdown()
@@ -380,7 +383,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "getting partitions for job"
})
defer shutdown()
@@ -449,7 +452,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
}
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "consolidating partitions for job"
})
defer shutdown()
@@ -661,7 +664,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "scanning for new rows"
})
defer shutdown()
@@ -687,7 +690,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
}
defer connectors.CloseConnector(ctx, conn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
shutdown := heartbeatRoutine(ctx, func() string {
return "renaming tables for job"
})
defer shutdown()
@@ -731,6 +734,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

logger.Info("replicating xmin")
shutdown := heartbeatRoutine(ctx, func() string {
return "syncing xmin"
})
defer shutdown()

startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
@@ -744,14 +753,13 @@
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("replicating xmin")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)

stream := model.NewQRecordStream(bufferSize)

var currentSnapshotXmin int64
var rowsSynced int
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
var pullErr error
@@ -798,32 +806,26 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return nil
})

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return context.Canceled
})
defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
if err := errGroup.Wait(); err != nil && err != context.Canceled {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, fmt.Errorf("failed to sync records: %w", err)
return 0, err
}

if rowsSynced == 0 {
logger.Info("no records to push for xmin")
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if rowsSynced > 0 {
logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
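The rewritten ReplicateXminPartition above runs the destination sync in the same errgroup as the source pull and has the sync goroutine return `context.Canceled` as a success sentinel, which `errGroup.Wait` then filters out. A self-contained sketch of that pattern, assuming hypothetical `pullRecords`/`syncRecords` stand-ins for the real pull and `SyncQRepRecords` calls:

```go
package example

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// replicate mirrors the producer/consumer structure of the xmin path:
// one goroutine pulls records into a stream, another syncs them out.
func replicate(ctx context.Context) (int, error) {
	errGroup, errCtx := errgroup.WithContext(ctx)
	stream := make(chan int, 16)

	errGroup.Go(func() error {
		defer close(stream)
		return pullRecords(errCtx, stream)
	})

	var rowsSynced int
	errGroup.Go(func() error {
		n, err := syncRecords(errCtx, stream)
		if err != nil {
			return fmt.Errorf("failed to sync records: %w", err)
		}
		rowsSynced = n
		// Success sentinel: cancels the group's context and is filtered
		// out at Wait below (the diff compares err != context.Canceled).
		return context.Canceled
	})

	if err := errGroup.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		return 0, err
	}
	return rowsSynced, nil
}

// Hypothetical stand-ins for the connector calls.
func pullRecords(_ context.Context, out chan<- int) error { out <- 1; return nil }

func syncRecords(_ context.Context, in <-chan int) (int, error) {
	n := 0
	for range in {
		n++
	}
	return n, nil
}
```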
41 changes: 26 additions & 15 deletions flow/activities/flowable_core.go
@@ -28,6 +28,21 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

func heartbeatRoutine(
ctx context.Context,
message func() string,
) func() {
counter := 0
return shared.Interval(
ctx,
15*time.Second,
func() {
counter += 1
activity.RecordHeartbeat(ctx, fmt.Sprintf("heartbeat #%d: %s", counter, message()))
},
)
}

func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, a *FlowableActivity, sessionID string) (TPull, error) {
var none TPull
logger := activity.GetLogger(ctx)
@@ -66,7 +81,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
flowName := config.FlowJobName
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
logger := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "starting flow...")
shutdown := heartbeatRoutine(ctx, func() string {
return "transferring records for job"
})
defer shutdown()

dstConn, err := connectors.GetAs[TSync](ctx, config.Destination)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
@@ -86,11 +105,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "transferring records for job"
})
defer shutdown()

batchSize := options.BatchSize
if batchSize == 0 {
batchSize = 1_000_000
@@ -286,12 +300,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
msg := fmt.Sprintf("replicating partition - %s: %d of %d total.", partition.PartitionId, idx, total)
activity.RecordHeartbeat(ctx, msg)

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

logger.Info("replicating partition " + partition.PartitionId)
shutdown := heartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -326,12 +343,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
return fmt.Errorf("failed to update start time for partition: %w", err)
}

logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
defer shutdown()

bufferSize := shared.FetchAndChannelSize
stream := model.NewQRecordStream(bufferSize)
outstream := stream
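The heartbeatRoutine helper above delegates to `shared.Interval`, which is not shown in this diff. A minimal sketch of what such an interval helper plausibly looks like, assuming it invokes the callback on a fixed ticker until the context is cancelled or the returned stop function is called (the real `flow/shared` implementation may differ):

```go
package shared

import (
	"context"
	"sync"
	"time"
)

// Interval runs fn every interval until ctx is done or the returned stop
// function is called. Sketch only; the actual helper that heartbeatRoutine
// relies on may differ in details.
func Interval(ctx context.Context, interval time.Duration, fn func()) func() {
	done := make(chan struct{})
	var once sync.Once
	stop := func() { once.Do(func() { close(done) }) }

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-done:
				return
			case <-ticker.C:
				fn()
			}
		}
	}()

	return stop
}
```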
14 changes: 0 additions & 14 deletions flow/connectors/bigquery/bigquery.go
@@ -12,7 +12,6 @@ import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/storage"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"google.golang.org/api/iterator"

@@ -740,9 +739,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

// if source table does not exist, log and continue.
dataset := c.client.DatasetInProject(c.projectID, srcDatasetTable.dataset)
_, err := dataset.Table(srcDatasetTable.table).Metadata(ctx)
@@ -776,8 +772,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

pkeyOnClauseBuilder := strings.Builder{}
ljWhereClauseBuilder := strings.Builder{}
for idx, col := range pkeyCols {
@@ -818,12 +812,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
if req.SyncedAtColName != nil {
c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("setting synced at column for table '%s'...",
srcDatasetTable.string()))

c.logger.Info(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
query := c.client.Query(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
@@ -877,8 +865,6 @@ func (c *BigQueryConnector) CreateTablesFromExisting(
existingDatasetTable, _ := c.convertToDatasetTable(existingTable)
c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

activity.RecordHeartbeat(ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))

// rename the src table to dst
query := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string()))
16 changes: 2 additions & 14 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -11,7 +11,6 @@ import (
"time"

"cloud.google.com/go/bigquery"
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -48,6 +47,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
s.connector.logger.Info(
fmt.Sprintf("Obtaining Avro schema for destination table %s and sync batch ID %d",
rawTableName, syncBatchID))

// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(rawTableName, dstTableMetadata, "", "")
if err != nil {
@@ -70,12 +70,6 @@
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;",
rawTableName, stagingTable)

activity.RecordHeartbeat(ctx,
fmt.Sprintf("Flow job %s: performing insert and update transaction"+
" for destination table %s and sync batch ID %d",
req.FlowJobName, rawTableName, syncBatchID),
)

query := bqClient.Query(insertStmt)
query.DefaultDatasetID = s.connector.datasetID
query.DefaultProjectID = s.connector.projectID
@@ -164,8 +158,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
s.connector.logger.Info("Obtained Avro schema for destination table", flowLog)
s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog)
s.connector.logger.Info("Obtained Avro schema for destination table", flowLog, slog.Any("avroSchema", avroSchema))
// create a staging table name with partitionID replace hyphens with underscores
dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName)
stagingDatasetTable := &datasetTable{
@@ -179,11 +172,6 @@
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %w", err)
}
activity.RecordHeartbeat(ctx, fmt.Sprintf(
"Flow job %s: running insert-into-select transaction for"+
" destination table %s and partition ID %s",
flowJobName, dstTableName, partition.PartitionId),
)
bqClient := s.connector.client

insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
4 changes: 0 additions & 4 deletions flow/connectors/clickhouse/qrep_avro_sync.go
@@ -8,8 +8,6 @@ import (
"strings"
"time"

"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/connectors/utils"
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -163,8 +161,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
return -1, err
}

activity.RecordHeartbeat(ctx, "finished syncing records")

return avroFile.NumRecords, nil
}

8 changes: 0 additions & 8 deletions flow/connectors/eventhub/hubmanager.go
@@ -7,7 +7,6 @@ import (
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -106,12 +105,6 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
}

func (m *EventHubManager) Close(ctx context.Context) error {
numHubsClosed := atomic.Uint32{}
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
})
defer shutdown()

var allErrors error
m.hubs.Range(func(key any, value any) bool {
name := key.(ScopedEventhub)
@@ -121,7 +114,6 @@ func (m *EventHubManager) Close(ctx context.Context) error {
logger.LoggerFromCtx(ctx).Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
allErrors = errors.Join(allErrors, err)
}
numHubsClosed.Add(1)
return true
})
