Skip to content

Commit

Permalink
Fix: Soft Delete for Bigquery Snapshot (#862)
Browse files Browse the repository at this point in the history
BigQuery uses destination table schema to derive AVRO schema. We create
two columns on the destination which makes the AVRO writing fail. This
PR fixes that by skipping the soft delete column (the synced_at column
was already being skipped) for AVRO writing
  • Loading branch information
Amogh-Bharadwaj authored Dec 20, 2023
1 parent 6b4e0e3 commit 9ec86d4
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 104 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *BigQueryConnector) SyncQRepRecords(

avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName)
tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
Expand Down
13 changes: 9 additions & 4 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
flowJobName, dstTableName, syncBatchID),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "")
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, "", "")
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down Expand Up @@ -108,6 +108,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
dstTableMetadata *bigquery.TableMetadata,
stream *model.QRecordStream,
syncedAtCol string,
softDeleteCol string,
) (int, error) {
startTime := time.Now()
flowLog := slog.Group("sync_metadata",
Expand All @@ -116,7 +117,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
slog.String("destinationTable", dstTableName),
)
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol)
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol, softDeleteCol)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand All @@ -139,8 +140,11 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
stmts := []string{"BEGIN TRANSACTION;"}

selector := "*"
if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
if syncedAtCol != "" { // PeerDB column
selector = "*, CURRENT_TIMESTAMP"
selector += ", CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT %s FROM `%s.%s`;",
Expand Down Expand Up @@ -187,12 +191,13 @@ type AvroSchema struct {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata,
syncedAtCol string,
softDeleteCol string,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
if bqField.Name == syncedAtCol {
if bqField.Name == syncedAtCol || bqField.Name == softDeleteCol {
continue
}
avroType, err := GetAvroType(bqField)
Expand Down
209 changes: 110 additions & 99 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (s *SnapshotFlowExecution) cloneTable(
MaxParallelWorkers: numWorkers,
StagingPath: s.config.SnapshotStagingPath,
SyncedAtColName: s.config.SyncedAtColName,
SoftDeleteColName: s.config.SoftDeleteColName,
WriteMode: &protos.QRepWriteMode{
WriteType: protos.QRepWriteType_QREP_WRITE_MODE_APPEND,
},
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ pub struct QRepConfig {
pub dst_table_full_resync: bool,
#[prost(string, tag="19")]
pub synced_at_col_name: ::prost::alloc::string::String,
#[prost(string, tag="20")]
pub soft_delete_col_name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2771,6 +2771,9 @@ impl serde::Serialize for QRepConfig {
if !self.synced_at_col_name.is_empty() {
len += 1;
}
if !self.soft_delete_col_name.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.QRepConfig", len)?;
if !self.flow_job_name.is_empty() {
struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
Expand Down Expand Up @@ -2831,6 +2834,9 @@ impl serde::Serialize for QRepConfig {
if !self.synced_at_col_name.is_empty() {
struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
}
if !self.soft_delete_col_name.is_empty() {
struct_ser.serialize_field("softDeleteColName", &self.soft_delete_col_name)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -2878,6 +2884,8 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
"dstTableFullResync",
"synced_at_col_name",
"syncedAtColName",
"soft_delete_col_name",
"softDeleteColName",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -2901,6 +2909,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
SetupWatermarkTableOnDestination,
DstTableFullResync,
SyncedAtColName,
SoftDeleteColName,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -2942,6 +2951,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
"setupWatermarkTableOnDestination" | "setup_watermark_table_on_destination" => Ok(GeneratedField::SetupWatermarkTableOnDestination),
"dstTableFullResync" | "dst_table_full_resync" => Ok(GeneratedField::DstTableFullResync),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand Down Expand Up @@ -2980,6 +2990,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
let mut setup_watermark_table_on_destination__ = None;
let mut dst_table_full_resync__ = None;
let mut synced_at_col_name__ = None;
let mut soft_delete_col_name__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FlowJobName => {
Expand Down Expand Up @@ -3106,6 +3117,12 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
}
synced_at_col_name__ = Some(map.next_value()?);
}
GeneratedField::SoftDeleteColName => {
if soft_delete_col_name__.is_some() {
return Err(serde::de::Error::duplicate_field("softDeleteColName"));
}
soft_delete_col_name__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -3131,6 +3148,7 @@ impl<'de> serde::Deserialize<'de> for QRepConfig {
setup_watermark_table_on_destination: setup_watermark_table_on_destination__.unwrap_or_default(),
dst_table_full_resync: dst_table_full_resync__.unwrap_or_default(),
synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ message QRepConfig {
bool dst_table_full_resync = 18;

string synced_at_col_name = 19;
string soft_delete_col_name = 20;
}

message QRepPartition {
Expand Down
17 changes: 17 additions & 0 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ export interface QRepConfig {
*/
dstTableFullResync: boolean;
syncedAtColName: string;
softDeleteColName: string;
}

export interface QRepPartition {
Expand Down Expand Up @@ -5309,6 +5310,7 @@ function createBaseQRepConfig(): QRepConfig {
setupWatermarkTableOnDestination: false,
dstTableFullResync: false,
syncedAtColName: "",
softDeleteColName: "",
};
}

Expand Down Expand Up @@ -5371,6 +5373,9 @@ export const QRepConfig = {
if (message.syncedAtColName !== "") {
writer.uint32(154).string(message.syncedAtColName);
}
if (message.softDeleteColName !== "") {
writer.uint32(162).string(message.softDeleteColName);
}
return writer;
},

Expand Down Expand Up @@ -5514,6 +5519,13 @@ export const QRepConfig = {

message.syncedAtColName = reader.string();
continue;
case 20:
if (tag !== 162) {
break;
}

message.softDeleteColName = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -5548,6 +5560,7 @@ export const QRepConfig = {
: false,
dstTableFullResync: isSet(object.dstTableFullResync) ? Boolean(object.dstTableFullResync) : false,
syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
};
},

Expand Down Expand Up @@ -5610,6 +5623,9 @@ export const QRepConfig = {
if (message.syncedAtColName !== "") {
obj.syncedAtColName = message.syncedAtColName;
}
if (message.softDeleteColName !== "") {
obj.softDeleteColName = message.softDeleteColName;
}
return obj;
},

Expand Down Expand Up @@ -5643,6 +5659,7 @@ export const QRepConfig = {
message.setupWatermarkTableOnDestination = object.setupWatermarkTableOnDestination ?? false;
message.dstTableFullResync = object.dstTableFullResync ?? false;
message.syncedAtColName = object.syncedAtColName ?? "";
message.softDeleteColName = object.softDeleteColName ?? "";
return message;
},
};
Expand Down

0 comments on commit 9ec86d4

Please sign in to comment.