Skip to content

Commit

Permalink
rebased a very old branch
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Oct 9, 2023
1 parent 54a372e commit d4e0997
Show file tree
Hide file tree
Showing 9 changed files with 1,069 additions and 455 deletions.
24 changes: 11 additions & 13 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,19 +248,17 @@ func (p *PostgresCDCSource) consumeStream(
}
_, ok := records.TablePKeyLastSeen[tablePkeyVal]
if !ok {
result.Records = append(result.Records, rec)
result.TablePKeyLastSeen[tablePkeyVal] = len(result.Records) - 1
records.Records = append(records.Records, rec)
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
} else {
oldRec := result.Records[result.TablePKeyLastSeen[tablePkeyVal]]
// iterate through unchanged toast cols and set them
for col, val := range oldRec.GetItems() {
if _, ok := r.NewItems[col]; !ok {
r.NewItems[col] = val
delete(r.UnchangedToastColumns, col)
}
records.Records = append(records.Records, rec)
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
oldRec := records.Records[records.TablePKeyLastSeen[tablePkeyVal]]
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(oldRec.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
records.Records = append(records.Records, rec)
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
}
case *model.InsertRecord:
pkeyColsMerged := make([]string, 0)
Expand All @@ -276,9 +274,9 @@ func (p *PostgresCDCSource) consumeStream(
TableName: tableName,
PkeyColVal: strings.Join(pkeyColsMerged, " "),
}
result.Records = append(result.Records, rec)
records.Records = append(records.Records, rec)
// all columns will be set in insert record, so add it to the map
result.TablePKeyLastSeen[tablePkeyVal] = len(result.Records) - 1
records.TablePKeyLastSeen[tablePkeyVal] = len(records.Records) - 1
case *model.DeleteRecord:
records.Records = append(records.Records, rec)
case *model.RelationRecord:
Expand Down
38 changes: 9 additions & 29 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,45 +543,25 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
}
flattenedCastsSQL := strings.TrimSuffix(strings.Join(flattenedCastsSQLArray, ","), ",")

// return fmt.Sprintf(mergeStatementSQL, primaryKeyColumnCast, internalSchema, rawTableIdentifier,
// destinationTableIdentifier, flattenedCastsSQL, normalizedTableSchema.PrimaryKeyColumn,
// normalizedTableSchema.PrimaryKeyColumn, insertColumnsSQL, insertValuesSQL, updateStatements)
// }

// func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedToastColsLists []string) string {
// updateStmts := make([]string, 0)

// for _, cols := range unchangedToastColsLists {
// unchangedColsArray := strings.Split(cols, ",")
// otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
// tmpArray := make([]string, 0)
// for _, colName := range otherCols {
// tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName))
// }
// ssep := strings.Join(tmpArray, ",")
// updateStmt := fmt.Sprintf(`WHEN MATCHED AND
// src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s'
// THEN UPDATE SET %s `, cols, ssep)
// updateStmts = append(updateStmts, updateStmt)
// }
// return strings.Join(updateStmts, "\n")
// }
return fmt.Sprintf(mergeStatementSQL, primaryKeyColumnCast, internalSchema, rawTableIdentifier,
destinationTableIdentifier, flattenedCastsSQL, normalizedTableSchema.PrimaryKeyColumn,
normalizedTableSchema.PrimaryKeyColumn, insertColumnsSQL, insertValuesSQL, updateStatements)
}

func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedToastColsLists []string) string {
updateStmts := make([]string, 0)

for _, cols := range unchangedToastColsLists {
unchangedColsArray := strings.Split(cols, ",")
for i, col := range unchangedColsArray {
unchangedColsArray[i] = fmt.Sprintf("\"%s\"", col)
}
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0)
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName))
}
ssep := strings.Join(tmpArray, ",")
quotedCols := strings.Join(unchangedColsArray, ",")
updateStmt := fmt.Sprintf(`WHEN MATCHED AND
src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, quotedCols, ssep)
src._peerdb_record_type=1 AND _peerdb_unchanged_toast_columns='%s'
THEN UPDATE SET %s `, cols, ssep)
updateStmts = append(updateStmts, updateStmt)
}
return strings.Join(updateStmts, "\n")
Expand Down
21 changes: 4 additions & 17 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,20 +575,6 @@ func (c *PostgresConnector) getTableSchemaForTable(
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pkey, err := c.getPrimaryKeyColumn(schemaTable)
if err != nil {
if !isFullReplica {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}

res := &protos.TableSchema{
TableIdentifier: tableName,
Columns: make(map[string]string),
PrimaryKeyColumn: pkey,
IsReplicaIdentityFull: isFullReplica,
}

// Get the column names and types
rows, err := c.pool.Query(c.ctx,
fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, tableName), pgx.QueryExecModeSimpleProtocol)
Expand All @@ -603,9 +589,10 @@ func (c *PostgresConnector) getTableSchemaForTable(
}

res := &protos.TableSchema{
TableIdentifier: req.TableIdentifier,
Columns: make(map[string]string),
PrimaryKeyColumns: pKeyCols,
TableIdentifier: tableName,
Columns: make(map[string]string),
PrimaryKeyColumns: pKeyCols,
IsReplicaIdentityFull: isFullReplica,
}

for _, fieldDescription := range rows.FieldDescriptions() {
Expand Down
Loading

0 comments on commit d4e0997

Please sign in to comment.