Skip to content

Commit

Permalink
Patch FullTablePartition to support tables without a primary key
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 22, 2023
1 parent 2de985b commit cc4c2e6
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 281 deletions.
14 changes: 8 additions & 6 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,15 @@ func generateCreateTableSQLForNormalizedTable(sourceTableIdentifier string,
}

// add composite primary key to the table
primaryKeyColsQuoted := make([]string, 0)
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted,
fmt.Sprintf(`"%s"`, primaryKeyCol))
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted,
fmt.Sprintf(`"%s"`, primaryKeyCol))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsQuoted, ","), ",")))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsQuoted, ","), ",")))

return fmt.Sprintf(createNormalizedTableSQL, sourceTableIdentifier,
strings.TrimSuffix(strings.Join(createTableSQLArray, ""), ","))
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (c *PostgresConnector) GetTableSchema(
req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) {
res := make(map[string]*protos.TableSchema)
for _, tableName := range req.TableIdentifiers {
tableSchema, err := c.getTableSchemaForTable(tableName)
tableSchema, err := c.getTableSchemaForTable(tableName, req.IgnorePkeyRequirements)
if err != nil {
return nil, err
}
Expand All @@ -562,6 +562,7 @@ func (c *PostgresConnector) GetTableSchema(

func (c *PostgresConnector) getTableSchemaForTable(
tableName string,
ignorePkeyRequirements bool,
) (*protos.TableSchema, error) {
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
Expand All @@ -584,7 +585,7 @@ func (c *PostgresConnector) getTableSchemaForTable(

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
if !isFullReplica {
if !(isFullReplica || ignorePkeyRequirements) {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}
Expand Down
14 changes: 8 additions & 6 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,13 +916,15 @@ func generateCreateTableSQLForNormalizedTable(
}

// add composite primary key to the table
primaryKeyColsUpperQuoted := make([]string, 0)
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsUpperQuoted = append(primaryKeyColsUpperQuoted,
fmt.Sprintf(`"%s"`, strings.ToUpper(primaryKeyCol)))
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
primaryKeyColsUpperQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsUpperQuoted = append(primaryKeyColsUpperQuoted,
fmt.Sprintf(`"%s"`, strings.ToUpper(primaryKeyCol)))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsUpperQuoted, ","), ",")))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsUpperQuoted, ","), ",")))

return fmt.Sprintf(createNormalizedTableSQL, sourceTableIdentifier,
strings.TrimSuffix(strings.Join(createTableSQLArray, ""), ","))
Expand Down
538 changes: 275 additions & 263 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package model

import "fmt"
import (
"fmt"

log "github.com/sirupsen/logrus"
)

type QRecordOrError struct {
Record *QRecord
Expand Down Expand Up @@ -69,6 +73,7 @@ func (s *QRecordStream) SetSchema(schema *QRecordSchema) error {
return fmt.Errorf("Schema already set")
}

log.Warnf("setting the schema bois")
s.schema <- &QRecordSchemaOrError{
Schema: schema,
}
Expand Down
5 changes: 3 additions & 2 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
})

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
IgnorePkeyRequirements: true,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ pub struct GetTableSchemaBatchInput {
pub peer_connection_config: ::core::option::Option<super::peerdb_peers::Peer>,
#[prost(string, repeated, tag="2")]
pub table_identifiers: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(bool, tag="3")]
pub ignore_pkey_requirements: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1820,13 +1820,19 @@ impl serde::Serialize for GetTableSchemaBatchInput {
if !self.table_identifiers.is_empty() {
len += 1;
}
if self.ignore_pkey_requirements {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.GetTableSchemaBatchInput", len)?;
if let Some(v) = self.peer_connection_config.as_ref() {
struct_ser.serialize_field("peerConnectionConfig", v)?;
}
if !self.table_identifiers.is_empty() {
struct_ser.serialize_field("tableIdentifiers", &self.table_identifiers)?;
}
if self.ignore_pkey_requirements {
struct_ser.serialize_field("ignorePkeyRequirements", &self.ignore_pkey_requirements)?;
}
struct_ser.end()
}
}
Expand All @@ -1841,12 +1847,15 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
"peerConnectionConfig",
"table_identifiers",
"tableIdentifiers",
"ignore_pkey_requirements",
"ignorePkeyRequirements",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
PeerConnectionConfig,
TableIdentifiers,
IgnorePkeyRequirements,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -1871,6 +1880,7 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
match value {
"peerConnectionConfig" | "peer_connection_config" => Ok(GeneratedField::PeerConnectionConfig),
"tableIdentifiers" | "table_identifiers" => Ok(GeneratedField::TableIdentifiers),
"ignorePkeyRequirements" | "ignore_pkey_requirements" => Ok(GeneratedField::IgnorePkeyRequirements),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -1892,6 +1902,7 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
{
let mut peer_connection_config__ = None;
let mut table_identifiers__ = None;
let mut ignore_pkey_requirements__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::PeerConnectionConfig => {
Expand All @@ -1906,6 +1917,12 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
}
table_identifiers__ = Some(map.next_value()?);
}
GeneratedField::IgnorePkeyRequirements => {
if ignore_pkey_requirements__.is_some() {
return Err(serde::de::Error::duplicate_field("ignorePkeyRequirements"));
}
ignore_pkey_requirements__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -1914,6 +1931,7 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
Ok(GetTableSchemaBatchInput {
peer_connection_config: peer_connection_config__,
table_identifiers: table_identifiers__.unwrap_or_default(),
ignore_pkey_requirements: ignore_pkey_requirements__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ message TableSchema {
message GetTableSchemaBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
repeated string table_identifiers = 2;
bool ignore_pkey_requirements = 3;
}

message GetTableSchemaBatchOutput {
Expand Down
18 changes: 17 additions & 1 deletion ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ export interface TableSchema_ColumnsEntry {
export interface GetTableSchemaBatchInput {
peerConnectionConfig: Peer | undefined;
tableIdentifiers: string[];
ignorePkeyRequirements: boolean;
}

export interface GetTableSchemaBatchOutput {
Expand Down Expand Up @@ -4028,7 +4029,7 @@ export const TableSchema_ColumnsEntry = {
};

function createBaseGetTableSchemaBatchInput(): GetTableSchemaBatchInput {
return { peerConnectionConfig: undefined, tableIdentifiers: [] };
return { peerConnectionConfig: undefined, tableIdentifiers: [], ignorePkeyRequirements: false };
}

export const GetTableSchemaBatchInput = {
Expand All @@ -4039,6 +4040,9 @@ export const GetTableSchemaBatchInput = {
for (const v of message.tableIdentifiers) {
writer.uint32(18).string(v!);
}
if (message.ignorePkeyRequirements === true) {
writer.uint32(24).bool(message.ignorePkeyRequirements);
}
return writer;
},

Expand All @@ -4063,6 +4067,13 @@ export const GetTableSchemaBatchInput = {

message.tableIdentifiers.push(reader.string());
continue;
case 3:
if (tag !== 24) {
break;
}

message.ignorePkeyRequirements = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -4078,6 +4089,7 @@ export const GetTableSchemaBatchInput = {
tableIdentifiers: Array.isArray(object?.tableIdentifiers)
? object.tableIdentifiers.map((e: any) => String(e))
: [],
ignorePkeyRequirements: isSet(object.ignorePkeyRequirements) ? Boolean(object.ignorePkeyRequirements) : false,
};
},

Expand All @@ -4089,6 +4101,9 @@ export const GetTableSchemaBatchInput = {
if (message.tableIdentifiers?.length) {
obj.tableIdentifiers = message.tableIdentifiers;
}
if (message.ignorePkeyRequirements === true) {
obj.ignorePkeyRequirements = message.ignorePkeyRequirements;
}
return obj;
},

Expand All @@ -4101,6 +4116,7 @@ export const GetTableSchemaBatchInput = {
? Peer.fromPartial(object.peerConnectionConfig)
: undefined;
message.tableIdentifiers = object.tableIdentifiers?.map((e) => e) || [];
message.ignorePkeyRequirements = object.ignorePkeyRequirements ?? false;
return message;
},
};
Expand Down

0 comments on commit cc4c2e6

Please sign in to comment.