Skip to content

Commit

Permalink
added QRep overwrite mode, to truncate destination table (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Sep 15, 2023
1 parent 21e3dd8 commit bb6259a
Show file tree
Hide file tree
Showing 15 changed files with 873 additions and 141 deletions.
89 changes: 0 additions & 89 deletions flow/cmd/qrep_api.go

This file was deleted.

7 changes: 7 additions & 0 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
return fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, qRepMetadataTableName, err)
}

if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.client.Query(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier)).Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

return nil
}

Expand Down
57 changes: 36 additions & 21 deletions flow/connectors/postgres/postgres_cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,23 @@ func (suite *PostgresCDCTestSuite) insertSimpleRecords(srcTableName string) {
func (suite *PostgresCDCTestSuite) validateInsertedSimpleRecords(records []model.Record, srcTableName string,
dstTableName string) {
suite.Equal(3, len(records))
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "quick"}})
matchData := []*model.RecordItems{
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(2)},
"name": {Kind: qvalue.QValueKindString, Value: "quick"},
}),
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(4)},
"name": {Kind: qvalue.QValueKindString, Value: "brown"},
}),
model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(8)},
"name": {Kind: qvalue.QValueKindString, Value: "fox"},
}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "quick"}}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(4)},
{Kind: qvalue.QValueKindString, Value: "brown"}}),
model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(8)},
{Kind: qvalue.QValueKindString, Value: "fox"}}),
}
for idx, record := range records {
suite.IsType(&model.InsertRecord{}, record)
Expand Down Expand Up @@ -90,22 +94,22 @@ func (suite *PostgresCDCTestSuite) validateSimpleMutatedRecords(records []model.
updateRecord := records[0].(*model.UpdateRecord)
suite.Equal(srcTableName, updateRecord.SourceTableName)
suite.Equal(dstTableName, updateRecord.DestinationTableName)
suite.Equal(model.RecordItems{}, updateRecord.OldItems)
suite.Equal(model.NewRecordItemWithData([]string{}, []*qvalue.QValue{}), updateRecord.OldItems)

items := model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(2)},
"name": {Kind: qvalue.QValueKindString, Value: "slow"},
})
items := model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(2)},
{Kind: qvalue.QValueKindString, Value: "slow"}})
suite.Equal(items, updateRecord.NewItems)

suite.IsType(&model.DeleteRecord{}, records[1])
deleteRecord := records[1].(*model.DeleteRecord)
suite.Equal(srcTableName, deleteRecord.SourceTableName)
suite.Equal(dstTableName, deleteRecord.DestinationTableName)
items = model.NewRecordItemWithData(map[string]*qvalue.QValue{
"id": {Kind: qvalue.QValueKindInt32, Value: int32(8)},
"name": {Kind: qvalue.QValueKindInvalid, Value: nil},
})
items = model.NewRecordItemWithData([]string{"id", "name"},
[]*qvalue.QValue{
{Kind: qvalue.QValueKindInt32, Value: int32(8)},
{Kind: qvalue.QValueKindInvalid, Value: nil}})
suite.Equal(items, deleteRecord.Items)
}

Expand Down Expand Up @@ -780,6 +784,17 @@ func (suite *PostgresCDCTestSuite) TestToastHappyFlow() {
RelationMessageMapping: relationMessageMapping,
})
suite.failTestError(err)
recordsWithSchemaDelta, err = suite.connector.PullRecords(&model.PullRecordsRequest{
FlowJobName: toastHappyFlowName,
LastSyncState: nil,
IdleTimeout: 10 * time.Second,
MaxBatchSize: 100,
SrcTableIDNameMapping: relIDTableNameMapping,
TableNameMapping: tableNameMapping,
TableNameSchemaMapping: tableNameSchemaMapping,
RelationMessageMapping: relationMessageMapping,
})
suite.failTestError(err)
suite.Nil(recordsWithSchemaDelta.TableSchemaDelta)
suite.validateInsertedToastRecords(recordsWithSchemaDelta.RecordBatch.Records,
toastHappyFlowSrcTableName, toastHappyFlowDstTableName)
Expand Down
8 changes: 8 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,14 @@ func (c *PostgresConnector) SetupQRepMetadataTables(config *protos.QRepConfig) e
"flowName": config.FlowJobName,
}).Infof("Setup metadata table.")

if config.WriteMode != nil &&
config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.pool.Exec(c.ctx, fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(config *protos.QRepConfig)
return err
}

if config.WriteMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
_, err = c.database.Exec(fmt.Sprintf("TRUNCATE TABLE %s", config.DestinationTableIdentifier))
if err != nil {
return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
}
}

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ func (c *QRepFlowConnectionGenerationConfig) GenerateQRepConfig(

ret.SyncMode = syncMode
ret.StagingPath = c.StagingPath
ret.WriteMode = &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
}

return ret, nil
}
30 changes: 18 additions & 12 deletions flow/generated/protos/flow.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func NewRecordItems() *RecordItems {
}
}

func NewRecordItemWithData(data map[string]*qvalue.QValue) *RecordItems {
func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems {
recordItem := NewRecordItems()
for col, val := range data {
for i, col := range cols {
recordItem.colToValIdx[col] = len(recordItem.values)
recordItem.values = append(recordItem.values, val)
recordItem.values = append(recordItem.values, val[i])
}
return recordItem
}
Expand Down
12 changes: 0 additions & 12 deletions generate_go_protos.sh

This file was deleted.

2 changes: 1 addition & 1 deletion nexus/analyzer/src/qrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ lazy_static::lazy_static! {
name: "mode",
default_val: Some("append"),
required: false,
accepted_values: Some(vec!["upsert", "append"]),
accepted_values: Some(vec!["upsert", "append", "overwrite"]),
},
QRepOptionType::StringArray {
name: "unique_key_columns",
Expand Down
11 changes: 11 additions & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ impl FlowGrpcClient {
cfg.write_mode = Some(wm);
}
"append" => cfg.write_mode = Some(wm),
"overwrite" => {
if !cfg.initial_copy_only {
return anyhow::Result::Err(
anyhow::anyhow!(
"write mode overwrite can only be set with initial_copy_only = true"
)
);
}
wm.write_type = QRepWriteType::QrepWriteModeOverwrite as i32;
cfg.write_mode = Some(wm);
}
_ => return anyhow::Result::Err(anyhow::anyhow!("invalid mode {}", s)),
}
}
Expand Down
4 changes: 4 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,8 @@ impl QRepSyncMode {
pub enum QRepWriteType {
QrepWriteModeAppend = 0,
QrepWriteModeUpsert = 1,
/// only valid when initial_copy_true is set to true. TRUNCATES tables before reverting to APPEND.
QrepWriteModeOverwrite = 2,
}
impl QRepWriteType {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -492,13 +494,15 @@ impl QRepWriteType {
match self {
QRepWriteType::QrepWriteModeAppend => "QREP_WRITE_MODE_APPEND",
QRepWriteType::QrepWriteModeUpsert => "QREP_WRITE_MODE_UPSERT",
QRepWriteType::QrepWriteModeOverwrite => "QREP_WRITE_MODE_OVERWRITE",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"QREP_WRITE_MODE_APPEND" => Some(Self::QrepWriteModeAppend),
"QREP_WRITE_MODE_UPSERT" => Some(Self::QrepWriteModeUpsert),
"QREP_WRITE_MODE_OVERWRITE" => Some(Self::QrepWriteModeOverwrite),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3210,6 +3210,7 @@ impl serde::Serialize for QRepWriteType {
let variant = match self {
Self::QrepWriteModeAppend => "QREP_WRITE_MODE_APPEND",
Self::QrepWriteModeUpsert => "QREP_WRITE_MODE_UPSERT",
Self::QrepWriteModeOverwrite => "QREP_WRITE_MODE_OVERWRITE",
};
serializer.serialize_str(variant)
}
Expand All @@ -3223,6 +3224,7 @@ impl<'de> serde::Deserialize<'de> for QRepWriteType {
const FIELDS: &[&str] = &[
"QREP_WRITE_MODE_APPEND",
"QREP_WRITE_MODE_UPSERT",
"QREP_WRITE_MODE_OVERWRITE",
];

struct GeneratedVisitor;
Expand Down Expand Up @@ -3267,6 +3269,7 @@ impl<'de> serde::Deserialize<'de> for QRepWriteType {
match value {
"QREP_WRITE_MODE_APPEND" => Ok(QRepWriteType::QrepWriteModeAppend),
"QREP_WRITE_MODE_UPSERT" => Ok(QRepWriteType::QrepWriteModeUpsert),
"QREP_WRITE_MODE_OVERWRITE" => Ok(QRepWriteType::QrepWriteModeOverwrite),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ enum QRepSyncMode {
enum QRepWriteType {
QREP_WRITE_MODE_APPEND = 0;
QREP_WRITE_MODE_UPSERT = 1;
// only valid when initial_copy_true is set to true. TRUNCATES tables before reverting to APPEND.
QREP_WRITE_MODE_OVERWRITE = 2;
}

message QRepWriteMode {
Expand Down
Loading

0 comments on commit bb6259a

Please sign in to comment.