Skip to content

Commit

Permalink
Merge branch 'main' into strings-cut
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 5, 2023
2 parents 0f8c301 + 4274fab commit ca9e713
Show file tree
Hide file tree
Showing 9 changed files with 1,578 additions and 1,676 deletions.
6 changes: 0 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,12 +764,6 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
}

if config.WatermarkColumn == "xmin" {
// for xmin we ignore the wait between batches, as seq scan time is
// extremely slow.
waitBetweenBatches = 10 * time.Second
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down
9 changes: 2 additions & 7 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,6 @@ func (c *PostgresConnector) getMinMaxValues(

func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig,
last *protos.QRepPartition) (bool, error) {
// for xmin lets always assume there are updates
if config.WatermarkColumn == "xmin" {
return true, nil
}

tx, err := c.pool.Begin(c.ctx)
if err != nil {
return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err)
Expand Down Expand Up @@ -571,9 +566,9 @@ func (c *PostgresConnector) PullXminRecordStream(

var numRecords int
if partition.Range != nil {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, oldxid)
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(stream, query, oldxid)
} else {
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query)
numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(stream, query)
}
if err != nil {
return 0, currentSnapshotXmin, err
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream(
return totalRecordsFetched, err
}

func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid(
func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(
stream *model.QRecordStream,
query string,
args ...interface{},
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
qrepConfig.WatermarkColumn = "xmin"
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)
e2e.RunXminFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())
Expand Down
10 changes: 9 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.temporal.io/sdk/testsuite"
)

// readFileToBytes reads a file to a byte array.
// ReadFileToBytes reads a file to a byte array.
func ReadFileToBytes(path string) ([]byte, error) {
var ret []byte

Expand Down Expand Up @@ -49,6 +49,7 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) {
env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
env.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
env.RegisterWorkflow(peerflow.QRepFlowWorkflow)
env.RegisterWorkflow(peerflow.XminFlowWorkflow)
env.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
env.RegisterActivity(&activities.FlowableActivity{})
env.RegisterActivity(&activities.SnapshotActivity{})
Expand Down Expand Up @@ -303,6 +304,13 @@ func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.
env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state)
}

func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) {
state := peerflow.NewQRepFlowState()
state.LastPartition.PartitionId = uuid.New().String()
time.Sleep(5 * time.Second)
env.ExecuteWorkflow(peerflow.XminFlowWorkflow, config, state)
}

func GetOwnersSchema() *model.QRecordSchema {
return &model.QRecordSchema{
Fields: []*model.QField{
Expand Down
22 changes: 7 additions & 15 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (q *XminFlowExecution) consolidatePartitions(ctx workflow.Context) error {
return fmt.Errorf("failed to cleanup qrep flow: %w", err)
}

q.logger.Info("qrep flow cleaned up")
q.logger.Info("xmin flow cleaned up")

return nil
}
Expand All @@ -186,14 +186,6 @@ func XminFlowWorkflow(
config *protos.QRepConfig,
state *protos.QRepFlowState,
) error {
// register a query to get the number of partitions processed
err := workflow.SetQueryHandler(ctx, "num-partitions-processed", func() (uint64, error) {
return state.NumPartitionsProcessed, nil
})
if err != nil {
return fmt.Errorf("failed to register query handler: %w", err)
}

// get xmin run uuid via side-effect
runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
Expand All @@ -206,7 +198,7 @@ func XminFlowWorkflow(

q := NewXminFlowExecution(ctx, config, runUUID)

err = q.SetupWatermarkTableOnDestination(ctx)
err := q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
return fmt.Errorf("failed to setup watermark table: %w", err)
}
Expand Down Expand Up @@ -238,9 +230,8 @@ func XminFlowWorkflow(
return fmt.Errorf("xmin replication failed: %w", err)
}

state.LastPartition = &protos.QRepPartition{
PartitionId: q.runUUID,
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
if err = q.consolidatePartitions(ctx); err != nil {
return err
}

if config.InitialCopyOnly {
Expand All @@ -253,8 +244,9 @@ func XminFlowWorkflow(
return err
}

if err = q.consolidatePartitions(ctx); err != nil {
return err
state.LastPartition = &protos.QRepPartition{
PartitionId: q.runUUID,
Range: &protos.PartitionRange{Range: &protos.PartitionRange_IntRange{IntRange: &protos.IntPartitionRange{Start: lastPartition}}},
}

workflow.GetLogger(ctx).Info("Continuing as new workflow",
Expand Down
Loading

0 comments on commit ca9e713

Please sign in to comment.