Skip to content

Commit

Permalink
fixed TOAST handling for composite pkeys
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 16, 2023
1 parent c2ad57c commit 4e91865
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (p *PostgresCDCSource) consumeStream(
if err != nil {
return nil, fmt.Errorf("error getting pkey column value: %w", err)
}
pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal))
pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value))
}
tablePkeyVal := model.TableWithPkey{
TableName: tableName,
Expand Down
5 changes: 4 additions & 1 deletion flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,10 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedT
updateStmts := make([]string, 0)

for _, cols := range unchangedToastColsLists {
unchangedColsArray := strings.Split(cols, ",")
unchangedColsArray := make([]string, 0)
for _, unchangedToastCol := range strings.Split(cols, ",") {
unchangedColsArray = append(unchangedColsArray, fmt.Sprintf(`"%s"`, unchangedToastCol))
}
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0)
for _, colName := range otherCols {
Expand Down
8 changes: 4 additions & 4 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
"id": string(qvalue.QValueKindInt64),
"c1": string(qvalue.QValueKindInt64),
},
PrimaryKeyColumn: "id",
PrimaryKeyColumns: []string{"id"},
}
output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
"c1": string(qvalue.QValueKindInt64),
"c2": string(qvalue.QValueKindInt64),
},
PrimaryKeyColumn: "id",
PrimaryKeyColumns: []string{"id"},
}
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
"c2": string(qvalue.QValueKindInt64),
"c3": string(qvalue.QValueKindInt64),
},
PrimaryKeyColumn: "id",
PrimaryKeyColumns: []string{"id"},
}
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
Expand Down Expand Up @@ -218,7 +218,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
"c2": string(qvalue.QValueKindInt64),
"c3": string(qvalue.QValueKindInt64),
},
PrimaryKeyColumn: "id",
PrimaryKeyColumns: []string{"id"},
}
output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{
TableIdentifiers: []string{dstTableName},
Expand Down

0 comments on commit 4e91865

Please sign in to comment.