Skip to content

Commit

Permalink
fix circular dependencies subsetting (#1991)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored May 21, 2024
1 parent 1e9a02c commit 49a7e6b
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 23 deletions.
1 change: 1 addition & 0 deletions worker/internal/benthos/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func IsCriticalError(errMsg string) bool {
"Cannot add or update a child row",
"a foreign key constraint fails",
"could not identify an equality operator",
"violates not-null constraint",
}

for _, errStr := range criticalErrors {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,14 @@ func buildSubsetJoins(table string, data map[string][]*SubsetColumnConstraint, w
joins := []*sqlJoin{}
wheres := []string{}

if seen := visited[table]; seen {
return &tableSubset{
Joins: joins,
WhereClauses: wheres,
}
}
visited[table] = true

if condition, exists := whereClauses[table]; exists {
wheres = append(wheres, condition)
}
Expand All @@ -352,28 +360,27 @@ func buildSubsetJoins(table string, data map[string][]*SubsetColumnConstraint, w
if col.ForeignKey.Table == "" && col.ForeignKey.Columns == nil {
continue
}
// handle aliased table
var alias *string
joinTable := col.ForeignKey.Table
if col.ForeignKey.OriginalTable != nil && *col.ForeignKey.OriginalTable != "" {
alias = &col.ForeignKey.Table
joinTable = *col.ForeignKey.OriginalTable
}
if !visited[col.ForeignKey.Table] {
// handle aliased table
var alias *string
joinTable := col.ForeignKey.Table
if col.ForeignKey.OriginalTable != nil && *col.ForeignKey.OriginalTable != "" {
alias = &col.ForeignKey.Table
joinTable = *col.ForeignKey.OriginalTable
}

joinColMap := map[string]string{}
for idx, c := range col.ForeignKey.Columns {
joinColMap[c] = col.Columns[idx]
}
joins = append(joins, &sqlJoin{
JoinType: innerJoin,
JoinTable: joinTable,
BaseTable: table,
Alias: alias,
JoinColumnsMap: joinColMap,
})
joinColMap := map[string]string{}
for idx, c := range col.ForeignKey.Columns {
joinColMap[c] = col.Columns[idx]
}
joins = append(joins, &sqlJoin{
JoinType: innerJoin,
JoinTable: joinTable,
BaseTable: table,
Alias: alias,
JoinColumnsMap: joinColMap,
})

if !visited[col.ForeignKey.Table] {
visited[col.ForeignKey.Table] = true
sub := buildSubsetJoins(col.ForeignKey.Table, data, whereClauses, visited)
joins = append(joins, sub.Joins...)
wheres = append(wheres, sub.WhereClauses...)
Expand Down Expand Up @@ -581,7 +588,7 @@ func filterForeignKeysWithSubset(runConfigMap map[string][]*tabledependency.RunC
for table, configs := range runConfigMap {
filteredConstraints[table] = []*sql_manager.ColumnConstraint{}
for _, c := range configs {
if c.RunType == tabledependency.RunTypeInsert && len(c.DependsOn) > 0 {
if c.RunType == tabledependency.RunTypeInsert {
if tableConstraints, ok := constraints[table]; ok {
for _, colDef := range tableConstraints {
if exists := tableSubsetMap[colDef.ForeignKey.Table]; exists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,9 +862,139 @@ func Test_buildSelectQueryMap_CircularDependency(t *testing.T) {
}
expected :=
map[string]string{
"public.b": `SELECT "id", "name", "a_id" FROM "public"."b" WHERE public.b.name = 'neo';`,
"public.c": `SELECT "public"."c"."id", "public"."c"."b_id" FROM "public"."c" INNER JOIN "public"."b" ON ("public"."b"."id" = "public"."c"."b_id") WHERE public.b.name = 'neo';`,
"public.a": `SELECT "public"."a"."id", "public"."a"."c_id" FROM "public"."a" INNER JOIN "public"."c" ON ("public"."c"."id" = "public"."a"."c_id") INNER JOIN "public"."b" ON ("public"."b"."id" = "public"."c"."b_id") WHERE public.b.name = 'neo';`,
"public.b": `SELECT "public"."b"."id", "public"."b"."name", "public"."b"."a_id" FROM "public"."b" INNER JOIN "public"."a" ON ("public"."a"."id" = "public"."b"."a_id") INNER JOIN "public"."c" ON ("public"."c"."id" = "public"."a"."c_id") WHERE public.b.name = 'neo';`,
"public.c": `SELECT "public"."c"."id", "public"."c"."b_id" FROM "public"."c" INNER JOIN "public"."b" ON ("public"."b"."id" = "public"."c"."b_id") INNER JOIN "public"."a" ON ("public"."a"."id" = "public"."b"."a_id") WHERE public.b.name = 'neo';`,
}
sql, err := buildSelectQueryMap("postgres", mappings, sourceTableOpts, tableDependencies, dependencyConfigs, true, map[string]map[string]*sql_manager.ColumnInfo{})
require.NoError(t, err)
require.Equal(t, expected, sql)
}

func Test_buildSelectQueryMap_circularDependency_additional_table(t *testing.T) {
whereName := "name = 'neo'"
mappings := map[string]*tableMapping{
"public.addresses": {
Schema: "public",
Table: "addresses",
Mappings: []*mgmtv1alpha1.JobMapping{
{
Schema: "public",
Table: "addresses",
Column: "id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
{
Schema: "public",
Table: "addresses",
Column: "order_id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
},
},
"public.customers": {
Schema: "public",
Table: "customers",
Mappings: []*mgmtv1alpha1.JobMapping{
{
Schema: "public",
Table: "customers",
Column: "id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
{
Schema: "public",
Table: "customers",
Column: "address_id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
},
},
"public.orders": {
Schema: "public",
Table: "orders",
Mappings: []*mgmtv1alpha1.JobMapping{
{
Schema: "public",
Table: "orders",
Column: "id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
{
Schema: "public",
Table: "orders",
Column: "customer_id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
},
},
"public.payments": {
Schema: "public",
Table: "payments",
Mappings: []*mgmtv1alpha1.JobMapping{
{
Schema: "public",
Table: "payments",
Column: "id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
{
Schema: "public",
Table: "payments",
Column: "customer_id",
Transformer: &mgmtv1alpha1.JobMappingTransformer{
Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT,
},
},
},
},
}
sourceTableOpts := map[string]*sqlSourceTableOptions{
"public.addresses": {
WhereClause: &whereName,
},
}
tableDependencies := map[string][]*sql_manager.ColumnConstraint{
"public.addresses": {
{Columns: []string{"order_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.orders", Columns: []string{"id"}}},
},
"public.customers": {
{Columns: []string{"address_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.addresses", Columns: []string{"id"}}},
},
"public.orders": {
{Columns: []string{"customer_id"}, NotNullable: []bool{false}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.customers", Columns: []string{"id"}}},
},
"public.payments": {
{Columns: []string{"customer_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.customers", Columns: []string{"id"}}},
},
}
dependencyConfigs := []*tabledependency.RunConfig{
{Table: "public.orders", RunType: tabledependency.RunTypeInsert, Columns: []string{"id"}, DependsOn: []*tabledependency.DependsOn{}},
{Table: "public.addresses", RunType: tabledependency.RunTypeInsert, Columns: []string{"id", "order_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.orders", Columns: []string{"id"}}}},
{Table: "public.customers", RunType: tabledependency.RunTypeInsert, Columns: []string{"id", "address_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.addresses", Columns: []string{"id"}}}},
{Table: "public.payments", RunType: tabledependency.RunTypeInsert, Columns: []string{"id", "customer_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.customers", Columns: []string{"id"}}}},
{Table: "public.orders", RunType: tabledependency.RunTypeUpdate, Columns: []string{"customer_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.orders", Columns: []string{"id"}}, {Table: "public.customers", Columns: []string{"id"}}}},
}
expected :=
map[string]string{
"public.addresses": `SELECT "public"."addresses"."id", "public"."addresses"."order_id" FROM "public"."addresses" INNER JOIN "public"."orders" ON ("public"."orders"."id" = "public"."addresses"."order_id") INNER JOIN "public"."customers" ON ("public"."customers"."id" = "public"."orders"."customer_id") WHERE public.addresses.name = 'neo';`,
"public.customers": `SELECT "public"."customers"."id", "public"."customers"."address_id" FROM "public"."customers" INNER JOIN "public"."addresses" ON ("public"."addresses"."id" = "public"."customers"."address_id") INNER JOIN "public"."orders" ON ("public"."orders"."id" = "public"."addresses"."order_id") WHERE public.addresses.name = 'neo';`,
"public.orders": `SELECT "public"."orders"."id", "public"."orders"."customer_id" FROM "public"."orders" INNER JOIN "public"."customers" ON ("public"."customers"."id" = "public"."orders"."customer_id") INNER JOIN "public"."addresses" ON ("public"."addresses"."id" = "public"."customers"."address_id") WHERE public.addresses.name = 'neo';`,
"public.payments": `SELECT "public"."payments"."id", "public"."payments"."customer_id" FROM "public"."payments" INNER JOIN "public"."customers" ON ("public"."customers"."id" = "public"."payments"."customer_id") INNER JOIN "public"."addresses" ON ("public"."addresses"."id" = "public"."customers"."address_id") INNER JOIN "public"."orders" ON ("public"."orders"."id" = "public"."addresses"."order_id") WHERE public.addresses.name = 'neo';`,
}
sql, err := buildSelectQueryMap("postgres", mappings, sourceTableOpts, tableDependencies, dependencyConfigs, true, map[string]map[string]*sql_manager.ColumnInfo{})
require.NoError(t, err)
Expand Down Expand Up @@ -1997,6 +2127,76 @@ func Test_buildSelectQueryMap_shouldContinue(t *testing.T) {
require.Equal(t, expected, sql)
}

func Test_filterForeignKeysWithSubset_partialtables(t *testing.T) {
var emptyWhere *string
where := "id = '36f594af-6d53-4a48-a9b7-b889e2df349e'"
runConfigMap := map[string][]*tabledependency.RunConfig{
"circle.addresses": {
{
Table: "circle.addresses",
Columns: []string{"id"},
DependsOn: []*tabledependency.DependsOn{},
RunType: tabledependency.RunTypeInsert,
PrimaryKeys: []string{"id"},
WhereClause: &where,
},
{
Table: "circle.addresses",
Columns: []string{"order_id"},
DependsOn: []*tabledependency.DependsOn{{Table: "circle.addresses", Columns: []string{"id"}}, {Table: "circle.orders", Columns: []string{"id"}}},
RunType: tabledependency.RunTypeUpdate,
PrimaryKeys: []string{"id"},
WhereClause: &where,
},
},
"circle.customers": {
{
Table: "circle.customers",
Columns: []string{"id", "address_id"},
DependsOn: []*tabledependency.DependsOn{{Table: "circle.addresses", Columns: []string{"id"}}},
RunType: tabledependency.RunTypeInsert,
PrimaryKeys: []string{"id"},
WhereClause: emptyWhere,
},
},
}

constraints := map[string][]*sql_manager.ColumnConstraint{
"circle.addresses": {
{
Columns: []string{"order_id"},
NotNullable: []bool{false},
ForeignKey: &sql_manager.ReferenceKey{Table: "circle.orders", Columns: []string{"id"}},
},
},
"circle.customers": {
{
Columns: []string{"address_id"},
NotNullable: []bool{false},
ForeignKey: &sql_manager.ReferenceKey{Table: "circle.addresses", Columns: []string{"id"}},
},
},
}

whereClauses := map[string]string{
"circle.addresses": "id = '36f594af-6d53-4a48-a9b7-b889e2df349e'",
}

expected := map[string][]*sql_manager.ColumnConstraint{
"circle.addresses": {},
"circle.customers": {
{
Columns: []string{"address_id"},
NotNullable: []bool{false},
ForeignKey: &sql_manager.ReferenceKey{Table: "circle.addresses", Columns: []string{"id"}},
},
},
}

actual := filterForeignKeysWithSubset(runConfigMap, constraints, whereClauses)
require.Equal(t, expected, actual)
}

func Test_qualifyWhereColumnNames_mysql(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 49a7e6b

Please sign in to comment.