remove code for int,timestamp partitioning using min,max values (#555)

The only mechanism for partitioning is now `num_rows_per_partition`, which is a mandatory option. Unrecognized options in QRep mirror commands now cause an error from the SQL interface layer.

heavycrystal authored Oct 23, 2023
1 parent b1be612 commit 909bbb1
Showing 8 changed files with 21 additions and 213 deletions.
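With `num_rows_per_partition` now mandatory, every QRep mirror has to state its partition size explicitly. As a rough sketch of what a mirror definition looks like at the SQL interface layer after this change (the mirror, peer, and table names and the surrounding options here are illustrative assumptions, not taken from this commit):

```sql
-- Hypothetical QRep mirror definition; qrep_example_mirror, the peers, and
-- public.events are placeholder names. num_rows_per_partition is the only
-- remaining partitioning knob and must be supplied.
CREATE MIRROR qrep_example_mirror
FROM postgres_peer TO snowflake_peer FOR
$$ SELECT * FROM public.events WHERE id BETWEEN {{.start}} AND {{.end}} $$
WITH (
  watermark_table_name = 'public.events',
  watermark_column = 'id',
  destination_table_name = 'public.events',
  mode = 'append',
  num_rows_per_partition = 50000
);
```

Supplying a removed option such as `batch_size_int` or `batch_duration_timestamp` in the `WITH` clause now fails in `process_options` (see nexus/analyzer/src/qrep.rs below) instead of being silently ignored.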
110 changes: 1 addition & 109 deletions flow/connectors/postgres/qrep.go
@@ -14,7 +14,6 @@ import (
 	"github.com/jackc/pgx/v5"
 	"github.com/jackc/pgx/v5/pgtype"
 	log "github.com/sirupsen/logrus"
-	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
 const qRepMetadataTableName = "_peerdb_query_replication_metadata"
@@ -63,36 +62,7 @@ func (c *PostgresConnector) GetQRepPartitions(
 	// log.Warnf("failed to lock table %s: %v", config.WatermarkTable, err)
 	// }
 
-	if config.NumRowsPerPartition > 0 {
-		return c.getNumRowsPartitions(tx, config, last)
-	}
-
-	minValue, maxValue, err := c.getMinMaxValues(tx, config, last)
-	if err != nil {
-		return nil, err
-	}
-
-	var partitions []*protos.QRepPartition
-	switch v := minValue.(type) {
-	case int64:
-		maxValue := maxValue.(int64) + 1
-		partitions, err = c.getIntPartitions(v, maxValue, config.BatchSizeInt)
-	case time.Time:
-		maxValue := maxValue.(time.Time).Add(time.Microsecond)
-		partitions, err = c.getTimePartitions(v, maxValue, config.BatchDurationSeconds)
-	// only hit when there is no data in the source table
-	case nil:
-		log.Warnf("no records to replicate for flow job %s, returning", config.FlowJobName)
-		return make([]*protos.QRepPartition, 0), nil
-	default:
-		return nil, fmt.Errorf("unsupported type: %T", v)
-	}
-
-	if err != nil {
-		return nil, err
-	}
-
-	return partitions, nil
+	return c.getNumRowsPartitions(tx, config, last)
 }
 
 func (c *PostgresConnector) setTransactionSnapshot(tx pgx.Tx) error {
@@ -608,81 +578,3 @@ func (c *PostgresConnector) isPartitionSynced(partitionID string) (bool, error)
 
 	return count > 0, nil
 }
-
-func (c *PostgresConnector) getTimePartitions(
-	start time.Time,
-	end time.Time,
-	batchDurationSeconds uint32,
-) ([]*protos.QRepPartition, error) {
-	if batchDurationSeconds == 0 {
-		return nil, fmt.Errorf("batch duration must be greater than 0")
-	}
-
-	batchDuration := time.Duration(batchDurationSeconds) * time.Second
-	var partitions []*protos.QRepPartition
-
-	for start.Before(end) {
-		partitionEnd := start.Add(batchDuration)
-		if partitionEnd.After(end) {
-			partitionEnd = end
-		}
-
-		rangePartition := protos.PartitionRange{
-			Range: &protos.PartitionRange_TimestampRange{
-				TimestampRange: &protos.TimestampPartitionRange{
-					Start: timestamppb.New(start),
-					End:   timestamppb.New(partitionEnd),
-				},
-			},
-		}
-
-		partitions = append(partitions, &protos.QRepPartition{
-			PartitionId: uuid.New().String(),
-			Range:       &rangePartition,
-		})
-
-		start = partitionEnd
-	}
-
-	return partitions, nil
-}
-
-func (c *PostgresConnector) getIntPartitions(
-	start int64, end int64, batchSizeInt uint32) ([]*protos.QRepPartition, error) {
-	var partitions []*protos.QRepPartition
-	batchSize := int64(batchSizeInt)
-
-	if batchSize == 0 {
-		return nil, fmt.Errorf("batch size cannot be 0")
-	}
-
-	for start < end {
-		partitionEnd := start + batchSize
-		// safeguard against integer overflow
-		if partitionEnd > end || partitionEnd < start {
-			partitionEnd = end
-		}
-
-		rangePartition := protos.PartitionRange{
-			Range: &protos.PartitionRange_IntRange{
-				IntRange: &protos.IntPartitionRange{
-					Start: start,
-					End:   partitionEnd,
-				},
-			},
-		}
-
-		partitions = append(partitions, &protos.QRepPartition{
-			PartitionId: uuid.New().String(),
-			Range:       &rangePartition,
-		})
-
-		if partitionEnd == end {
-			break
-		}
-
-		start = partitionEnd
-	}
-
-	return partitions, nil
-}
60 changes: 0 additions & 60 deletions flow/connectors/postgres/qrep_partition_test.go
@@ -22,25 +22,6 @@ type testCase struct {
 	wantErr bool
 }
 
-func newTestCase(schema string, name string, duration uint32, wantErr bool) *testCase {
-	schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
-	query := fmt.Sprintf(
-		`SELECT * FROM %s WHERE "from" >= {{.start}} AND "from" < {{.end}}`,
-		schemaQualifiedTable)
-	return &testCase{
-		name: name,
-		config: &protos.QRepConfig{
-			FlowJobName:          "test_flow_job",
-			BatchDurationSeconds: duration,
-			Query:                query,
-			WatermarkTable:       schemaQualifiedTable,
-			WatermarkColumn:      "from",
-		},
-		want:    []*protos.QRepPartition{},
-		wantErr: wantErr,
-	}
-}
-
 func newTestCaseForNumRows(schema string, name string, rows uint32, expectedNum int) *testCase {
 	schemaQualifiedTable := fmt.Sprintf("%s.test", schema)
 	query := fmt.Sprintf(
@@ -155,47 +136,6 @@ func TestGetQRepPartitions(t *testing.T) {
 
 	// Define the test cases
 	testCases := []*testCase{
-		newTestCase(
-			schemaName,
-			"ensure all days are in 1 partition",
-			secondsInADay*100,
-			false,
-		).appendPartition(
-			time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
-			time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
-		),
-		newTestCase(
-			schemaName,
-			"ensure all days are in 30 partitions",
-			secondsInADay,
-			false,
-		).appendPartitions(
-			time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
-			time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
-			29,
-		).appendPartition(
-			time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
-			time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
-		),
-		newTestCase(
-			schemaName,
-			"ensure all days are in 60 partitions",
-			secondsInADay/2,
-			false,
-		).appendPartitions(
-			time.Date(2010, time.January, 1, 10, 0, 0, 0, time.UTC),
-			time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
-			58,
-		).appendPartition(
-			time.Date(2010, time.January, 30, 10, 0, 0, 0, time.UTC),
-			time.Date(2010, time.January, 30, 10, 0, 0, 1000, time.UTC),
-		),
-		newTestCase(
-			schemaName,
-			"test for error condition with batch size 0",
-			0,
-			true,
-		),
 		newTestCaseForNumRows(
 			schemaName,
 			"ensure all rows are in 1 partition if num_rows_per_partition is size of table",
1 change: 1 addition & 0 deletions flow/e2e/congen.go
@@ -199,6 +199,7 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(
 	ret.WriteMode = &protos.QRepWriteMode{
 		WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
 	}
+	ret.NumRowsPerPartition = 1000
 
 	return ret, nil
 }
9 changes: 3 additions & 6 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
@@ -108,7 +108,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
 
 	dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
 
-	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
+	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
 		snowflakeSuffix, tblName)
 
 	qrepConfig, err := e2e.CreateQRepWorkflowConfig(
@@ -153,7 +153,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
 
 	dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
 
-	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
+	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
 		snowflakeSuffix, tblName)
 
 	qrepConfig, err := e2e.CreateQRepWorkflowConfig(
@@ -173,7 +173,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
 	// Verify workflow completes without error
 	s.True(env.IsWorkflowCompleted())
 
-	// assert that error contains "invalid connection configs"
 	err = env.GetWorkflowError()
 	s.NoError(err)

Expand Down Expand Up @@ -218,7 +217,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

@@ -240,7 +238,7 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(
 
 	dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)
 
-	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at >= {{.start}} AND updated_at < {{.end}}",
+	query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE updated_at BETWEEN {{.start}} AND {{.end}}",
 		snowflakeSuffix, tblName)
 
 	sfPeer := s.sfHelper.Peer
@@ -263,7 +261,6 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration(
 	// Verify workflow completes without error
 	s.True(env.IsWorkflowCompleted())
 
-	// assert that error contains "invalid connection configs"
 	err = env.GetWorkflowError()
 	s.NoError(err)
 
8 changes: 0 additions & 8 deletions flow/workflows/qrep_flow.go
@@ -262,14 +262,6 @@ func QRepFlowWorkflow(
 		waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
 	}
 
-	if config.BatchDurationSeconds == 0 {
-		config.BatchDurationSeconds = 60
-	}
-
-	if config.BatchSizeInt == 0 {
-		config.BatchSizeInt = 10000
-	}
-
 	// register a signal handler to terminate the workflow
 	terminateWorkflow := false
 	signalChan := workflow.GetSignalChannel(ctx, shared.CDCFlowSignalName)
34 changes: 14 additions & 20 deletions nexus/analyzer/src/qrep.rs
@@ -80,23 +80,11 @@ lazy_static::lazy_static! {
            default_value: 10,
            required: false,
        },
-       QRepOptionType::Int {
-           name: "batch_size_int",
-           min_value: Some(1),
-           default_value: 1000,
-           required: false,
-       },
-       QRepOptionType::Int {
-           name: "batch_duration_timestamp",
-           min_value: Some(1),
-           default_value: 60,
-           required: false,
-       },
        QRepOptionType::Int {
            name: "num_rows_per_partition",
-           min_value: Some(0),
-           default_value: 0,
-           required: false,
+           min_value: Some(1),
+           default_value: 50000,
+           required: true,
        },
        QRepOptionType::Boolean {
            name: "initial_copy_only",
@@ -112,7 +100,7 @@
 }
 
 pub fn process_options(
-    raw_opts: HashMap<&str, &SqlValue>,
+    mut raw_opts: HashMap<&str, &SqlValue>,
 ) -> anyhow::Result<HashMap<String, Value>> {
     let mut opts: HashMap<String, Value> = HashMap::new();
 
@@ -124,7 +112,7 @@ pub fn process_options(
                required,
                accepted_values,
            } => {
-               if let Some(raw_value) = raw_opts.get(*name) {
+               if let Some(raw_value) = raw_opts.remove(*name) {
                    if let SqlValue::SingleQuotedString(str) = raw_value {
                        if let Some(values) = accepted_values {
                            if !values.contains(&str.as_str()) {
@@ -147,7 +135,7 @@
                default_value,
                required,
            } => {
-               if let Some(raw_value) = raw_opts.get(*name) {
+               if let Some(raw_value) = raw_opts.remove(*name) {
                    if let SqlValue::Number(num_str, _) = raw_value {
                        let num = num_str.parse::<u32>()?;
                        if let Some(min) = min_value {
@@ -168,7 +156,7 @@
            }
            QRepOptionType::StringArray { name } => {
                // read it as a string and split on comma
-               if let Some(raw_value) = raw_opts.get(*name) {
+               if let Some(raw_value) = raw_opts.remove(*name) {
                    if let SqlValue::SingleQuotedString(str) = raw_value {
                        let values: Vec<Value> = str
                            .split(',')
@@ -185,7 +173,7 @@
                default_value,
                required,
            } => {
-               if let Some(raw_value) = raw_opts.get(*name) {
+               if let Some(raw_value) = raw_opts.remove(*name) {
                    if let SqlValue::Boolean(b) = raw_value {
                        opts.insert((*name).to_string(), Value::Bool(*b));
                    } else {
@@ -201,5 +189,11 @@
        }
    }
 
+   // all options processed have been removed from the map
+   // so any leftover keys are options that shouldn't be here
+   if !raw_opts.is_empty() {
+       anyhow::bail!("Unknown options for QRep mirrors: {:#?}", raw_opts.into_keys().collect::<Vec<&str>>());
+   }
+
    Ok(opts)
 }
10 changes: 0 additions & 10 deletions nexus/flow-rs/src/grpc.rs
@@ -264,16 +264,6 @@ impl FlowGrpcClient {
                    cfg.wait_between_batches_seconds = n as u32;
                }
            }
-           "batch_size_int" => {
-               if let Some(n) = n.as_i64() {
-                   cfg.batch_size_int = n as u32;
-               }
-           }
-           "batch_duration_timestamp" => {
-               if let Some(n) = n.as_i64() {
-                   cfg.batch_duration_seconds = n as u32;
-               }
-           }
            "num_rows_per_partition" => {
                if let Some(n) = n.as_i64() {
                    cfg.num_rows_per_partition = n as u32;
2 changes: 2 additions & 0 deletions protos/flow.proto
@@ -251,7 +251,9 @@ message QRepConfig {
   bool initial_copy_only = 8;
   QRepSyncMode sync_mode = 9;
 
+  // DEPRECATED: eliminate when breaking changes are allowed.
   uint32 batch_size_int = 10;
+  // DEPRECATED: eliminate when breaking changes are allowed.
   uint32 batch_duration_seconds = 11;
 
   uint32 max_parallel_workers = 12;
