Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove code for int,timestamp partitioning using min,max values #555

Merged
merged 4 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 1 addition & 109 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"
Expand Down Expand Up @@ -63,36 +62,7 @@ func (c *PostgresConnector) GetQRepPartitions(
// log.Warnf("failed to lock table %s: %v", config.WatermarkTable, err)
// }

if config.NumRowsPerPartition > 0 {
return c.getNumRowsPartitions(tx, config, last)
}

minValue, maxValue, err := c.getMinMaxValues(tx, config, last)
if err != nil {
return nil, err
}

var partitions []*protos.QRepPartition
switch v := minValue.(type) {
case int64:
maxValue := maxValue.(int64) + 1
partitions, err = c.getIntPartitions(v, maxValue, config.BatchSizeInt)
case time.Time:
maxValue := maxValue.(time.Time).Add(time.Microsecond)
partitions, err = c.getTimePartitions(v, maxValue, config.BatchDurationSeconds)
// only hit when there is no data in the source table
case nil:
log.Warnf("no records to replicate for flow job %s, returning", config.FlowJobName)
return make([]*protos.QRepPartition, 0), nil
default:
return nil, fmt.Errorf("unsupported type: %T", v)
}

if err != nil {
return nil, err
}

return partitions, nil
return c.getNumRowsPartitions(tx, config, last)
}

func (c *PostgresConnector) setTransactionSnapshot(tx pgx.Tx) error {
Expand Down Expand Up @@ -608,81 +578,3 @@ func (c *PostgresConnector) isPartitionSynced(partitionID string) (bool, error)

return count > 0, nil
}

// getTimePartitions walks from start towards end in steps of
// batchDurationSeconds seconds and emits one timestamp-range partition per
// step. The final partition is clamped so that its End never exceeds end.
//
// Every partition receives a freshly generated UUID as its PartitionId.
// When start is not before end, no partitions are produced and a nil slice
// is returned. A batchDurationSeconds of 0 is rejected with an error because
// the walk would never advance.
func (c *PostgresConnector) getTimePartitions(
	start time.Time,
	end time.Time,
	batchDurationSeconds uint32,
) ([]*protos.QRepPartition, error) {
	if batchDurationSeconds == 0 {
		return nil, fmt.Errorf("batch duration must be greater than 0")
	}

	step := time.Duration(batchDurationSeconds) * time.Second
	var result []*protos.QRepPartition

	for cursor := start; cursor.Before(end); {
		// Tentative upper bound for this partition; clamp it to end.
		upper := cursor.Add(step)
		if upper.After(end) {
			upper = end
		}

		timeRange := &protos.PartitionRange{
			Range: &protos.PartitionRange_TimestampRange{
				TimestampRange: &protos.TimestampPartitionRange{
					Start: timestamppb.New(cursor),
					End:   timestamppb.New(upper),
				},
			},
		}

		result = append(result, &protos.QRepPartition{
			PartitionId: uuid.New().String(),
			Range:       timeRange,
		})

		// The next partition begins where this one ended.
		cursor = upper
	}

	return result, nil
}

// getIntPartitions walks from start towards end in steps of batchSizeInt and
// emits one integer-range partition per step. The final partition is clamped
// so that its End never exceeds end.
//
// Every partition receives a freshly generated UUID as its PartitionId.
// When start is not less than end, no partitions are produced and a nil slice
// is returned. A batchSizeInt of 0 is rejected with an error because the walk
// would never advance.
func (c *PostgresConnector) getIntPartitions(
	start int64, end int64, batchSizeInt uint32) ([]*protos.QRepPartition, error) {
	if batchSizeInt == 0 {
		return nil, fmt.Errorf("batch size cannot be 0")
	}

	step := int64(batchSizeInt)
	var result []*protos.QRepPartition

	for lower := start; lower < end; {
		upper := lower + step
		// Clamp when the step overshoots end, or when the addition wrapped
		// around int64 (upper ends up below lower).
		if upper < lower || upper > end {
			upper = end
		}

		intRange := &protos.PartitionRange{
			Range: &protos.PartitionRange_IntRange{
				IntRange: &protos.IntPartitionRange{
					Start: lower,
					End:   upper,
				},
			},
		}

		result = append(result, &protos.QRepPartition{
			PartitionId: uuid.New().String(),
			Range:       intRange,
		})

		// Once upper reaches end the loop condition fails and the walk stops.
		lower = upper
	}

	return result, nil
}
60 changes: 0 additions & 60 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,6 @@ type testCase struct {
wantErr bool
}

// newTestCase builds a testCase whose QRepConfig targets the table "test" in
// the given schema, uses "from" as the watermark column and sets
// BatchDurationSeconds to duration. The expected partition list starts out
// empty (non-nil), and wantErr records whether the case expects an error.
func newTestCase(schema string, name string, duration uint32, wantErr bool) *testCase {
	table := fmt.Sprintf("%s.test", schema)

	cfg := &protos.QRepConfig{
		FlowJobName:          "test_flow_job",
		BatchDurationSeconds: duration,
		Query: fmt.Sprintf(
			`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
			table),
		WatermarkTable:  table,
		WatermarkColumn: "from",
	}

	tc := &testCase{
		name:    name,
		config:  cfg,
		want:    []*protos.QRepPartition{},
		wantErr: wantErr,
	}
	return tc
}

func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase {
schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
query := fmt.Sprintf(
Expand Down Expand Up @@ -155,47 +136,6 @@ func TestGetQRepPartitions(t *testing.T) {

// Define the test cases
testCases := []*testCase{
newTestCase(
schemaName,
"ensure all days are in 1 partition",
secondsInADay*100,
false,
).appendPartition(
time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
),
newTestCase(
schemaName,
"ensure all days are in 30 partitions",
secondsInADay,
false,
).appendPartitions(
time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
29,
).appendPartition(
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
),
newTestCase(
schemaName,
"ensure all days are in 60 partitions",
secondsInADay/2,
false,
).appendPartitions(
time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
58,
).appendPartition(
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
),
newTestCase(
schemaName,
"test for error condition with batch size 0",
0,
true,
),
newTestCaseForNumRows(
schemaName,
"ensure all rows are in 1 partition if num_rows_per_partition is size of table",
Expand Down
1 change: 1 addition & 0 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(
ret.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
}
ret.NumRowsPerPartition = 1000

return ret, nil
}
9 changes: 3 additions & 6 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
snowflakeSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
Expand Down Expand Up @@ -153,7 +153,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
snowflakeSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
Expand All @@ -173,7 +173,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

Expand Down Expand Up @@ -218,7 +217,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

Expand All @@ -240,7 +238,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
snowflakeSuffix, tblName)

sfPeer := s.sfHelper.Peer
Expand All @@ -263,7 +261,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(
// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

Expand Down
8 changes: 0 additions & 8 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,6 @@ func QRepFlowWorkflow(
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
}

if config.BatchDurationSeconds == 0 {
config.BatchDurationSeconds = 60
}

if config.BatchSizeInt == 0 {
config.BatchSizeInt = 10000
}

// register a signal handler to terminate the workflow
terminateWorkflow := false
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
Expand Down
34 changes: 14 additions & 20 deletions nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,11 @@ lazy_static::lazy_static! {
default_value: 10,
required: false,
},
QRepOptionType::Int {
name: "batch_size_int",
min_value: Some(1),
default_value: 1000,
required: false,
},
QRepOptionType::Int {
name: "batch_duration_timestamp",
min_value: Some(1),
default_value: 60,
required: false,
},
QRepOptionType::Int {
name: "num_rows_per_partition",
min_value: Some(0),
default_value: 0,
required: false,
min_value: Some(1),
default_value: 50000,
required: true,
},
QRepOptionType::Boolean {
name: "initial_copy_only",
Expand All @@ -112,7 +100,7 @@ lazy_static::lazy_static! {
}

pub fn process_options(
raw_opts: HashMap<&str, &SqlValue>,
mut raw_opts: HashMap<&str, &SqlValue>,
) -> anyhow::Result<HashMap<String, Value>> {
let mut opts: HashMap<String, Value> = HashMap::new();

Expand All @@ -124,7 +112,7 @@ pub fn process_options(
required,
accepted_values,
} => {
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::SingleQuotedString(str) = raw_value {
if let Some(values) = accepted_values {
if !values.contains(&str.as_str()) {
Expand All @@ -147,7 +135,7 @@ pub fn process_options(
default_value,
required,
} => {
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::Number(num_str, _) = raw_value {
let num = num_str.parse::<u32>()?;
if let Some(min) = min_value {
Expand All @@ -168,7 +156,7 @@ pub fn process_options(
}
QRepOptionType::StringArray { name } => {
// read it as a string and split on comma
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::SingleQuotedString(str) = raw_value {
let values: Vec<Value> = str
.split(',')
Expand All @@ -185,7 +173,7 @@ pub fn process_options(
default_value,
required,
} => {
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::Boolean(b) = raw_value {
opts.insert((*name).to_string(), Value::Bool(*b));
} else {
Expand All @@ -201,5 +189,11 @@ pub fn process_options(
}
}

// all options processed have been removed from the map
// so any leftover keys are options that shouldn't be here
if !raw_opts.is_empty() {
anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::<Vec<&str>>());
}

Ok(opts)
}
10 changes: 0 additions & 10 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,6 @@ impl FlowGrpcClient {
cfg.wait_between_batches_seconds = n as u32;
}
}
"batch_size_int" => {
if let Some(n) = n.as_i64() {
cfg.batch_size_int = n as u32;
}
}
"batch_duration_timestamp" => {
if let Some(n) = n.as_i64() {
cfg.batch_duration_seconds = n as u32;
}
}
"num_rows_per_partition" => {
if let Some(n) = n.as_i64() {
cfg.num_rows_per_partition = n as u32;
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ message QRepConfig {
bool initial_copy_only = 8;
QRepSyncMode sync_mode = 9;

// DEPRECATED: eliminate when breaking changes are allowed.
uint32 batch_size_int = 10;
// DEPRECATED: eliminate when breaking changes are allowed.
uint32 batch_duration_seconds = 11;

uint32 max_parallel_workers = 12;
Expand Down
Loading