From 1de9388cf2ec7c9f4200a52dbc817670936b374f Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Sun, 22 Oct 2023 22:41:22 +0530 Subject: [PATCH 1/3] remove code for handling using min,max values for partitioning --- flow/connectors/postgres/qrep.go | 110 +----------------- .../postgres/qrep_partition_test.go | 60 ---------- flow/workflows/qrep_flow.go | 8 -- nexus/analyzer/src/qrep.rs | 34 +++--- nexus/flow-rs/src/grpc.rs | 10 -- protos/flow.proto | 2 + 6 files changed, 17 insertions(+), 207 deletions(-) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index 4ee4cf9a29..e240d2b7d6 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -14,7 +14,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" log "github.com/sirupsen/logrus" - "google.golang.org/protobuf/types/known/timestamppb" ) const qRepMetadataTableName = "_peerdb_query_replication_metadata" @@ -63,36 +62,7 @@ func (c *PostgresConnector) GetQRepPartitions( // log.Warnf("failed to lock table %s: %v", config.WatermarkTable, err) // } - if config.NumRowsPerPartition > 0 { - return c.getNumRowsPartitions(tx, config, last) - } - - minValue, maxValue, err := c.getMinMaxValues(tx, config, last) - if err != nil { - return nil, err - } - - var partitions []*protos.QRepPartition - switch v := minValue.(type) { - case int64: - maxValue := maxValue.(int64) + 1 - partitions, err = c.getIntPartitions(v, maxValue, config.BatchSizeInt) - case time.Time: - maxValue := maxValue.(time.Time).Add(time.Microsecond) - partitions, err = c.getTimePartitions(v, maxValue, config.BatchDurationSeconds) - // only hit when there is no data in the source table - case nil: - log.Warnf("no records to replicate for flow job %s, returning", config.FlowJobName) - return make([]*protos.QRepPartition, 0), nil - default: - return nil, fmt.Errorf("unsupported type: %T", v) - } - - if err != nil { - return nil, err - } - - return partitions, nil + return c.getNumRowsPartitions(tx, config, last) } func (c *PostgresConnector) setTransactionSnapshot(tx pgx.Tx) error { @@ -607,81 +577,3 @@ func (c *PostgresConnector) isPartitionSynced(partitionID string) (bool, error) return count > 0, nil } - -func (c *PostgresConnector) getTimePartitions( - start time.Time, - end time.Time, - batchDurationSeconds uint32, -) ([]*protos.QRepPartition, error) { - if batchDurationSeconds == 0 { - return nil, fmt.Errorf("batch duration must be greater than 0") - } - - batchDuration := time.Duration(batchDurationSeconds) * time.Second - var partitions []*protos.QRepPartition - - for start.Before(end) { - partitionEnd := start.Add(batchDuration) - if partitionEnd.After(end) { - partitionEnd = end - } - - rangePartition := protos.PartitionRange{ - Range: &protos.PartitionRange_TimestampRange{ - TimestampRange: &protos.TimestampPartitionRange{ - Start: timestamppb.New(start), - End: timestamppb.New(partitionEnd), - }, - }, - } - - partitions = append(partitions, &protos.QRepPartition{ - PartitionId: uuid.New().String(), - Range: &rangePartition, - }) - - start = partitionEnd - } - - return partitions, nil -} - -func (c *PostgresConnector) getIntPartitions( - start int64, end int64, batchSizeInt uint32) ([]*protos.QRepPartition, error) { - var partitions []*protos.QRepPartition - batchSize := int64(batchSizeInt) - - if batchSize == 0 { - return nil, fmt.Errorf("batch size cannot be 0") - } - - for start < end { - partitionEnd := start + batchSize - // safeguard against integer overflow - if partitionEnd > end || partitionEnd < start { - partitionEnd = end - } - - rangePartition := protos.PartitionRange{ - Range: &protos.PartitionRange_IntRange{ - IntRange: &protos.IntPartitionRange{ - Start: start, - End: partitionEnd, - }, - }, - } - - partitions = append(partitions, &protos.QRepPartition{ - PartitionId: uuid.New().String(), - Range: &rangePartition, - }) - - if partitionEnd == end { - break - } - - start = partitionEnd - } - - return partitions, nil -} diff --git a/flow/connectors/postgres/qrep_partition_test.go b/flow/connectors/postgres/qrep_partition_test.go index ac084817d3..a4e90a8544 100644 --- a/flow/connectors/postgres/qrep_partition_test.go +++ b/flow/connectors/postgres/qrep_partition_test.go @@ -22,25 +22,6 @@ type testCase struct { wantErr bool } -func newTestCase(schema string, name string, duration uint32, wantErr bool) *testCase { - schemaQualifiedTable := fmt.Sprintf("%s.test", schema) - query := fmt.Sprintf( - `SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`, - schemaQualifiedTable) - return &testCase{ - name: name, - config: &protos.QRepConfig{ - FlowJobName: "test_flow_job", - BatchDurationSeconds: duration, - Query: query, - WatermarkTable: schemaQualifiedTable, - WatermarkColumn: "from", - }, - want: []*protos.QRepPartition{}, - wantErr: wantErr, - } -} - func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase { schemaQualifiedTable := fmt.Sprintf("%s.test", schema) query := fmt.Sprintf( @@ -155,47 +136,6 @@ func TestGetQRepPartitions(t *testing.T) { // Define the test cases testCases := []*testCase{ - newTestCase( - schemaName, - "ensure all days are in 1 partition", - secondsInADay*100, - false, - ).appendPartition( - time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC), - time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC), - ), - newTestCase( - schemaName, - "ensure all days are in 30 partitions", - secondsInADay, - false, - ).appendPartitions( - time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC), - time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC), - 29, - ).appendPartition( - time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC), - time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC), - ), - newTestCase( - schemaName, - "ensure all days are in 60 partitions", - secondsInADay/2, - false, - ).appendPartitions( - time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC), - time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC), - 58, - ).appendPartition( - time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC), - time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC), - ), - newTestCase( - schemaName, - "test for error condition with batch size 0", - 0, - true, - ), newTestCaseForNumRows( schemaName, "ensure all rows are in 1 partition if num_rows_per_partition is size of table", diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index abe1338bf8..4debe11b3a 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -262,14 +262,6 @@ func QRepFlowWorkflow( waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second } - if config.BatchDurationSeconds == 0 { - config.BatchDurationSeconds = 60 - } - - if config.BatchSizeInt == 0 { - config.BatchSizeInt = 10000 - } - // register a signal handler to terminate the workflow terminateWorkflow := false signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName) diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index dda1a92478..dcc379c5ac 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -80,23 +80,11 @@ lazy_static::lazy_static! { default_value: 10, required: false, }, - QRepOptionType::Int { - name: "batch_size_int", - min_value: Some(1), - default_value: 1000, - required: false, - }, - QRepOptionType::Int { - name: "batch_duration_timestamp", - min_value: Some(1), - default_value: 60, - required: false, - }, QRepOptionType::Int { name: "num_rows_per_partition", - min_value: Some(0), - default_value: 0, - required: false, + min_value: Some(1), + default_value: 50000, + required: true, }, QRepOptionType::Boolean { name: "initial_copy_only", @@ -112,7 +100,7 @@ lazy_static::lazy_static! { } pub fn process_options( - raw_opts: HashMap<&str, &SqlValue>, + mut raw_opts: HashMap<&str, &SqlValue>, ) -> anyhow::Result> { let mut opts: HashMap = HashMap::new(); @@ -124,7 +112,7 @@ pub fn process_options( required, accepted_values, } => { - if let Some(raw_value) = raw_opts.get(*name) { + if let Some(raw_value) = raw_opts.remove(*name) { if let SqlValue::SingleQuotedString(str) = raw_value { if let Some(values) = accepted_values { if !values.contains(&str.as_str()) { @@ -147,7 +135,7 @@ pub fn process_options( default_value, required, } => { - if let Some(raw_value) = raw_opts.get(*name) { + if let Some(raw_value) = raw_opts.remove(*name) { if let SqlValue::Number(num_str, _) = raw_value { let num = num_str.parse::()?; if let Some(min) = min_value { @@ -168,7 +156,7 @@ pub fn process_options( } QRepOptionType::StringArray { name } => { // read it as a string and split on comma - if let Some(raw_value) = raw_opts.get(*name) { + if let Some(raw_value) = raw_opts.remove(*name) { if let SqlValue::SingleQuotedString(str) = raw_value { let values: Vec = str .split(',') @@ -185,7 +173,7 @@ pub fn process_options( default_value, required, } => { - if let Some(raw_value) = raw_opts.get(*name) { + if let Some(raw_value) = raw_opts.remove(*name) { if let SqlValue::Boolean(b) = raw_value { opts.insert((*name).to_string(), Value::Bool(*b)); } else { @@ -201,5 +189,11 @@ pub fn process_options( } } + // all options processed have been removed from the map + // so any leftover keys are options that shouldn't be here + if raw_opts.len() > 0 { + anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::>()); + } + Ok(opts) } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 028bfd4566..88692f3853 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -264,16 +264,6 @@ impl FlowGrpcClient { cfg.wait_between_batches_seconds = n as u32; } } - "batch_size_int" => { - if let Some(n) = n.as_i64() { - cfg.batch_size_int = n as u32; - } - } - "batch_duration_timestamp" => { - if let Some(n) = n.as_i64() { - cfg.batch_duration_seconds = n as u32; - } - } "num_rows_per_partition" => { if let Some(n) = n.as_i64() { cfg.num_rows_per_partition = n as u32; diff --git a/protos/flow.proto b/protos/flow.proto index 0ce5bc44b4..e6676df93b 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -251,7 +251,9 @@ message QRepConfig { bool initial_copy_only = 8; QRepSyncMode sync_mode = 9; + // DEPRECATED: eliminate when breaking changes are allowed. uint32 batch_size_int = 10; + // DEPRECATED: eliminate when breaking changes are allowed. uint32 batch_duration_seconds = 11; uint32 max_parallel_workers = 12; From 9ffdf07ad61f4521c24b7ab4479425c917043dcd Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 23 Oct 2023 00:03:55 +0530 Subject: [PATCH 2/3] fixing lints, and tests pt.1 --- flow/e2e/congen.go | 1 + nexus/analyzer/src/qrep.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index 02f5b21947..c5228258b0 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -199,6 +199,7 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig( ret.WriteMode = &protos.QRepWriteMode{ WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND, } + ret.NumRowsPerPartition = 1000 return ret, nil } diff --git a/nexus/analyzer/src/qrep.rs b/nexus/analyzer/src/qrep.rs index dcc379c5ac..adf290acfe 100644 --- a/nexus/analyzer/src/qrep.rs +++ b/nexus/analyzer/src/qrep.rs @@ -191,7 +191,7 @@ pub fn process_options( // all options processed have been removed from the map // so any leftover keys are options that shouldn't be here - if raw_opts.len() > 0 { + if !raw_opts.is_empty() { anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::>()); } From fab9a742bb68e07e2c4ba60c102dc458d39075fc Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Mon, 23 Oct 2023 12:12:36 +0530 Subject: [PATCH 3/3] fixing tests pt.2 --- flow/e2e/snowflake/qrep_flow_sf_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 8da1df3c5f..cdcfaeca98 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -108,7 +108,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple() dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) - query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", snowflakeSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( @@ -153,7 +153,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) - query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", snowflakeSuffix, tblName) qrepConfig, err := e2e.CreateQRepWorkflowConfig( @@ -173,7 +173,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() s.NoError(err) @@ -218,7 +217,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() s.NoError(err) @@ -240,7 +238,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) - query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}", + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}", snowflakeSuffix, tblName) sfPeer := s.sfHelper.Peer @@ -263,7 +261,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration( // Verify workflow completes without error s.True(env.IsWorkflowCompleted()) - // assert that error contains "invalid connection configs" err = env.GetWorkflowError() s.NoError(err)