Skip to content

Commit

Permalink
Fix subsetting table with json column (#1982)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored May 21, 2024
1 parent 97d8f15 commit 34ceca3
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 34 deletions.
1 change: 1 addition & 0 deletions worker/internal/benthos/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func IsCriticalError(errMsg string) bool {
"Duplicate entry",
"Cannot add or update a child row",
"a foreign key constraint fails",
"could not identify an equality operator",
}

for _, errStr := range criticalErrors {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
if err != nil {
return nil, fmt.Errorf("unable to retrieve database foreign key constraints: %w", err)
}
tableQueryMap, err := buildSelectQueryMap(db.Driver, groupedTableMapping, sourceTableOpts, fkReferenceMap, runConfigs, sqlSourceOpts.SubsetByForeignKeyConstraints)
tableQueryMap, err := buildSelectQueryMap(db.Driver, groupedTableMapping, sourceTableOpts, fkReferenceMap, runConfigs, sqlSourceOpts.SubsetByForeignKeyConstraints, groupedSchemas)
if err != nil {
return nil, fmt.Errorf("unable to build select queries: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func buildSelectJoinQuery(
func buildSelectRecursiveQuery(
driver, schema, table string,
columns []string,
columnInfoMap map[string]*sql_manager.ColumnInfo,
dependencies []*selfReferencingCircularDependency,
joins []*sqlJoin,
whereClauses []string,
Expand All @@ -148,7 +149,12 @@ func buildSelectRecursiveQuery(

selectColumns := make([]any, len(columns))
for i, col := range columns {
selectColumns[i] = buildSqlIdentifier(schema, table, col)
colInfo := columnInfoMap[col]
if driver == sql_manager.PostgresDriver && colInfo != nil && colInfo.DataType == "json" {
selectColumns[i] = goqu.L("to_jsonb(?)", goqu.I(buildSqlIdentifier(schema, table, col))).As(col)
} else {
selectColumns[i] = buildSqlIdentifier(schema, table, col)
}
}
selectQuery := builder.From(sqltable).Select(selectColumns...)

Expand Down Expand Up @@ -220,6 +226,7 @@ func buildSelectQueryMap(
tableDependencies map[string][]*sql_manager.ColumnConstraint,
dependencyConfigs []*tabledependency.RunConfig,
subsetByForeignKeyConstraints bool,
groupedColumnInfo map[string]map[string]*sql_manager.ColumnInfo,
) (map[string]string, error) {
// map of table -> where clause
tableWhereMap := map[string]string{}
Expand Down Expand Up @@ -262,6 +269,7 @@ func buildSelectQueryMap(
for table := range dependencyMap {
tableMapping := groupedMappings[table]
config := subsetConfigs[table]
columnInfoMap := groupedColumnInfo[table]

selectCols := []string{}
for _, m := range tableMapping.Mappings {
Expand All @@ -273,6 +281,7 @@ func buildSelectQueryMap(
tableMapping.Schema,
tableMapping.Table,
buildPlainColumns(tableMapping.Mappings),
columnInfoMap,
config.SelfReferencingCircularDependency,
config.Joins,
config.WhereClauses,
Expand Down
Loading

0 comments on commit 34ceca3

Please sign in to comment.