From 88b9008579695803e7f8e100d32c48cc86304a5b Mon Sep 17 00:00:00 2001 From: Demur Rumed Date: Tue, 5 Dec 2023 11:05:47 +0000 Subject: [PATCH] fix xmin ci Also fix poorly named method after marker changed from `txid_current()` to `txid_snapshot_min(txid_current_snapsnot())` Remove special casing for xmin in qrep code --- flow/activities/flowable.go | 6 ------ flow/connectors/postgres/qrep.go | 9 ++------- flow/connectors/postgres/qrep_query_executor.go | 2 +- flow/e2e/snowflake/qrep_flow_sf_test.go | 2 +- flow/e2e/test_utils.go | 7 +++++++ 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 33c944f993..bd460e9661 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -764,12 +764,6 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context, waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second } - if config.WatermarkColumn == "xmin" { - // for xmin we ignore the wait between batches, as seq scan time is - // extremely slow. - waitBetweenBatches = 10 * time.Second - } - srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer) if err != nil { return fmt.Errorf("failed to get qrep source connector: %w", err) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 5bde72370c..518d47686e 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -274,11 +274,6 @@ func (c *PostgresConnector) getMinMaxValues( func (c *PostgresConnector) CheckForUpdatedMaxValue(config *protos.QRepConfig, last *protos.QRepPartition) (bool, error) { - // for xmin lets always assume there are updates - if config.WatermarkColumn == "xmin" { - return true, nil - } - tx, err := c.pool.Begin(c.ctx) if err != nil { return false, fmt.Errorf("unable to begin transaction for getting max value: %w", err) @@ -571,9 +566,9 @@ func (c *PostgresConnector) PullXminRecordStream( var numRecords int if partition.Range != nil { - numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query, oldxid) + numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(stream, query, oldxid) } else { - numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentTxid(stream, query) + numRecords, currentSnapshotXmin, err = executor.ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin(stream, query) } if err != nil { return 0, currentSnapshotXmin, err diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 81c7095c66..85c0fb2cce 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -325,7 +325,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStream( return totalRecordsFetched, err } -func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentTxid( +func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamGettingCurrentSnapshotXmin( stream *model.QRecordStream, query string, args ...interface{}, diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index f39686d0bb..5f5b01404d 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -208,7 +208,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { qrepConfig.WatermarkColumn = "xmin" s.NoError(err) - e2e.RunQrepFlowWorkflow(env, qrepConfig) + e2e.RunXminFlowWorkflow(env, qrepConfig) // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index ac668ae2f5..3b36079f4c 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -49,6 +49,7 @@ func RegisterWorkflowsAndActivities(env *testsuite.TestWorkflowEnvironment) { env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) env.RegisterWorkflow(peerflow.NormalizeFlowWorkflow) env.RegisterWorkflow(peerflow.QRepFlowWorkflow) + env.RegisterWorkflow(peerflow.XminFlowWorkflow) env.RegisterWorkflow(peerflow.QRepPartitionWorkflow) env.RegisterActivity(&activities.FlowableActivity{}) env.RegisterActivity(&activities.SnapshotActivity{}) @@ -303,6 +304,12 @@ func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos. env.ExecuteWorkflow(peerflow.QRepFlowWorkflow, config, state) } +func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) { + state := peerflow.NewQRepFlowState() + time.Sleep(5 * time.Second) + env.ExecuteWorkflow(peerflow.XminFlowWorkflow, config, state) +} + func GetOwnersSchema() *model.QRecordSchema { return &model.QRecordSchema{ Fields: []*model.QField{