Skip to content

Commit

Permalink
Skips the soft-delete column when defining the Avro schema
Browse files: browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 20, 2023
1 parent 6b4e0e3 commit 1c80b6f
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 104 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *BigQueryConnector) SyncQRepRecords(

avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName)
tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
Expand Down
13 changes: 9 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "")
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "", "")
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down Expand Up @@ -108,6 +108,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
dstTableMetadata *bigquery.TableMetadata,
stream *model.QRecordStream,
syncedAtCol string,
softDeleteCol string,
) (int, error) {
startTime := time.Now()
flowLog := slog.Group("sync_metadata",
Expand All @@ -116,7 +117,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
slog.String("destinationTable", dstTableName),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol, softDeleteCol)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand All @@ -139,8 +140,11 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
stmts := []string{"BEGIN TRANSACTION;"}

selector := "*"
if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
if syncedAtCol != "" { // PeerDB column
selector = "*, CURRENT_TIMESTAMP"
selector += ", CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
Expand Down Expand Up @@ -187,12 +191,13 @@ type AvroSchema struct {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata,
syncedAtCol string,
softDeleteCol string,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
if bqField.Name == syncedAtCol {
if bqField.Name == syncedAtCol || bqField.Name == softDeleteCol {
continue
}
avroType, err := GetAvroType(bqField)
Expand Down
209 changes: 110 additions & 99 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (s *SnapshotFlowExecution) cloneTable(
MaxParallelWorkers: numWorkers,
StagingPath: s.config.SnapshotStagingPath,
SyncedAtColName: s.config.SyncedAtColName,
SoftDeleteColName: s.config.SoftDeleteColName,
WriteMode: &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
},
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ pub struct QRepConfig {
pub dst_table_full_resync: bool,
#[prost(string, tag="19")]
pub synced_at_col_name: ::prost::alloc::string::String,
#[prost(string, tag="20")]
pub soft_delete_col_name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2771,6 +2771,9 @@ impl serde::Serialize for QRepConfig {
if !self.synced_at_col_name.is_empty() {
len += 1;
}
if !self.soft_delete_col_name.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepConfig", len)?;
if !self.flow_job_name.is_empty() {
struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
Expand Down Expand Up @@ -2831,6 +2834,9 @@ impl serde::Serialize for QRepConfig {
if !self.synced_at_col_name.is_empty() {
struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
}
if !self.soft_delete_col_name.is_empty() {
struct_ser.serialize_field("softDeleteColName", &self.soft_delete_col_name)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -2878,6 +2884,8 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
"dstTableFullResync",
"synced_at_col_name",
"syncedAtColName",
"soft_delete_col_name",
"softDeleteColName",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2901,6 +2909,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
SetupWatermarkTableOnDestination,
DstTableFullResync,
SyncedAtColName,
SoftDeleteColName,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -2942,6 +2951,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
"setupWatermarkTableOnDestination" | "setup_watermark_table_on_destination" => Ok(GeneratedField::SetupWatermarkTableOnDestination),
"dstTableFullResync" | "dst_table_full_resync" => Ok(GeneratedField::DstTableFullResync),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand Down Expand Up @@ -2980,6 +2990,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
let mut setup_watermark_table_on_destination__ = None;
let mut dst_table_full_resync__ = None;
let mut synced_at_col_name__ = None;
let mut soft_delete_col_name__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FlowJobName => {
Expand Down Expand Up @@ -3106,6 +3117,12 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
}
synced_at_col_name__ = Some(map.next_value()?);
}
GeneratedField::SoftDeleteColName => {
if soft_delete_col_name__.is_some() {
return Err(serde::de::Error::duplicate_field("softDeleteColName"));
}
soft_delete_col_name__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -3131,6 +3148,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
setup_watermark_table_on_destination: setup_watermark_table_on_destination__.unwrap_or_default(),
dst_table_full_resync: dst_table_full_resync__.unwrap_or_default(),
synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ message QRepConfig {
bool dst_table_full_resync = 18;

string synced_at_col_name = 19;
string soft_delete_col_name = 20;
}

message QRepPartition {
Expand Down
17 changes: 17 additions & 0 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ export interface QRepConfig {
*/
dstTableFullResync: boolean;
syncedAtColName: string;
softDeleteColName: string;
}

export interface QRepPartition {
Expand Down Expand Up @@ -5309,6 +5310,7 @@ function createBaseQRepConfig(): QRepConfig {
setupWatermarkTableOnDestination: false,
dstTableFullResync: false,
syncedAtColName: "",
softDeleteColName: "",
};
}

Expand Down Expand Up @@ -5371,6 +5373,9 @@ export const QRepConfig = {
if (message.syncedAtColName !== "") {
writer.uint32(154).string(message.syncedAtColName);
}
if (message.softDeleteColName !== "") {
writer.uint32(162).string(message.softDeleteColName);
}
return writer;
},

Expand Down Expand Up @@ -5514,6 +5519,13 @@ export const QRepConfig = {

message.syncedAtColName = reader.string();
continue;
case 20:
if (tag !== 162) {
break;
}

message.softDeleteColName = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -5548,6 +5560,7 @@ export const QRepConfig = {
: false,
dstTableFullResync: isSet(object.dstTableFullResync) ? Boolean(object.dstTableFullResync) : false,
syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
};
},

Expand Down Expand Up @@ -5610,6 +5623,9 @@ export const QRepConfig = {
if (message.syncedAtColName !== "") {
obj.syncedAtColName = message.syncedAtColName;
}
if (message.softDeleteColName !== "") {
obj.softDeleteColName = message.softDeleteColName;
}
return obj;
},

Expand Down Expand Up @@ -5643,6 +5659,7 @@ export const QRepConfig = {
message.setupWatermarkTableOnDestination = object.setupWatermarkTableOnDestination ?? false;
message.dstTableFullResync = object.dstTableFullResync ?? false;
message.syncedAtColName = object.syncedAtColName ?? "";
message.softDeleteColName = object.softDeleteColName ?? "";
return message;
},
};
Expand Down

0 comments on commit 1c80b6f

Please sign in to comment.