Skip to content

Commit

Permalink
remove code for handling using min,max values for partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 22, 2023
1 parent eb4024d commit 1de9388
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 207 deletions.
110 changes: 1 addition & 109 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
)

const qRepMetadataTableName = "_peerdb_query_replication_metadata"
Expand Down Expand Up @@ -63,36 +62,7 @@ func (c *PostgresConnector) GetQRepPartitions(
// log.Warnf("failed to lock table %s: %v", config.WatermarkTable, err)
// }

if config.NumRowsPerPartition > 0 {
return c.getNumRowsPartitions(tx, config, last)
}

minValue, maxValue, err := c.getMinMaxValues(tx, config, last)
if err != nil {
return nil, err
}

var partitions []*protos.QRepPartition
switch v := minValue.(type) {
case int64:
maxValue := maxValue.(int64) + 1
partitions, err = c.getIntPartitions(v, maxValue, config.BatchSizeInt)
case time.Time:
maxValue := maxValue.(time.Time).Add(time.Microsecond)
partitions, err = c.getTimePartitions(v, maxValue, config.BatchDurationSeconds)
// only hit when there is no data in the source table
case nil:
log.Warnf("no records to replicate for flow job %s, returning", config.FlowJobName)
return make([]*protos.QRepPartition, 0), nil
default:
return nil, fmt.Errorf("unsupported type: %T", v)
}

if err != nil {
return nil, err
}

return partitions, nil
return c.getNumRowsPartitions(tx, config, last)
}

func (c *PostgresConnector) setTransactionSnapshot(tx pgx.Tx) error {
Expand Down Expand Up @@ -607,81 +577,3 @@ func (c *PostgresConnector) isPartitionSynced(partitionID string) (bool, error)

return count > 0, nil
}

func (c *PostgresConnector) getTimePartitions(
start time.Time,
end time.Time,
batchDurationSeconds uint32,
) ([]*protos.QRepPartition, error) {
if batchDurationSeconds == 0 {
return nil, fmt.Errorf("batch duration must be greater than 0")
}

batchDuration := time.Duration(batchDurationSeconds) * time.Second
var partitions []*protos.QRepPartition

for start.Before(end) {
partitionEnd := start.Add(batchDuration)
if partitionEnd.After(end) {
partitionEnd = end
}

rangePartition := protos.PartitionRange{
Range: &protos.PartitionRange_TimestampRange{
TimestampRange: &protos.TimestampPartitionRange{
Start: timestamppb.New(start),
End: timestamppb.New(partitionEnd),
},
},
}

partitions = append(partitions, &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &rangePartition,
})

start = partitionEnd
}

return partitions, nil
}

func (c *PostgresConnector) getIntPartitions(
start int64, end int64, batchSizeInt uint32) ([]*protos.QRepPartition, error) {
var partitions []*protos.QRepPartition
batchSize := int64(batchSizeInt)

if batchSize == 0 {
return nil, fmt.Errorf("batch size cannot be 0")
}

for start < end {
partitionEnd := start + batchSize
// safeguard against integer overflow
if partitionEnd > end || partitionEnd < start {
partitionEnd = end
}

rangePartition := protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{
Start: start,
End: partitionEnd,
},
},
}

partitions = append(partitions, &protos.QRepPartition{
PartitionId: uuid.New().String(),
Range: &rangePartition,
})

if partitionEnd == end {
break
}

start = partitionEnd
}

return partitions, nil
}
60 changes: 0 additions & 60 deletions flow/connectors/postgres/qrep_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,6 @@ type testCase struct {
wantErr bool
}

func newTestCase(schema string, name string, duration uint32, wantErr bool) *testCase {
schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
query := fmt.Sprintf(
`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
schemaQualifiedTable)
return &testCase{
name: name,
config: &protos.QRepConfig{
FlowJobName: "test_flow_job",
BatchDurationSeconds: duration,
Query: query,
WatermarkTable: schemaQualifiedTable,
WatermarkColumn: "from",
},
want: []*protos.QRepPartition{},
wantErr: wantErr,
}
}

func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase {
schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
query := fmt.Sprintf(
Expand Down Expand Up @@ -155,47 +136,6 @@ func TestGetQRepPartitions(t *testing.T) {

// Define the test cases
testCases := []*testCase{
newTestCase(
schemaName,
"ensure all days are in 1 partition",
secondsInADay*100,
false,
).appendPartition(
time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
),
newTestCase(
schemaName,
"ensure all days are in 30 partitions",
secondsInADay,
false,
).appendPartitions(
time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
29,
).appendPartition(
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
),
newTestCase(
schemaName,
"ensure all days are in 60 partitions",
secondsInADay/2,
false,
).appendPartitions(
time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
58,
).appendPartition(
time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
),
newTestCase(
schemaName,
"test for error condition with batch size 0",
0,
true,
),
newTestCaseForNumRows(
schemaName,
"ensure all rows are in 1 partition if num_rows_per_partition is size of table",
Expand Down
8 changes: 0 additions & 8 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,6 @@ func QRepFlowWorkflow(
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
}

if config.BatchDurationSeconds == 0 {
config.BatchDurationSeconds = 60
}

if config.BatchSizeInt == 0 {
config.BatchSizeInt = 10000
}

// register a signal handler to terminate the workflow
terminateWorkflow := false
signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
Expand Down
34 changes: 14 additions & 20 deletions nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,11 @@ lazy_static::lazy_static! {
default_value: 10,
required: false,
},
QRepOptionType::Int {
name: "batch_size_int",
min_value: Some(1),
default_value: 1000,
required: false,
},
QRepOptionType::Int {
name: "batch_duration_timestamp",
min_value: Some(1),
default_value: 60,
required: false,
},
QRepOptionType::Int {
name: "num_rows_per_partition",
min_value: Some(0),
default_value: 0,
required: false,
min_value: Some(1),
default_value: 50000,
required: true,
},
QRepOptionType::Boolean {
name: "initial_copy_only",
Expand All @@ -112,7 +100,7 @@ lazy_static::lazy_static! {
}

pub fn process_options(
raw_opts: HashMap<&str, &SqlValue>,
mut raw_opts: HashMap<&str, &SqlValue>,
) -> anyhow::Result<HashMap<String, Value>> {
let mut opts: HashMap<String, Value> = HashMap::new();

Expand All @@ -124,7 +112,7 @@ pub fn process_options(
required,
accepted_values,
} => {
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::SingleQuotedString(str) = raw_value {
if let Some(values) = accepted_values {
if !values.contains(&str.as_str()) {
Expand All @@ -147,7 +135,7 @@ pub fn process_options(
default_value,
required,
} => {
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::Number(num_str, _) = raw_value {
let num = num_str.parse::<u32>()?;
if let Some(min) = min_value {
Expand All @@ -168,7 +156,7 @@ pub fn process_options(
}
QRepOptionType::StringArray { name } => {
// read it as a string and split on comma
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::SingleQuotedString(str) = raw_value {
let values: Vec<Value> = str
.split(',')
Expand All @@ -185,7 +173,7 @@ pub fn process_options(
default_value,
required,
} => {
if let Some(raw_value) = raw_opts.get(*name) {
if let Some(raw_value) = raw_opts.remove(*name) {
if let SqlValue::Boolean(b) = raw_value {
opts.insert((*name).to_string(), Value::Bool(*b));
} else {
Expand All @@ -201,5 +189,11 @@ pub fn process_options(
}
}

// all options processed have been removed from the map
// so any leftover keys are options that shouldn't be here
if raw_opts.len() > 0 {
anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::<Vec<&str>>());
}

Ok(opts)
}
10 changes: 0 additions & 10 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,6 @@ impl FlowGrpcClient {
cfg.wait_between_batches_seconds = n as u32;
}
}
"batch_size_int" => {
if let Some(n) = n.as_i64() {
cfg.batch_size_int = n as u32;
}
}
"batch_duration_timestamp" => {
if let Some(n) = n.as_i64() {
cfg.batch_duration_seconds = n as u32;
}
}
"num_rows_per_partition" => {
if let Some(n) = n.as_i64() {
cfg.num_rows_per_partition = n as u32;
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ message QRepConfig {
bool initial_copy_only = 8;
QRepSyncMode sync_mode = 9;

// DEPRECATED: eliminate when breaking changes are allowed.
uint32 batch_size_int = 10;
// DEPRECATED: eliminate when breaking changes are allowed.
uint32 batch_duration_seconds = 11;

uint32 max_parallel_workers = 12;
Expand Down

0 comments on commit 1de9388

Please sign in to comment.