Skip to content

Commit

Permalink
exclude columns
Browse files Browse the repository at this point in the history
Currently this only extends the relevant data structures to carry the exclude field from
PeerDB-io/sqlparser-rs#21,
but does not yet act on it
  • Loading branch information
serprex committed Nov 3, 2023
1 parent 0aec792 commit aa72c90
Show file tree
Hide file tree
Showing 14 changed files with 631 additions and 578 deletions.
5 changes: 2 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,6 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
var stream *model.QRecordStream
bufferSize := shared.FetchAndChannelSize
var wg sync.WaitGroup
var numRecords int64

var goroutineErr error = nil
if config.SourcePeer.Type == protos.DBType_POSTGRES {
Expand All @@ -530,7 +529,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
pullPgRecords := func() {
pgConn := srcConn.(*connpostgres.PostgresConnector)
tmp, err := pgConn.PullQRepRecordStream(config, partition, stream)
numRecords = int64(tmp)
numRecords := int64(tmp)
if err != nil {
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
Expand All @@ -551,7 +550,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
if err != nil {
return fmt.Errorf("failed to pull records: %w", err)
}
numRecords = int64(recordBatch.NumRecords)
numRecords := int64(recordBatch.NumRecords)
log.WithFields(log.Fields{
"flowName": config.FlowJobName,
}).Infof("pulled %d records\n", len(recordBatch.Records))
Expand Down
1,152 changes: 581 additions & 571 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type RecordItems struct {
func NewRecordItems() *RecordItems {
return &RecordItems{
colToValIdx: make(map[string]int),
// create a slice of 64 qvalues so that we don't have to allocate memory
// create a slice of 32 qvalues so that we don't have to allocate memory
// for each record to reduce GC pressure
values: make([]*qvalue.QValue, 0, 32),
}
Expand Down
4 changes: 4 additions & 0 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
.partition_key
.as_ref()
.map(|s| s.to_string()),
exclude: table_mapping
.exclude
.as_ref()
.map(|ss| ss.iter().map(|s| s.to_string()).collect())
});
}

Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl FlowGrpcClient {
source_table_identifier: mapping.source_table_identifier.clone(),
destination_table_identifier: mapping.destination_table_identifier.clone(),
partition_key: mapping.partition_key.clone().unwrap_or_default(),
exclude: mapping.exclude.clone().unwrap_or_default(),
});
});

Expand Down
1 change: 1 addition & 0 deletions nexus/pt/src/flow_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct FlowJobTableMapping {
pub source_table_identifier: String,
pub destination_table_identifier: String,
pub partition_key: Option<String>,
pub exclude: Option<Vec<String>>,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct TableMapping {
pub destination_table_identifier: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub partition_key: ::prost::alloc::string::String,
#[prost(string, repeated, tag = "4")]
pub exclude: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
17 changes: 17 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5528,6 +5528,9 @@ impl serde::Serialize for TableMapping {
if !self.partition_key.is_empty() {
len += 1;
}
if !self.exclude.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.TableMapping", len)?;
if !self.source_table_identifier.is_empty() {
struct_ser.serialize_field("sourceTableIdentifier", &self.source_table_identifier)?;
Expand All @@ -5538,6 +5541,9 @@ impl serde::Serialize for TableMapping {
if !self.partition_key.is_empty() {
struct_ser.serialize_field("partitionKey", &self.partition_key)?;
}
if !self.exclude.is_empty() {
struct_ser.serialize_field("exclude", &self.exclude)?;
}
struct_ser.end()
}
}
Expand All @@ -5554,13 +5560,15 @@ impl<'de> serde::Deserialize<'de> for TableMapping {
"destinationTableIdentifier",
"partition_key",
"partitionKey",
"exclude",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
SourceTableIdentifier,
DestinationTableIdentifier,
PartitionKey,
Exclude,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -5586,6 +5594,7 @@ impl<'de> serde::Deserialize<'de> for TableMapping {
"sourceTableIdentifier" | "source_table_identifier" => Ok(GeneratedField::SourceTableIdentifier),
"destinationTableIdentifier" | "destination_table_identifier" => Ok(GeneratedField::DestinationTableIdentifier),
"partitionKey" | "partition_key" => Ok(GeneratedField::PartitionKey),
"exclude" => Ok(GeneratedField::Exclude),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -5608,6 +5617,7 @@ impl<'de> serde::Deserialize<'de> for TableMapping {
let mut source_table_identifier__ = None;
let mut destination_table_identifier__ = None;
let mut partition_key__ = None;
let mut exclude__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::SourceTableIdentifier => {
Expand All @@ -5628,6 +5638,12 @@ impl<'de> serde::Deserialize<'de> for TableMapping {
}
partition_key__ = Some(map.next_value()?);
}
GeneratedField::Exclude => {
if exclude__.is_some() {
return Err(serde::de::Error::duplicate_field("exclude"));
}
exclude__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -5637,6 +5653,7 @@ impl<'de> serde::Deserialize<'de> for TableMapping {
source_table_identifier: source_table_identifier__.unwrap_or_default(),
destination_table_identifier: destination_table_identifier__.unwrap_or_default(),
partition_key: partition_key__.unwrap_or_default(),
exclude: exclude__.unwrap_or_default(),
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion nexus/sqlparser-rs
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ message TableMapping {
string source_table_identifier = 1;
string destination_table_identifier = 2;
string partition_key = 3;
repeated string exclude = 4;
}

message FlowConnectionConfigs {
Expand Down
1 change: 1 addition & 0 deletions ui/app/dto/MirrorsDTO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export type TableMapRow = {
source: string;
destination: string;
partitionKey: string;
exclude: string[];
};
1 change: 1 addition & 0 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const reformattedTableMapping = (tableMapping: TableMapRow[]) => {
sourceTableIdentifier: row.source,
destinationTableIdentifier: row.destination,
partitionKey: row.partitionKey,
exclude: row.exclude,
};
});
return mapping;
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/tablemapping.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const TableMapping = ({
const [loading, setLoading] = useState(false);

const handleAddRow = (source: string) => {
setRows([...rows, { source, destination: source, partitionKey: '' }]);
setRows([...rows, { source, destination: source, partitionKey: '', exclude: [] }]);
};

const handleRemoveRow = (source: string) => {
Expand Down
18 changes: 17 additions & 1 deletion ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export interface TableMapping {
sourceTableIdentifier: string;
destinationTableIdentifier: string;
partitionKey: string;
exclude: string[];
}

export interface FlowConnectionConfigs {
Expand Down Expand Up @@ -691,7 +692,7 @@ export const RelationMessage = {
};

function createBaseTableMapping(): TableMapping {
return { sourceTableIdentifier: "", destinationTableIdentifier: "", partitionKey: "" };
return { sourceTableIdentifier: "", destinationTableIdentifier: "", partitionKey: "", exclude: [] };
}

export const TableMapping = {
Expand All @@ -705,6 +706,9 @@ export const TableMapping = {
if (message.partitionKey !== "") {
writer.uint32(26).string(message.partitionKey);
}
for (const v of message.exclude) {
writer.uint32(34).string(v!);
}
return writer;
},

Expand Down Expand Up @@ -736,6 +740,13 @@ export const TableMapping = {

message.partitionKey = reader.string();
continue;
case 4:
if (tag !== 34) {
break;
}

message.exclude.push(reader.string());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -752,6 +763,7 @@ export const TableMapping = {
? String(object.destinationTableIdentifier)
: "",
partitionKey: isSet(object.partitionKey) ? String(object.partitionKey) : "",
exclude: Array.isArray(object?.exclude) ? object.exclude.map((e: any) => String(e)) : [],
};
},

Expand All @@ -766,6 +778,9 @@ export const TableMapping = {
if (message.partitionKey !== "") {
obj.partitionKey = message.partitionKey;
}
if (message.exclude?.length) {
obj.exclude = message.exclude;
}
return obj;
},

Expand All @@ -777,6 +792,7 @@ export const TableMapping = {
message.sourceTableIdentifier = object.sourceTableIdentifier ?? "";
message.destinationTableIdentifier = object.destinationTableIdentifier ?? "";
message.partitionKey = object.partitionKey ?? "";
message.exclude = object.exclude?.map((e) => e) || [];
return message;
},
};
Expand Down

0 comments on commit aa72c90

Please sign in to comment.