diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 386476192b..ae2b0aba4e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -245,7 +245,7 @@ func (p *PostgresCDCSource) consumeStream( if err != nil { return nil, fmt.Errorf("error getting pkey column value: %w", err) } - pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal)) + pkeyColsMerged = append(pkeyColsMerged, fmt.Sprintf("%v", pkeyColVal.Value)) } tablePkeyVal := model.TableWithPkey{ TableName: tableName, diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index e9eb565840..1c89edf6cf 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -566,7 +566,10 @@ func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedT updateStmts := make([]string, 0) for _, cols := range unchangedToastColsLists { - unchangedColsArray := strings.Split(cols, ",") + unchangedColsArray := make([]string, 0) + for _, unchangedToastCol := range strings.Split(cols, ",") { + unchangedColsArray = append(unchangedColsArray, fmt.Sprintf(`"%s"`, unchangedToastCol)) + } otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0) for _, colName := range otherCols { diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index f2c9bcb18d..14cfef5690 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -129,7 +129,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { "id": string(qvalue.QValueKindInt64), "c1": string(qvalue.QValueKindInt64), }, - PrimaryKeyColumn: "id", + PrimaryKeyColumns: []string{"id"}, } output, err := s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, @@ -158,7 +158,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { "c1": string(qvalue.QValueKindInt64), "c2": string(qvalue.QValueKindInt64), }, - PrimaryKeyColumn: "id", + PrimaryKeyColumns: []string{"id"}, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, @@ -188,7 +188,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { "c2": string(qvalue.QValueKindInt64), "c3": string(qvalue.QValueKindInt64), }, - PrimaryKeyColumn: "id", + PrimaryKeyColumns: []string{"id"}, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName}, @@ -218,7 +218,7 @@ func (s *PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { "c2": string(qvalue.QValueKindInt64), "c3": string(qvalue.QValueKindInt64), }, - PrimaryKeyColumn: "id", + PrimaryKeyColumns: []string{"id"}, } output, err = s.connector.GetTableSchema(&protos.GetTableSchemaBatchInput{ TableIdentifiers: []string{dstTableName},