Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added QRep overwrite mode, to truncate destination table #385

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 0 additions & 89 deletions flow/cmd/qrep_api.go

This file was deleted.

7 changes: 7 additions & 0 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
return fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, qRepMetadataTableName, err)
}

if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

return nil
}

Expand Down
57 changes: 36 additions & 21 deletions flow/connectors/postgres/postgres_cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,23 @@ func (suite *PostgresCDCTestSuite) insertSimpleRecords(srcTableName string) {
func (suite *PostgresCDCTestSuite) validateInsertedSimpleRecords(records []model.Record, srcTableName string,
dstTableName string) {
suite.Equal(3, len(records))
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "quick"}})
matchData := []*model.RecordItems{
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(2)},
"name": {Kind: qvalue.QValueKindString, Value: "quick"},
}),
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(4)},
"name": {Kind: qvalue.QValueKindString, Value: "brown"},
}),
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(8)},
"name": {Kind: qvalue.QValueKindString, Value: "fox"},
}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "quick"}}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(4)},
{Kind: qvalue.QValueKindString, Value: "brown"}}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(8)},
{Kind: qvalue.QValueKindString, Value: "fox"}}),
}
for idx, record := range records {
suite.IsType(&model.InsertRecord{}, record)
Expand Down Expand Up @@ -90,22 +94,22 @@ func (suite *PostgresCDCTestSuite) validateSimpleMutatedRecords(records []model.
updateRecord := records[0].(*model.UpdateRecord)
suite.Equal(srcTableName, updateRecord.SourceTableName)
suite.Equal(dstTableName, updateRecord.DestinationTableName)
suite.Equal(model.RecordItems{}, updateRecord.OldItems)
suite.Equal(model.NewRecordItemWithData([]string{}, []*qvalue.QValue{}), updateRecord.OldItems)

items := model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(2)},
"name": {Kind: qvalue.QValueKindString, Value: "slow"},
})
items := model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "slow"}})
suite.Equal(items, updateRecord.NewItems)

suite.IsType(&model.DeleteRecord{}, records[1])
deleteRecord := records[1].(*model.DeleteRecord)
suite.Equal(srcTableName, deleteRecord.SourceTableName)
suite.Equal(dstTableName, deleteRecord.DestinationTableName)
items = model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(8)},
"name": {Kind: qvalue.QValueKindInvalid, Value: nil},
})
items = model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(8)},
{Kind: qvalue.QValueKindInvalid, Value: nil}})
suite.Equal(items, deleteRecord.Items)
}

Expand Down Expand Up @@ -780,6 +784,17 @@ func (suite *PostgresCDCTestSuite) TestToastHappyFlow() {
RelationMessageMapping: relationMessageMapping,
})
suite.failTestError(err)
recordsWithSchemaDelta, err = suite.connector.PullRecords(&model.PullRecordsRequest{
FlowJobName: toastHappyFlowName,
LastSyncState: nil,
IdleTimeout: 10 * time.Second,
MaxBatchSize: 100,
SrcTableIDNameMapping: relIDTableNameMapping,
TableNameMapping: tableNameMapping,
TableNameSchemaMapping: tableNameSchemaMapping,
RelationMessageMapping: relationMessageMapping,
})
suite.failTestError(err)
suite.Nil(recordsWithSchemaDelta.TableSchemaDelta)
suite.validateInsertedToastRecords(recordsWithSchemaDelta.RecordBatch.Records,
toastHappyFlowSrcTableName, toastHappyFlowDstTableName)
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,14 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
"flowName": config.FlowJobName,
}).Infof("Setup metadata table.")

if config.WriteMode != nil &&
config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.pool.Exec(c.ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
return err
}

if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.database.Exec(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(

ret.SyncMode = syncMode
ret.StagingPath = c.StagingPath
ret.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
}

return ret, nil
}
30 changes: 18 additions & 12 deletions flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func NewRecordItems() *RecordItems {
}
}

func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems {
func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems {
recordItem := NewRecordItems()
for col, val := range data {
for i, col := range cols {
recordItem.colToValIdx[col] = len(recordItem.values)
recordItem.values = append(recordItem.values, val)
recordItem.values = append(recordItem.values, val[i])
}
return recordItem
}
Expand Down
12 changes: 0 additions & 12 deletions generate_go_protos.sh

This file was deleted.

2 changes: 1 addition & 1 deletion nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ lazy_static::lazy_static! {
name: "mode",
default_val: Some("append"),
required: false,
accepted_values: Some(vec!["upsert", "append"]),
accepted_values: Some(vec!["upsert", "append", "overwrite"]),
},
QRepOptionType::StringArray {
name: "unique_key_columns",
Expand Down
11 changes: 11 additions & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ impl FlowGrpcClient {
cfg.write_mode = Some(wm);
}
"append" => cfg.write_mode = Some(wm),
"overwrite" => {
if !cfg.initial_copy_only {
return anyhow::Result::Err(
anyhow::anyhow!(
"write mode overwrite can only be set with initial_copy_only = true"
)
);
}
wm.write_type = QRepWriteType::QrepWriteModeOverwrite as i32;
cfg.write_mode = Some(wm);
}
_ => return anyhow::Result::Err(anyhow::anyhow!("invalid mode {}", s)),
}
}
Expand Down
4 changes: 4 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ impl QRepSyncMode {
/// Write mode selected for a query replication (QRep) flow.
pub enum QRepWriteType {
QrepWriteModeAppend = 0,
QrepWriteModeUpsert = 1,
/// Only valid when `initial_copy_only` is set to true. TRUNCATEs the destination table before reverting to APPEND.
QrepWriteModeOverwrite = 2,
}
impl QRepWriteType {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -492,13 +494,15 @@ impl QRepWriteType {
match self {
QRepWriteType::QrepWriteModeAppend => "QREP_WRITE_MODE_APPEND",
QRepWriteType::QrepWriteModeUpsert => "QREP_WRITE_MODE_UPSERT",
QRepWriteType::QrepWriteModeOverwrite => "QREP_WRITE_MODE_OVERWRITE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"QREP_WRITE_MODE_APPEND" => Some(Self::QrepWriteModeAppend),
"QREP_WRITE_MODE_UPSERT" => Some(Self::QrepWriteModeUpsert),
"QREP_WRITE_MODE_OVERWRITE" => Some(Self::QrepWriteModeOverwrite),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,7 @@ impl serde::Serialize for QRepWriteType {
let variant = match self {
Self::QrepWriteModeAppend => "QREP_WRITE_MODE_APPEND",
Self::QrepWriteModeUpsert => "QREP_WRITE_MODE_UPSERT",
Self::QrepWriteModeOverwrite => "QREP_WRITE_MODE_OVERWRITE",
};
serializer.serialize_str(variant)
}
Expand All @@ -3223,6 +3224,7 @@ impl<'de> serde::Deserialize<'de> for QRepWriteType {
const FIELDS: &[&str] = &[
"QREP_WRITE_MODE_APPEND",
"QREP_WRITE_MODE_UPSERT",
"QREP_WRITE_MODE_OVERWRITE",
];

struct GeneratedVisitor;
Expand Down Expand Up @@ -3267,6 +3269,7 @@ impl<'de> serde::Deserialize<'de> for QRepWriteType {
match value {
"QREP_WRITE_MODE_APPEND" => Ok(QRepWriteType::QrepWriteModeAppend),
"QREP_WRITE_MODE_UPSERT" => Ok(QRepWriteType::QrepWriteModeUpsert),
"QREP_WRITE_MODE_OVERWRITE" => Ok(QRepWriteType::QrepWriteModeOverwrite),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ enum QRepSyncMode {
// Write mode selected for a query replication (QRep) flow.
enum QRepWriteType {
QREP_WRITE_MODE_APPEND = 0;
QREP_WRITE_MODE_UPSERT = 1;
// Only valid when initial_copy_only is set to true. TRUNCATEs the destination table before reverting to APPEND.
QREP_WRITE_MODE_OVERWRITE = 2;
}

message QRepWriteMode {
Expand Down
Loading
Loading