Merge branch 'main' into normalize-split

serprex authored Jan 22, 2024
2 parents 1ebe562 + a7cbc8f commit ceb7fe9
Showing 28 changed files with 705 additions and 746 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ui-lint.yml
@@ -43,3 +43,4 @@ jobs:
prettier_dir: ui
eslint_args: "--max-warnings 0"
eslint_extensions: js,ts,jsx,tsx
+prettier_extensions: js,ts,jsx,tsx,json
41 changes: 29 additions & 12 deletions flow/activities/flowable.go
@@ -217,7 +217,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

go a.recordSlotSizePeriodically(errCtx, srcConn, slotNameForMetrics, input.FlowConnectionConfigs.Source.Name)

-shutdown := utils.HeartbeatRoutine(ctx, 10*time.Second, func() string {
+shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
})
@@ -387,7 +387,7 @@ func (a *FlowableActivity) StartNormalize(
}
defer connectors.CloseConnector(dstConn)

-shutdown := utils.HeartbeatRoutine(ctx, 15*time.Second, func() string {
+shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
})
defer shutdown()
@@ -455,7 +455,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
defer connectors.CloseConnector(srcConn)

-shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
+shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
})
defer shutdown()
@@ -544,6 +544,10 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
defer connectors.CloseConnector(dstConn)

slog.InfoContext(ctx, fmt.Sprintf("replicating partition %s\n", partition.PartitionId))
+shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
+})
+defer shutdown()

var stream *model.QRecordStream
bufferSize := shared.FetchAndChannelSize
@@ -593,11 +597,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
}

-shutdown := utils.HeartbeatRoutine(ctx, 1*time.Minute, func() string {
-	return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
-})
-defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
@@ -635,10 +634,9 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
} else if err != nil {
return err
}

defer connectors.CloseConnector(dstConn)

-shutdown := utils.HeartbeatRoutine(ctx, 2*time.Minute, func() string {
+shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
})
defer shutdown()
@@ -802,7 +800,21 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
attemptCount := 1
for {
activity.RecordHeartbeat(ctx, fmt.Sprintf("no new rows yet, attempt #%d", attemptCount))
-time.Sleep(waitBetweenBatches)
+waitUntil := time.Now().Add(waitBetweenBatches)
+for {
+	sleep := time.Until(waitUntil)
+	if sleep > 15*time.Second {
+		sleep = 15 * time.Second
+	}
+	time.Sleep(sleep)
+
+	activity.RecordHeartbeat(ctx, "heartbeat while waiting before next batch")
+	if err := ctx.Err(); err != nil {
+		return fmt.Errorf("cancelled while waiting for new rows: %w", err)
+	} else if time.Now().After(waitUntil) {
+		break
+	}
+}

result, err := pgSrcConn.CheckForUpdatedMaxValue(config, last)
if err != nil {
@@ -830,6 +842,11 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
}
defer connectors.CloseConnector(dstConn)

+shutdown := utils.HeartbeatRoutine(ctx, func() string {
+	return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName)
+})
+defer shutdown()

if config.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
@@ -949,7 +966,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
return nil
})

-shutdown := utils.HeartbeatRoutine(ctx, 5*time.Minute, func() string {
+shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
})
defer shutdown()
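The QRepWaitUntilNewRows hunk above swaps one long time.Sleep for a loop that sleeps at most 15 seconds at a time, heartbeating after every slice, so Temporal keeps seeing a live activity and cancellation is noticed within seconds instead of after the full wait. A minimal, self-contained sketch of that pattern; the helper name sleepUntilWithHeartbeat and the heartbeat callback are illustrative, not part of this commit:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepUntilWithHeartbeat sleeps toward a deadline in slices of at
// most 15 seconds, invoking the heartbeat callback after each slice so
// the workflow engine never sees a long silent gap, and returning
// early if the context is cancelled mid-wait.
func sleepUntilWithHeartbeat(
	ctx context.Context,
	waitUntil time.Time,
	heartbeat func(msg string),
) error {
	for {
		sleep := time.Until(waitUntil)
		if sleep > 15*time.Second {
			sleep = 15 * time.Second
		}
		time.Sleep(sleep)

		heartbeat("heartbeat while waiting before next batch")
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("cancelled while waiting: %w", err)
		}
		if time.Now().After(waitUntil) {
			return nil
		}
	}
}

func main() {
	err := sleepUntilWithHeartbeat(
		context.Background(),
		time.Now().Add(2*time.Second),
		func(msg string) { fmt.Println(msg) },
	)
	fmt.Println("done:", err)
}
```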
4 changes: 2 additions & 2 deletions flow/cmd/validate_peer.go
@@ -41,13 +41,13 @@ func (h *FlowRequestHandler) ValidatePeer(
defer conn.Close()

if req.Peer.Type == protos.DBType_POSTGRES {
-version, err := conn.(*connpostgres.PostgresConnector).GetPostgresVersion()
+isValid, version, err := conn.(*connpostgres.PostgresConnector).MajorVersionCheck(connpostgres.POSTGRES_12)
if err != nil {
slog.Error("/peer/validate: pg version check", slog.Any("error", err))
return nil, err
}

-if version < 12 {
+if !isValid {
return &protos.ValidatePeerResponse{
Status: protos.ValidatePeerStatus_INVALID,
Message: fmt.Sprintf("%s peer %s must be of version 12 or above. Current version: %d",
1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
@@ -828,6 +828,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
datasetTablesSet[*datasetTable] = struct{}{}
// log that table was created
c.logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
+utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
}

return &protos.SetupNormalizedTableBatchOutput{
14 changes: 6 additions & 8 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -42,7 +42,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
stream *model.QRecordStream,
tableNameRowsMapping map[string]uint32,
) (*model.SyncResponse, error) {
-activity.RecordHeartbeat(s.connector.ctx, time.Minute,
+activity.RecordHeartbeat(s.connector.ctx,
fmt.Sprintf("Flow job %s: Obtaining Avro schema"+
" for destination table %s and sync batch ID %d",
req.FlowJobName, rawTableName, syncBatchID),
@@ -77,7 +77,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
return nil, fmt.Errorf("failed to update metadata: %v", err)
}

-activity.RecordHeartbeat(s.connector.ctx, time.Minute,
+activity.RecordHeartbeat(s.connector.ctx,
fmt.Sprintf("Flow job %s: performing insert and update transaction"+
" for destination table %s and sync batch ID %d",
req.FlowJobName, rawTableName, syncBatchID),
@@ -392,12 +392,10 @@ func (s *QRepAvroSyncMethod) writeToStage(
stream *model.QRecordStream,
flowName string,
) (int, error) {
-shutdown := utils.HeartbeatRoutine(s.connector.ctx, time.Minute,
-	func() string {
-		return fmt.Sprintf("writing to avro stage for objectFolder %s and staging table %s",
-			objectFolder, stagingTable)
-	},
-)
+shutdown := utils.HeartbeatRoutine(s.connector.ctx, func() string {
+	return fmt.Sprintf("writing to avro stage for objectFolder %s and staging table %s",
+		objectFolder, stagingTable)
+})
defer shutdown()

var avroFile *avro.AvroFile
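A note on the two RecordHeartbeat edits above: in the Temporal Go SDK, activity.RecordHeartbeat has the signature RecordHeartbeat(ctx context.Context, details ...interface{}), so the deleted time.Minute was never an interval; it was silently recorded as an extra heartbeat detail payload. A small sketch of the corrected call shape (reportStep is a hypothetical helper, not code from this commit):

```go
package example

import (
	"context"
	"fmt"

	"go.temporal.io/sdk/activity"
)

// reportStep heartbeats a single human-readable detail. There is no
// interval argument: cadence is up to the caller, not the API.
func reportStep(ctx context.Context, flowJobName, step string) {
	activity.RecordHeartbeat(ctx, fmt.Sprintf("Flow job %s: %s", flowJobName, step))
}
```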
7 changes: 2 additions & 5 deletions flow/connectors/eventhub/eventhub.go
@@ -132,11 +132,8 @@ func (c *EventHubConnector) processBatch(
lastUpdatedOffset := int64(0)

numRecords := atomic.Uint32{}
-shutdown := utils.HeartbeatRoutine(c.ctx, 10*time.Second, func() string {
-	return fmt.Sprintf(
-		"processed %d records for flow %s",
-		numRecords.Load(), flowJobName,
-	)
+shutdown := utils.HeartbeatRoutine(c.ctx, func() string {
+	return fmt.Sprintf("processed %d records for flow %s", numRecords.Load(), flowJobName)
})
defer shutdown()

2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
@@ -244,7 +244,7 @@ func (p *PostgresCDCSource) consumeStream(
}
}()

-shutdown := utils.HeartbeatRoutine(p.ctx, 10*time.Second, func() string {
+shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
jobName := p.flowJobName
currRecords := cdcRecordsStorage.Len()
return fmt.Sprintf("pulling records for job - %s, currently have %d records", jobName, currRecords)
42 changes: 18 additions & 24 deletions flow/connectors/postgres/client.go
@@ -17,6 +17,14 @@ import (
"github.com/lib/pq/oid"
)

+type PGVersion int
+
+const (
+	POSTGRES_12 PGVersion = 120000
+	POSTGRES_13 PGVersion = 130000
+	POSTGRES_15 PGVersion = 150000
+)

const (
mirrorJobsTableIdentifier = "peerdb_mirror_jobs"
createMirrorJobsTableSQL = `CREATE TABLE IF NOT EXISTS %s.%s(mirror_job_name TEXT PRIMARY KEY,
@@ -305,7 +313,7 @@ func (c *PostgresConnector) createSlotAndPublication(

if !s.PublicationExists {
// check and enable publish_via_partition_root
-supportsPubViaRoot, err := c.majorVersionCheck(130000)
+supportsPubViaRoot, _, err := c.MajorVersionCheck(POSTGRES_13)
if err != nil {
return fmt.Errorf("error checking Postgres version: %w", err)
}
@@ -472,14 +480,14 @@ func (c *PostgresConnector) jobMetadataExists(jobName string) (bool, error) {
return result.Bool, nil
}

-func (c *PostgresConnector) majorVersionCheck(majorVersion int) (bool, error) {
+func (c *PostgresConnector) MajorVersionCheck(majorVersion PGVersion) (bool, int64, error) {
var version pgtype.Int8
err := c.pool.QueryRow(c.ctx, "SELECT current_setting('server_version_num')::INTEGER").Scan(&version)
if err != nil {
-	return false, fmt.Errorf("failed to get server version: %w", err)
+	return false, 0, fmt.Errorf("failed to get server version: %w", err)
}

-return int(version.Int64) >= majorVersion, nil
+return version.Int64 >= int64(majorVersion), version.Int64, nil
}

func (c *PostgresConnector) updateSyncMetadata(flowJobName string, lastCP int64, syncBatchID int64,
@@ -621,7 +629,12 @@ func (c *PostgresConnector) CheckReplicationPermissions(username string) error {
}

if !replicationRes {
return fmt.Errorf("postgres user does not have replication role")
// RDS case: check pg_settings for rds.logical_replication
var setting string
err := c.pool.QueryRow(c.ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication';").Scan(&setting)
if err != nil || setting != "on" {
return fmt.Errorf("postgres user does not have replication role")
}
}

// check wal_level
@@ -653,22 +666,3 @@ func (c *PostgresConnector) CheckReplicationPermissions(username string) error {

return nil
}

-func (c *PostgresConnector) GetPostgresVersion() (int, error) {
-	if c.pool == nil {
-		return -1, fmt.Errorf("version check: pool is nil")
-	}
-
-	var versionRes string
-	err := c.pool.QueryRow(c.ctx, "SHOW server_version_num;").Scan(&versionRes)
-	if err != nil {
-		return -1, err
-	}
-
-	version, err := strconv.Atoi(versionRes)
-	if err != nil {
-		return -1, err
-	}
-
-	return version / 10000, nil
-}
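The new PGVersion constants piggyback on Postgres's server_version_num encoding (major × 10000 + minor since version 10), which is why the bare >= comparison in MajorVersionCheck is enough: every minor release of a major version sorts above that major's constant and below the next one. A runnable sketch of just the comparison, with made-up server values:

```go
package main

import "fmt"

// PGVersion mirrors the constants added above: server_version_num
// thresholds marking the start of each major version.
type PGVersion int

const (
	POSTGRES_12 PGVersion = 120000
	POSTGRES_13 PGVersion = 130000
	POSTGRES_15 PGVersion = 150000
)

func main() {
	// Hypothetical server_version_num values: 12.4, 13.11, 15.2.
	for _, server := range []int64{120004, 130011, 150002} {
		fmt.Printf("server %d is at least Postgres 13: %v\n",
			server, server >= int64(POSTGRES_13))
	}
	// Prints false, true, true.
}
```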
2 changes: 1 addition & 1 deletion flow/connectors/postgres/postgres.go
@@ -476,7 +476,7 @@ func (c *PostgresConnector) NormalizeRecords(req *model.NormalizeRecordsRequest)
}
}()

-supportsMerge, err := c.majorVersionCheck(150000)
+supportsMerge, _, err := c.MajorVersionCheck(POSTGRES_15)
if err != nil {
return nil, err
}
3 changes: 1 addition & 2 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/geo"
@@ -84,7 +83,7 @@ func (qe *QRepQueryExecutor) executeQueryInTx(tx pgx.Tx, cursorName string, fetc
q := fmt.Sprintf("FETCH %d FROM %s", fetchSize, cursorName)

if !qe.testEnv {
-shutdown := utils.HeartbeatRoutine(qe.ctx, 1*time.Minute, func() string {
+shutdown := utils.HeartbeatRoutine(qe.ctx, func() string {
qe.logger.Info(fmt.Sprintf("still running '%s'...", q))
return fmt.Sprintf("running '%s'", q)
})
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
@@ -270,7 +270,7 @@ func (s *SnowflakeAvroSyncHandler) putFileToStage(avroFile *avro.AvroFile, stage
activity.RecordHeartbeat(s.connector.ctx, "putting file to stage")
putCmd := fmt.Sprintf("PUT file://%s @%s", avroFile.FilePath, stage)

-shutdown := utils.HeartbeatRoutine(s.connector.ctx, 10*time.Second, func() string {
+shutdown := utils.HeartbeatRoutine(s.connector.ctx, func() string {
return fmt.Sprintf("putting file to stage %s", stage)
})
defer shutdown()
1 change: 1 addition & 0 deletions flow/connectors/snowflake/snowflake.go
@@ -414,6 +414,7 @@ func (c *SnowflakeConnector) SetupNormalizedTables(
return nil, fmt.Errorf("[sf] error while creating normalized table: %w", err)
}
tableExistsMapping[tableIdentifier] = false
+utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("created table %s", tableIdentifier))
}

return &protos.SetupNormalizedTableBatchOutput{
3 changes: 1 addition & 2 deletions flow/connectors/utils/avro/avro_writer.go
@@ -8,7 +8,6 @@ import (
"log/slog"
"os"
"sync/atomic"
"time"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model"
@@ -131,7 +130,7 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
numRows := atomic.Uint32{}

if p.ctx != nil {
-shutdown := utils.HeartbeatRoutine(p.ctx, 30*time.Second, func() string {
+shutdown := utils.HeartbeatRoutine(p.ctx, func() string {
written := numRows.Load()
return fmt.Sprintf("[avro] written %d rows to OCF", written)
})
3 changes: 1 addition & 2 deletions flow/connectors/utils/heartbeat.go
@@ -11,7 +11,6 @@ import (

func HeartbeatRoutine(
ctx context.Context,
-interval time.Duration,
message func() string,
) func() {
shutdown := make(chan struct{})
@@ -26,7 +25,7 @@
return
case <-ctx.Done():
return
case <-time.After(interval):
case <-time.After(15 * time.Second):
}
}
}()
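Assembled from the hunks above, the helper now reads roughly as follows. The lines outside the shown hunks (the counter and the RecordHeartbeat call) are reconstructed from the call sites elsewhere in this diff, so treat them as an approximation rather than the verbatim file:

```go
package utils

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/sdk/activity"
)

// HeartbeatRoutine, post-change: callers pass only a message callback,
// and the 15-second cadence is fixed here so every activity in the
// codebase heartbeats at the same rate.
func HeartbeatRoutine(
	ctx context.Context,
	message func() string,
) func() {
	shutdown := make(chan struct{})
	go func() {
		counter := 0
		for {
			counter += 1
			// Assumed body: report the freshest message on each tick.
			activity.RecordHeartbeat(ctx, fmt.Sprintf("heartbeat #%d: %s", counter, message()))
			select {
			case <-shutdown:
				return
			case <-ctx.Done():
				return
			case <-time.After(15 * time.Second):
			}
		}
	}()
	return func() { close(shutdown) }
}
```

Every call site in this commit then collapses to the same two lines: shutdown := utils.HeartbeatRoutine(ctx, func() string { ... }) followed by defer shutdown().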
2 changes: 1 addition & 1 deletion flow/workflows/cdc_flow.go
@@ -345,7 +345,7 @@ func CDCFlowWorkflowWithConfig(
state.TableNameSchemaMapping = correctedTableNameSchemaMapping
renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 12 * time.Hour,
-HeartbeatTimeout: 1 * time.Hour,
+HeartbeatTimeout: time.Minute,
})
renameTablesFuture := workflow.ExecuteActivity(renameTablesCtx, flowable.RenameTables, renameOpts)
if err := renameTablesFuture.Get(renameTablesCtx, nil); err != nil {
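The tighter HeartbeatTimeout above leans on the RenameTables heartbeat routine added in flowable.go: with beats every 15 seconds, a one-minute window tolerates a few missed beats before Temporal fails the activity, instead of leaving a hung worker undetected for up to an hour. A sketch of the option pairing, assuming standard Temporal Go SDK semantics (withRenameActivityOptions is a hypothetical wrapper, not code from this commit):

```go
package example

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

// withRenameActivityOptions pairs a long overall deadline with a short
// heartbeat window: the activity may run for hours, but must prove
// liveness every minute -- four 15-second beats fit comfortably inside.
func withRenameActivityOptions(ctx workflow.Context) workflow.Context {
	return workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 12 * time.Hour,
		HeartbeatTimeout:    time.Minute,
	})
}
```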
(Diffs for the remaining 13 changed files did not load and are not shown.)