Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qrep pgvalue #1733

Merged
merged 36 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
3e84281
remove flow/connectors/postgres/qrep_sql_sync.sql
serprex May 15, 2024
99cdfb5
get past type check
serprex May 17, 2024
5903292
xmin
serprex May 17, 2024
6c8e01b
qrep activities juggling
serprex May 17, 2024
6ad9ec0
ui
serprex May 18, 2024
434f42f
podman
serprex May 18, 2024
2b7971e
move checked state calc to settings, doesn't belong in layout
serprex May 18, 2024
5689cd2
copy any not bytes
serprex May 18, 2024
8751214
fix qvalue qrep
serprex May 21, 2024
dec8e17
test pg type system
serprex May 21, 2024
242a36f
wip
serprex May 22, 2024
51d4f00
pgvalue-qrep-copy
serprex May 22, 2024
6aaf468
Sketching out implementations, bit annoying that conn is private on p…
serprex May 23, 2024
d53ef5e
it builds
serprex May 24, 2024
da0fbe9
wtf numRecords
serprex May 24, 2024
d9b2165
lint
serprex May 24, 2024
02ba012
oops
serprex May 24, 2024
fe0fdf7
fix parameter interpolation
serprex May 24, 2024
1319b8f
properly interpolate timestamps
serprex May 24, 2024
3857e84
fix copy syntax
serprex May 24, 2024
aeffe5b
Check command tag for row counts
serprex May 24, 2024
2cf44d7
vendor pgx sanitize.go
serprex May 24, 2024
ef433c1
Rename interfaces, split up code
serprex May 25, 2024
047a3f0
fix lints
serprex May 25, 2024
d9dec77
Avoid double escape
serprex May 25, 2024
0074deb
Merge branch 'main' into pgvalue-qrep
serprex May 27, 2024
3d75557
oops
serprex May 27, 2024
8eb2dbe
Merge remote-tracking branch 'origin/main' into pgvalue-qrep
serprex May 27, 2024
995275b
bad merge?
serprex May 27, 2024
100aab7
bring back xmin heartbeat
serprex May 27, 2024
13efdeb
Merge remote-tracking branch 'origin/main' into pgvalue-qrep
serprex May 27, 2024
8d1550c
revert merge regression, always wait
serprex May 27, 2024
d41ecbb
Merge remote-tracking branch 'origin/main' into pgvalue-qrep
serprex May 27, 2024
a44bb8a
Delegate some specialized methods to sink implementation
serprex May 27, 2024
bb7ca88
Add back context.Canceled check again
serprex May 27, 2024
a3ee420
whitespace
serprex May 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-peerdb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ if test -n "$USE_PODMAN"; then
fi

export PEERDB_VERSION_SHA_SHORT=local-$(git rev-parse --short HEAD)
exec $DOCKER compose -f docker-compose-dev.yml up --build $EXTRA_ARGS
exec $DOCKER compose $EXTRA_ARGS -f docker-compose-dev.yml up --build
1 change: 1 addition & 0 deletions flow/.golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
issues:
exclude-dirs:
- generated
- connectors/postgres/sanitize
linters:
enable:
- containedctx
Expand Down
157 changes: 51 additions & 106 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,26 @@ import (

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/yuin/gopher-lua"
"go.opentelemetry.io/otel/metric"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
connbigquery "github.com/PeerDB-io/peer-flow/connectors/bigquery"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_guages"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down Expand Up @@ -428,9 +430,40 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
logger.Info(fmt.Sprintf("replicating partitions for batch %d - size: %d",
partitions.BatchId, numPartitions),
)
for i, p := range partitions.Partitions {
for _, p := range partitions.Partitions {
logger.Info(fmt.Sprintf("batch-%d - replicating partition - %s", partitions.BatchId, p.PartitionId))
err := a.replicateQRepPartition(ctx, config, i+1, numPartitions, p, runUUID)
var err error
switch config.System {
case protos.TypeSystem_Q:
stream := model.NewQRecordStream(shared.FetchAndChannelSize)
outstream := stream
if config.Script != "" {
ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) {
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s)
}))
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
}
lfn := ls.Env.RawGetString("transformRow")
if fn, ok := lfn.(*lua.LFunction); ok {
outstream = pua.AttachToStream(ls, fn, stream)
}
}
err = replicateQRepPartition(ctx, a, config, p, runUUID, stream, outstream,
connectors.QRepPullConnector.PullQRepRecords,
connectors.QRepSyncConnector.SyncQRepRecords,
)
case protos.TypeSystem_PG:
read, write := connpostgres.NewPgCopyPipe()
err = replicateQRepPartition(ctx, a, config, p, runUUID, write, read,
connectors.QRepPullPgConnector.PullPgQRepRecords,
connectors.QRepSyncPgConnector.SyncPgQRepRecords,
)
default:
err = fmt.Errorf("unknown type system %d", config.System)
}

if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
Expand Down Expand Up @@ -725,115 +758,27 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
return nil, errors.New("create tables from existing is only supported on snowflake and bigquery")
}

// ReplicateXminPartition replicates a XminPartition from the source to the destination.
func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
config *protos.QRepConfig,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

logger.Info("replicating xmin")
shutdown := heartbeatRoutine(ctx, func() string {
return "syncing xmin"
})
defer shutdown()

startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetQRepSyncConnector(ctx, config.DestinationPeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

bufferSize := shared.FetchAndChannelSize
errGroup, errCtx := errgroup.WithContext(ctx)

stream := model.NewQRecordStream(bufferSize)

var currentSnapshotXmin int64
var rowsSynced int
errGroup.Go(func() error {
pgConn := srcConn.(*connpostgres.PostgresConnector)
var pullErr error
var numRecords int
numRecords, currentSnapshotXmin, pullErr = pgConn.PullXminRecordStream(ctx, config, partition, stream)
if pullErr != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
logger.Warn(fmt.Sprintf("[xmin] failed to pull records: %v", err))
return err
}

// The first sync of an XMIN mirror will have a partition without a range
// A nil range is not supported by the catalog mirror monitor functions below
// So I'm creating a partition with a range of 0 to numRecords
partitionForMetrics := partition
if partition.Range == nil {
partitionForMetrics = &protos.QRepPartition{
PartitionId: partition.PartitionId,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
},
},
}
}
updateErr := monitoring.InitializeQRepRun(
ctx, a.CatalogPool, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := monitoring.UpdateStartTimeForPartition(ctx, a.CatalogPool, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = monitoring.UpdatePullEndTimeAndRowsForPartition(
errCtx, a.CatalogPool, runUUID, partition, int64(numRecords))
if err != nil {
logger.Error(err.Error())
return err
}

return nil
})

errGroup.Go(func() error {
rowsSynced, err = dstConn.SyncQRepRecords(ctx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to sync records: %w", err)
}
return context.Canceled
})

if err := errGroup.Wait(); err != nil && err != context.Canceled {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return 0, err
}

if rowsSynced > 0 {
logger.Info(fmt.Sprintf("pushed %d records", rowsSynced))
err := monitoring.UpdateRowsSyncedForPartition(ctx, a.CatalogPool, rowsSynced, runUUID, partition)
if err != nil {
return 0, err
}
switch config.System {
case protos.TypeSystem_Q:
stream := model.NewQRecordStream(shared.FetchAndChannelSize)
return replicateXminPartition(ctx, a, config, partition, runUUID,
stream, stream,
(*connpostgres.PostgresConnector).PullXminRecordStream,
connectors.QRepSyncConnector.SyncQRepRecords)
case protos.TypeSystem_PG:
pgread, pgwrite := connpostgres.NewPgCopyPipe()
return replicateXminPartition(ctx, a, config, partition, runUUID,
pgwrite, pgread,
(*connpostgres.PostgresConnector).PullXminPgRecordStream,
connectors.QRepSyncPgConnector.SyncPgQRepRecords)
default:
return 0, fmt.Errorf("unknown type system %d", config.System)
}

err = monitoring.UpdateEndTimeForPartition(ctx, a.CatalogPool, runUUID, partition)
if err != nil {
return 0, err
}

return currentSnapshotXmin, nil
}

func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *protos.FlowConnectionConfigs,
Expand Down
Loading
Loading