
Commit

xmin
serprex committed May 17, 2024
1 parent 1f544e7 commit 8db6fb3
Showing 3 changed files with 141 additions and 105 deletions.
118 changes: 13 additions & 105 deletions flow/activities/flowable.go
@@ -15,7 +15,6 @@ import (
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
@@ -717,116 +716,25 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
return nil, errors.New("create tables from existing is only supported on snowflake and bigquery")
}

// ReplicateXminPartition replicates an XminPartition from the source to the destination.
func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("replicating xmin")

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)

stream := model.NewQRecordStream(bufferSize)

var currentSnapshotXmin int64
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
var pullErr error
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(ctx, config, partition, stream)
if pullErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, pullErr)
logger.Warn(fmt.Sprintf("[xmin] failed to pull records: %v", pullErr))
return pullErr
}

// The first sync of an XMIN mirror will have a partition without a range
// A nil range is not supported by the catalog mirror monitor functions below
// So I'm creating a partition with a range of 0 to numRecords
partitionForMetrics := partition
if partition.Range == nil {
partitionForMetrics = &protos.QRepPartition{
PartitionId: partition.PartitionId,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
},
},
}
}
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
logger.Error(err.Error())
return err
}

return nil
})

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
})
defer shutdown()

rowsSynced, err := dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, fmt.Errorf("failed to sync records: %w", err)
switch config.System {
case protos.TypeSystem_Q:
return replicateXminPartition(ctx, a, config, partition, runUUID,
model.NewQRecordStream(shared.FetchAndChannelSize),
(*connpostgres.PostgresConnector).PullXminRecordStream,
connectors.QRepSyncConnector.SyncQRepRecords)
case protos.TypeSystem_PG:
return replicateXminPartition(ctx, a, config, partition, runUUID,
model.NewRecordStream[[]byte](shared.FetchAndChannelSize),
(*connpostgres.PostgresConnector).PullXminPgRecordStream,
connectors.QRepSyncPgConnector.SyncPgQRepRecords)
default:
return 0, fmt.Errorf("unknown type system %d", config.System)
}

if rowsSynced == 0 {
logger.Info("no records to push for xmin")
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return 0, err
}

return currentSnapshotXmin, nil
}

func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
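
Note on the flowable.go change above: the rewritten ReplicateXminPartition now only dispatches on config.System and hands the connector-specific pull and sync functions to a shared generic helper as Go method expressions (e.g. (*connpostgres.PostgresConnector).PullXminRecordStream). Below is a minimal, self-contained sketch of that technique using hypothetical names (Puller, run), not PeerDB code:

package main

import "fmt"

type Puller struct{ name string }

// Pull is an ordinary method; the method expression (*Puller).Pull below
// turns it into a plain function value whose first argument is the receiver.
func (p *Puller) Pull(n int) string {
    return fmt.Sprintf("%s pulled %d records", p.name, n)
}

// run receives the method expression as a first-class function,
// mirroring how the generic helper receives its pullRecords parameter.
func run(p *Puller, pull func(*Puller, int) string) {
    fmt.Println(pull(p, 3))
}

func main() {
    run(&Puller{name: "xmin"}, (*Puller).Pull)
}
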
118 changes: 118 additions & 0 deletions flow/activities/flowable_core.go
@@ -19,6 +19,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -403,3 +404,120 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,

return monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
}

// replicateXminPartition replicates an XminPartition from the source to the destination.
func replicateXminPartition[T any, TSync connectors.QRepSyncConnectorCore](ctx context.Context,
a *FlowableActivity,
config *protos.QRepConfig,
partition *protos.QRepPartition,
runUUID string,
stream *model.RecordStream[T],
pullRecords func(
*connpostgres.PostgresConnector,
context.Context, *protos.QRepConfig,
*protos.QRepPartition,
*model.RecordStream[T],
) (int, int64, error),
syncRecords func(TSync, context.Context, *protos.QRepConfig, *protos.QRepPartition, *model.RecordStream[T]) (int, error),
) (int64, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

startTime := time.Now()
srcConn, err := connectors.GetAs[*connpostgres.PostgresConnector](ctx, config.SourcePeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info("replicating xmin")

errGroup, errCtx := errgroup.WithContext(ctx)

var currentSnapshotXmin int64
errGroup.Go(func() error {
var pullErr error
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pullRecords(srcConn, ctx, config, partition, stream)
if pullErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, pullErr)
logger.Warn(fmt.Sprintf("[xmin] failed to pull records: %v", pullErr))
return pullErr
}

// The first sync of an XMIN mirror will have a partition without a range
// A nil range is not supported by the catalog mirror monitor functions below
// So I'm creating a partition with a range of 0 to numRecords
partitionForMetrics := partition
if partition.Range == nil {
partitionForMetrics = &protos.QRepPartition{
PartitionId: partition.PartitionId,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
},
},
}
}
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
logger.Error(err.Error())
return err
}

return nil
})

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return "syncing xmin."
})
defer shutdown()

rowsSynced, err := syncRecords(dstConn, ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, fmt.Errorf("failed to sync records: %w", err)
}

if rowsSynced == 0 {
logger.Info("no records to push for xmin")
} else {
err := errGroup.Wait()
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

err = monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}

logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return 0, err
}

return currentSnapshotXmin, nil
}
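
Note on the errgroup shape in replicateXminPartition above: the pull runs in a goroutine that feeds the stream while the caller syncs from it, and errGroup.Wait() is only consulted after the sync so pull errors still surface. A minimal sketch of that producer/consumer shape, assuming only the standard library and golang.org/x/sync/errgroup (a plain channel stands in for the record stream):

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    records := make(chan int, 8) // stand-in for the record stream

    group, _ := errgroup.WithContext(context.Background())
    group.Go(func() error {
        defer close(records)
        for i := 0; i < 5; i++ { // "pull" phase: produce records
            records <- i
        }
        return nil
    })

    synced := 0
    for range records { // "sync" phase: consume the stream
        synced++
    }

    if err := group.Wait(); err != nil { // surface pull errors after syncing
        fmt.Println("pull failed:", err)
        return
    }
    fmt.Printf("pushed %d records\n", synced)
}
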
10 changes: 10 additions & 0 deletions flow/connectors/postgres/qrep.go
@@ -651,6 +651,16 @@ func (c *PostgresConnector) PullXminRecordStream(
(*QRepQueryExecutor).mapRowToQRecord)
}

func (c *PostgresConnector) PullXminPgRecordStream(
ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
stream *model.PgRecordStream,
) (int, int64, error) {
return pullXminRecordStream(c, ctx, config, partition, stream,
(*QRepQueryExecutor).mapRowToPgRecord)
}

func pullXminRecordStream[T any](
c *PostgresConnector,
ctx context.Context,
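
Note on the qrep.go change above: PullXminRecordStream and the new PullXminPgRecordStream differ only in the row-mapping function they hand to the generic pullXminRecordStream helper. A minimal sketch of that pattern, with hypothetical names (row, pullStream) rather than the actual connector code:

package main

import "fmt"

type row struct{ id int }

// pullStream is generic over the output record type; callers choose it
// by supplying a row-mapping function, just as the two wrappers above do.
func pullStream[T any](rows []row, mapRow func(row) T) []T {
    out := make([]T, 0, len(rows))
    for _, r := range rows {
        out = append(out, mapRow(r)) // map each source row into T
    }
    return out
}

func main() {
    rows := []row{{1}, {2}}
    asStrings := pullStream(rows, func(r row) string { return fmt.Sprint(r.id) })
    asBytes := pullStream(rows, func(r row) []byte { return []byte(fmt.Sprint(r.id)) })
    fmt.Println(asStrings, asBytes)
}
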
