From 1441b797dccc0100698d841f113cda24d3abb372 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Tue, 21 May 2024 02:18:24 +0000
Subject: [PATCH] connectors: don't emit heartbeats

Activity portion is responsible for heartbeats
Connectors should log info when they want to expose progress
---
 flow/activities/flowable.go                   | 58 ++++++++++---------
 flow/activities/flowable_core.go              | 41 ++++++++-----
 flow/connectors/bigquery/bigquery.go          | 14 -----
 flow/connectors/bigquery/qrep_avro_sync.go    | 16 +----
 flow/connectors/clickhouse/qrep_avro_sync.go  |  4 --
 flow/connectors/eventhub/hubmanager.go        |  8 ---
 flow/connectors/kafka/qrep.go                 | 12 ++--
 flow/connectors/postgres/cdc.go               |  8 +--
 flow/connectors/postgres/postgres.go          |  9 +--
 flow/connectors/snowflake/qrep_avro_sync.go   |  3 -
 flow/connectors/snowflake/snowflake.go        | 10 ----
 flow/connectors/sql/query_executor.go         | 13 ++---
 flow/connectors/utils/heartbeat.go            | 43 --------------
 .../{concurrency => shared}/bound_selector.go |  2 +-
 flow/shared/interval.go                       | 30 ++++++++++
 flow/workflows/snapshot_flow.go               |  5 +-
 16 files changed, 107 insertions(+), 169 deletions(-)
 delete mode 100644 flow/connectors/utils/heartbeat.go
 rename flow/{concurrency => shared}/bound_selector.go (97%)
 create mode 100644 flow/shared/interval.go

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 6834402b3b..288282053c 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -23,7 +23,6 @@ import (
 	connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
-	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
@@ -139,10 +138,14 @@ func (a *FlowableActivity) GetTableSchema(
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
 	srcConn, err := connectors.GetAs[connectors.GetTableSchemaConnector](ctx, config.PeerConnectionConfig)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get CDCPullPgConnector: %w", err)
+		return nil, fmt.Errorf("failed to get GetTableSchemaConnector: %w", err)
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
+	heartbeatRoutine(ctx, func() string {
+		return "getting table schema"
+	})
+
 	return srcConn.GetTableSchema(ctx, config)
 }
@@ -171,7 +174,7 @@ func (a *FlowableActivity) CreateNormalizedTable(
 	numTablesSetup := atomic.Uint32{}
 	totalTables := uint32(len(config.TableNameSchemaMapping))
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	shutdown := heartbeatRoutine(ctx, func() string {
 		return fmt.Sprintf("setting up normalized tables - %d of %d done",
 			numTablesSetup.Load(), totalTables)
 	})
@@ -308,7 +311,7 @@ func (a *FlowableActivity) StartNormalize(
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	shutdown := heartbeatRoutine(ctx, func() string {
 		return "normalizing records from batch for job"
 	})
 	defer shutdown()
@@ -380,7 +383,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	shutdown := heartbeatRoutine(ctx, func() string {
 		return "getting partitions for job"
 	})
 	defer shutdown()
@@ -449,7 +452,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	shutdown := heartbeatRoutine(ctx, func() string {
 		return "consolidating partitions for job"
 	})
 	defer shutdown()
@@ -667,7 +670,7 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,
 	}
 	defer connectors.CloseConnector(ctx, srcConn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	shutdown := heartbeatRoutine(ctx, func() string {
 		return "scanning for new rows"
 	})
 	defer shutdown()
@@ -693,7 +696,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
 	}
 	defer connectors.CloseConnector(ctx, conn)
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	shutdown := heartbeatRoutine(ctx, func() string {
 		return "renaming tables for job"
 	})
 	defer shutdown()
@@ -737,6 +740,12 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	logger := activity.GetLogger(ctx)
 
+	logger.Info("replicating xmin")
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "syncing xmin"
+	})
+	defer shutdown()
+
 	startTime := time.Now()
 	srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
 	if err != nil {
@@ -750,14 +759,13 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 	}
 	defer connectors.CloseConnector(ctx, dstConn)
 
-	logger.Info("replicating xmin")
-
 	bufferSize := shared.FetchAndChannelSize
 
 	errGroup, errCtx := errgroup.WithContext(ctx)
 	stream := model.NewQRecordStream(bufferSize)
 
 	var currentSnapshotXmin int64
+	var rowsSynced int
 	errGroup.Go(func() error {
 		pgConn := srcConn.(*connpostgres.PostgresConnector)
 		var pullErr error
@@ -804,32 +812,26 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
 		return nil
 	})
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return "syncing xmin."
+	errGroup.Go(func() error {
+		rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
+		if err != nil {
+			a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
+			return fmt.Errorf("failed to sync records: %w", err)
+		}
+		return context.Canceled
 	})
-	defer shutdown()
 
-	rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
-	if err != nil {
+	if err := errGroup.Wait(); err != nil && err != context.Canceled {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-		return 0, fmt.Errorf("failed to sync records: %w", err)
+		return 0, err
 	}
 
-	if rowsSynced == 0 {
-		logger.Info("no records to push for xmin")
-	} else {
-		err := errGroup.Wait()
-		if err != nil {
-			a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
-			return 0, err
-		}
-
-		err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
+	if rowsSynced > 0 {
+		logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
+		err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
 		if err != nil {
 			return 0, err
 		}
-
-		logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
 	}
 
 	err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index ad3dca378a..3a052af371 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -28,6 +28,21 @@ import (
 	"github.com/PeerDB-io/peer-flow/shared"
 )
 
+func heartbeatRoutine(
+	ctx context.Context,
+	message func() string,
+) func() {
+	counter := 0
+	return shared.Interval(
+		ctx,
+		15*time.Second,
+		func() {
+			counter += 1
+			activity.RecordHeartbeat(ctx, fmt.Sprintf("heartbeat #%d: %s", counter, message()))
+		},
+	)
+}
+
 func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, a *FlowableActivity, sessionID string) (TPull, error) {
 	var none TPull
 	logger := activity.GetLogger(ctx)
@@ -66,7 +81,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 	flowName := config.FlowJobName
 	ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
 	logger := activity.GetLogger(ctx)
-	activity.RecordHeartbeat(ctx, "starting flow...")
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return "transferring records for job"
+	})
+	defer shutdown()
+
 	dstConn, err := connectors.GetAs[TSync](ctx, config.Destination)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get destination connector: %w", err)
 	}
@@ -99,11 +118,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
 		}
 	}
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return "transferring records for job"
-	})
-	defer shutdown()
-
 	batchSize := options.BatchSize
 	if batchSize == 0 {
 		batchSize = 1_000_000
 	}
@@ -299,12 +313,15 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	partition *protos.QRepPartition,
 	runUUID string,
 ) error {
-	msg := fmt.Sprintf("replicating partition - %s: %d of %d total.", partition.PartitionId, idx, total)
-	activity.RecordHeartbeat(ctx, msg)
-
 	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
 	logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 
+	logger.Info("replicating partition " + partition.PartitionId)
+	shutdown := heartbeatRoutine(ctx, func() string {
+		return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
+	})
+	defer shutdown()
+
 	srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
 	if err != nil {
 		a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -339,12 +356,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 		return fmt.Errorf("failed to update start time for partition: %w", err)
 	}
 
-	logger.Info("replicating partition " + partition.PartitionId)
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
-	})
-	defer shutdown()
-
 	bufferSize := shared.FetchAndChannelSize
 	stream := model.NewQRecordStream(bufferSize)
 	outstream := stream
diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 51812bdf4a..9b10b1f32d 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -12,7 +12,6 @@ import (
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/storage"
 	"github.com/jackc/pgx/v5/pgxpool"
-	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 	"google.golang.org/api/iterator"
 
@@ -740,9 +739,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
 		c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
 			dstDatasetTable.string()))
 
-		activity.RecordHeartbeat(ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
-			dstDatasetTable.string()))
-
 		// if source table does not exist, log and continue.
 		dataset := c.client.DatasetInProject(c.projectID, srcDatasetTable.dataset)
 		_, err := dataset.Table(srcDatasetTable.table).Metadata(ctx)
@@ -776,8 +772,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
 
 		c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))
 
-		activity.RecordHeartbeat(ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))
-
 		pkeyOnClauseBuilder := strings.Builder{}
 		ljWhereClauseBuilder := strings.Builder{}
 		for idx, col := range pkeyCols {
@@ -818,12 +812,6 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
 		if req.SyncedAtColName != nil {
 			c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string()))
 
-			activity.RecordHeartbeat(ctx, fmt.Sprintf("setting synced at column for table '%s'...",
-				srcDatasetTable.string()))
-
-			c.logger.Info(
-				fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
-					*req.SyncedAtColName, *req.SyncedAtColName))
 			query := c.client.Query(
 				fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
 					*req.SyncedAtColName, *req.SyncedAtColName))
@@ -877,8 +865,6 @@ func (c *BigQueryConnector) CreateTablesFromExisting(
 		existingDatasetTable, _ := c.convertToDatasetTable(existingTable)
 		c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))
 
-		activity.RecordHeartbeat(ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))
-
 		// rename the src table to dst
 		query := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
 			newDatasetTable.string(), existingDatasetTable.string()))
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index d022045b70..f79a615920 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -11,7 +11,6 @@ import (
 	"time"
 
 	"cloud.google.com/go/bigquery"
-	"go.temporal.io/sdk/activity"
 
 	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	numeric "github.com/PeerDB-io/peer-flow/datatypes"
@@ -49,6 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 	s.connector.logger.Info(
 		fmt.Sprintf("Obtaining Avro schema for destination table %s and sync batch ID %d",
 			rawTableName, syncBatchID))
+	// You will need to define your Avro schema as a string
 	avroSchema, err := DefineAvroSchema(rawTableName, dstTableMetadata, "", "")
 	if err != nil {
@@ -71,12 +71,6 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 	insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT * FROM `%s`;",
 		rawTableName, stagingTable)
 
-	activity.RecordHeartbeat(ctx,
-		fmt.Sprintf("Flow job %s: performing insert and update transaction"+
-			" for destination table %s and sync batch ID %d",
-			req.FlowJobName, rawTableName, syncBatchID),
-	)
-
 	query := bqClient.Query(insertStmt)
 	query.DefaultDatasetID = s.connector.datasetID
 	query.DefaultProjectID = s.connector.projectID
@@ -165,8 +159,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	if err != nil {
 		return 0, fmt.Errorf("failed to define Avro schema: %w", err)
 	}
-	s.connector.logger.Info("Obtained Avro schema for destination table", flowLog)
-	s.connector.logger.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog)
+	s.connector.logger.Info("Obtained Avro schema for destination table", flowLog, slog.Any("avroSchema", avroSchema))
 	// create a staging table name with partitionID replace hyphens with underscores
 	dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName)
 	stagingDatasetTable := &datasetTable{
@@ -180,11 +173,6 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	if err != nil {
 		return -1, fmt.Errorf("failed to push to avro stage: %w", err)
 	}
-	activity.RecordHeartbeat(ctx, fmt.Sprintf(
-		"Flow job %s: running insert-into-select transaction for"+
-			" destination table %s and partition ID %s",
-		flowJobName, dstTableName, partition.PartitionId),
-	)
 
 	bqClient := s.connector.client
 	insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go
index b015ef47e4..a49937ddd6 100644
--- a/flow/connectors/clickhouse/qrep_avro_sync.go
+++ b/flow/connectors/clickhouse/qrep_avro_sync.go
@@ -8,8 +8,6 @@ import (
 	"strings"
 	"time"
 
-	"go.temporal.io/sdk/activity"
-
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -163,8 +161,6 @@ func (s *ClickhouseAvroSyncMethod) SyncQRepRecords(
 		return -1, err
 	}
 
-	activity.RecordHeartbeat(ctx, "finished syncing records")
-
 	return avroFile.NumRecords, nil
 }
diff --git a/flow/connectors/eventhub/hubmanager.go b/flow/connectors/eventhub/hubmanager.go
index b374d12543..680aacd155 100644
--- a/flow/connectors/eventhub/hubmanager.go
+++ b/flow/connectors/eventhub/hubmanager.go
@@ -7,7 +7,6 @@ import (
 	"log/slog"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
@@ -106,12 +105,6 @@ func (m *EventHubManager) closeProducerClient(ctx context.Context, pc *azeventhu
 }
 
 func (m *EventHubManager) Close(ctx context.Context) error {
-	numHubsClosed := atomic.Uint32{}
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return fmt.Sprintf("closed %d eventhub clients", numHubsClosed.Load())
-	})
-	defer shutdown()
-
 	var allErrors error
 	m.hubs.Range(func(key any, value any) bool {
 		name := key.(ScopedEventhub)
@@ -121,7 +114,6 @@ func (m *EventHubManager) Close(ctx context.Context) error {
 			logger.LoggerFromCtx(ctx).Error(fmt.Sprintf("failed to close eventhub client for %v", name), slog.Any("error", err))
 			allErrors = errors.Join(allErrors, err)
 		}
-		numHubsClosed.Add(1)
 		return true
 	})
diff --git a/flow/connectors/kafka/qrep.go b/flow/connectors/kafka/qrep.go
index 21bd08de97..e8daa94b7d 100644
--- a/flow/connectors/kafka/qrep.go
+++ b/flow/connectors/kafka/qrep.go
@@ -9,10 +9,10 @@ import (
 	"github.com/twmb/franz-go/pkg/kgo"
 	lua "github.com/yuin/gopher-lua"
 
-	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/pua"
+	"github.com/PeerDB-io/peer-flow/shared"
 )
 
 func (*KafkaConnector) SetupQRepMetadataTables(_ context.Context, _ *protos.QRepConfig) error {
@@ -29,11 +29,6 @@ func (c *KafkaConnector) SyncQRepRecords(
 	numRecords := atomic.Int64{}
 	schema := stream.Schema()
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		return fmt.Sprintf("sent %d records to %s", numRecords.Load(), config.DestinationTableIdentifier)
-	})
-	defer shutdown()
-
 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr)
 	if err != nil {
@@ -41,6 +36,11 @@ func (c *KafkaConnector) SyncQRepRecords(
 	}
 	defer pool.Close()
 
+	shutdown := shared.Interval(ctx, time.Minute, func() {
+		c.logger.Info(fmt.Sprintf("sent %d records", numRecords.Load()))
+	})
+	defer shutdown()
+
 Loop:
 	for {
 		select {
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 7937d452ab..cfb1664bf5 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -21,6 +21,7 @@ import (
 	"github.com/PeerDB-io/peer-flow/logger"
 	"github.com/PeerDB-io/peer-flow/model"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
+	"github.com/PeerDB-io/peer-flow/shared"
 )
 
 type PostgresCDCSource struct {
@@ -323,11 +324,8 @@ func PullCdcRecords[Items model.Items](
 		}
 	}()
 
-	shutdown := utils.HeartbeatRoutine(ctx, func() string {
-		currRecords := cdcRecordsStorage.Len()
-		msg := fmt.Sprintf("pulling records, currently have %d records", currRecords)
-		logger.Info(msg)
-		return msg
+	shutdown := shared.Interval(ctx, time.Minute, func() {
+		logger.Info(fmt.Sprintf("pulling records, currently have %d records", cdcRecordsStorage.Len()))
 	})
 	defer shutdown()
diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go
index d8dea391de..f41e69c3ce 100644
--- a/flow/connectors/postgres/postgres.go
+++ b/flow/connectors/postgres/postgres.go
@@ -16,7 +16,6 @@ import (
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jackc/pgx/v5/pgxpool"
 	"go.opentelemetry.io/otel/attribute"
-	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 	"go.temporal.io/sdk/temporal"
 
@@ -735,9 +734,6 @@ func (c *PostgresConnector) GetTableSchema(
 ) (*protos.GetTableSchemaBatchOutput, error) {
 	res := make(map[string]*protos.TableSchema)
 	for _, tableName := range req.TableIdentifiers {
-		if activity.IsActivity(ctx) {
-			activity.RecordHeartbeat(ctx, "fetching schema for table "+tableName)
-		}
 		tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System)
 		if err != nil {
 			return nil, err
@@ -951,8 +947,7 @@ func (c *PostgresConnector) EnsurePullability(
 		}
 
 		if !req.CheckConstraints {
-			msg := "[no-constraints] ensured pullability table " + tableName
-			utils.RecordHeartbeat(ctx, msg)
+			logger.LoggerFromCtx(ctx).Info("[no-constraints] ensured pullability table " + tableName)
 			continue
 		}
 
@@ -971,8 +966,6 @@ func (c *PostgresConnector) EnsurePullability(
 		if len(pKeyCols) == 0 && replicaIdentity != ReplicaIdentityFull {
 			return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
 		}
-
-		utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName)
 	}
 
 	return &protos.EnsurePullabilityBatchOutput{TableIdentifierMapping: tableIdentifierMapping}, nil
diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go
index fadd09deab..2e37705c14 100644
--- a/flow/connectors/snowflake/qrep_avro_sync.go
+++ b/flow/connectors/snowflake/qrep_avro_sync.go
@@ -10,7 +10,6 @@ import (
 	"time"
 
 	_ "github.com/snowflakedb/gosnowflake"
-	"go.temporal.io/sdk/activity"
 
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
@@ -128,8 +127,6 @@ func (s *SnowflakeAvroSyncHandler) SyncQRepRecords(
 		return 0, err
 	}
 
-	activity.RecordHeartbeat(ctx, "finished syncing records")
-
 	return avroFile.NumRecords, nil
 }
diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go
index a0fcc1754d..60981a0ce9 100644
--- a/flow/connectors/snowflake/snowflake.go
+++ b/flow/connectors/snowflake/snowflake.go
@@ -14,7 +14,6 @@ import (
 	"github.com/aws/smithy-go/ptr"
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/snowflakedb/gosnowflake"
-	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 	"golang.org/x/sync/errgroup"
 
@@ -734,9 +733,6 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam
 
 			c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", resyncTblName))
 
-			activity.RecordHeartbeat(ctx, fmt.Sprintf("setting synced at column for table '%s'...",
-				resyncTblName))
-
 			_, err = renameTablesTx.ExecContext(ctx, fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP",
 				resyncTblName, *req.SyncedAtColName))
 			if err != nil {
@@ -765,8 +761,6 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam
 
 		c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dst))
 
-		activity.RecordHeartbeat(ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dst))
-
 		_, err = renameTablesTx.ExecContext(ctx,
 			fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
 				src, fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), allCols, *req.SoftDeleteColName,
@@ -784,8 +778,6 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam
 
 		c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))
 
-		activity.RecordHeartbeat(ctx, fmt.Sprintf("renaming table '%s' to '%s'...", src, dst))
-
 		// drop the dst table if exists
 		_, err = renameTablesTx.ExecContext(ctx, "DROP TABLE IF EXISTS "+dst)
 		if err != nil {
@@ -828,8 +820,6 @@ func (c *SnowflakeConnector) CreateTablesFromExisting(ctx context.Context, req *
 	for newTable, existingTable := range req.NewToExistingTableMapping {
 		c.logger.Info(fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))
 
-		activity.RecordHeartbeat(ctx, fmt.Sprintf("creating table '%s' similar to '%s'", newTable, existingTable))
-
 		// rename the src table to dst
 		_, err = createTablesFromExistingTx.ExecContext(ctx,
 			fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s LIKE %s", newTable, existingTable))
diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go
index 7f56398f40..48ab5ce454 100644
--- a/flow/connectors/sql/query_executor.go
+++ b/flow/connectors/sql/query_executor.go
@@ -13,7 +13,6 @@ import (
 	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/jmoiron/sqlx"
 	"github.com/shopspring/decimal"
-	"go.temporal.io/sdk/activity"
 	"go.temporal.io/sdk/log"
 
 	"github.com/PeerDB-io/peer-flow/model"
@@ -166,7 +165,7 @@ func (g *GenericSQLQueryExecutor) columnTypeToQField(ct *sql.ColumnType) (qvalue
 	}, nil
 }
 
-func (g *GenericSQLQueryExecutor) processRows(ctx context.Context, rows *sqlx.Rows) (*model.QRecordBatch, error) {
+func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBatch, error) {
 	dbColTypes, err := rows.ColumnTypes()
 	if err != nil {
 		return nil, err
@@ -186,7 +185,7 @@ func (g *GenericSQLQueryExecutor) processRows(ctx context.Context, rows *sqlx.Ro
 	var records [][]qvalue.QValue
 	totalRowsProcessed := 0
-	const heartBeatNumRows = 25000
+	const logEveryNumRows = 50000
 
 	for rows.Next() {
 		columns, err := rows.Columns()
@@ -251,8 +250,8 @@ func (g *GenericSQLQueryExecutor) processRows(ctx context.Context, rows *sqlx.Ro
 		records = append(records, qValues)
 		totalRowsProcessed += 1
 
-		if totalRowsProcessed%heartBeatNumRows == 0 {
-			activity.RecordHeartbeat(ctx, fmt.Sprintf("processed %d rows", totalRowsProcessed))
+		if totalRowsProcessed%logEveryNumRows == 0 {
+			g.logger.Info("processed rows", slog.Int("rows", totalRowsProcessed))
 		}
 	}
 
@@ -278,7 +277,7 @@ func (g *GenericSQLQueryExecutor) ExecuteAndProcessQuery(
 	}
 	defer rows.Close()
 
-	return g.processRows(ctx, rows)
+	return g.processRows(rows)
 }
 
 func (g *GenericSQLQueryExecutor) NamedExecuteAndProcessQuery(
@@ -292,7 +291,7 @@ func (g *GenericSQLQueryExecutor) NamedExecuteAndProcessQuery(
 	}
 	defer rows.Close()
 
-	return g.processRows(ctx, rows)
+	return g.processRows(rows)
 }
 
 func (g *GenericSQLQueryExecutor) ExecuteQuery(ctx context.Context, query string, args ...interface{}) error {
diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go
deleted file mode 100644
index 201814c2c0..0000000000
--- a/flow/connectors/utils/heartbeat.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package utils
-
-import (
-	"context"
-	"fmt"
-	"time"
-
-	"go.temporal.io/sdk/activity"
-)
-
-func HeartbeatRoutine(
-	ctx context.Context,
-	message func() string,
-) func() {
-	shutdown := make(chan struct{})
-	go func() {
-		counter := 0
-		ticker := time.NewTicker(15 * time.Second)
-		defer ticker.Stop()
-
-		for {
-			counter += 1
-			msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
-			RecordHeartbeat(ctx, msg)
-			select {
-			case <-shutdown:
-				return
-			case <-ctx.Done():
-				return
-			case <-ticker.C:
-			}
-		}
-	}()
-	return func() { close(shutdown) }
-}
-
-// if the functions are being called outside the context of a Temporal workflow,
-// activity.RecordHeartbeat panics, this is a bandaid for that.
-func RecordHeartbeat(ctx context.Context, details ...interface{}) {
-	if activity.IsActivity(ctx) {
-		activity.RecordHeartbeat(ctx, details...)
-	}
-}
diff --git a/flow/concurrency/bound_selector.go b/flow/shared/bound_selector.go
similarity index 97%
rename from flow/concurrency/bound_selector.go
rename to flow/shared/bound_selector.go
index 18f291cd84..7d056c95af 100644
--- a/flow/concurrency/bound_selector.go
+++ b/flow/shared/bound_selector.go
@@ -1,4 +1,4 @@
-package concurrency
+package shared
 
 import (
 	"errors"
diff --git a/flow/shared/interval.go b/flow/shared/interval.go
new file mode 100644
index 0000000000..5aead0a443
--- /dev/null
+++ b/flow/shared/interval.go
@@ -0,0 +1,30 @@
+package shared
+
+import (
+	"context"
+	"time"
+)
+
+func Interval(
+	ctx context.Context,
+	freq time.Duration,
+	fn func(),
+) func() {
+	shutdown := make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(freq)
+		defer ticker.Stop()
+
+		for {
+			fn()
+			select {
+			case <-shutdown:
+				return
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+			}
+		}
+	}()
+	return func() { close(shutdown) }
+}
diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go
index 10757774d6..24a51f8643 100644
--- a/flow/workflows/snapshot_flow.go
+++ b/flow/workflows/snapshot_flow.go
@@ -12,7 +12,6 @@ import (
 	"go.temporal.io/sdk/workflow"
 
 	"github.com/PeerDB-io/peer-flow/activities"
-	"github.com/PeerDB-io/peer-flow/concurrency"
 	connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
 	"github.com/PeerDB-io/peer-flow/connectors/utils"
 	"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -102,7 +101,7 @@ func (s *SnapshotFlowExecution) closeSlotKeepAlive(
 
 func (s *SnapshotFlowExecution) cloneTable(
 	ctx workflow.Context,
-	boundSelector *concurrency.BoundSelector,
+	boundSelector *shared.BoundSelector,
 	snapshotName string,
 	mapping *protos.TableMapping,
 ) error {
@@ -219,7 +218,7 @@ func (s *SnapshotFlowExecution) cloneTables(
 			cloneTablesInput.snapshotName)
 	}
 
-	boundSelector := concurrency.NewBoundSelector(ctx, cloneTablesInput.maxParallelClones)
+	boundSelector := shared.NewBoundSelector(ctx, cloneTablesInput.maxParallelClones)
 
 	defaultPartitionCol := "ctid"
 	if !cloneTablesInput.supportsTIDScans {