diff --git a/worker/internal/benthos/utils.go b/worker/internal/benthos/utils.go index e14034b88e..c6c7b49c95 100644 --- a/worker/internal/benthos/utils.go +++ b/worker/internal/benthos/utils.go @@ -31,6 +31,7 @@ func IsCriticalError(errMsg string) bool { "Cannot add or update a child row", "a foreign key constraint fails", "could not identify an equality operator", + "violates not-null constraint", } for _, errStr := range criticalErrors { diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder.go index 4b9cab6af7..254a3c9ba8 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder.go @@ -343,6 +343,14 @@ func buildSubsetJoins(table string, data map[string][]*SubsetColumnConstraint, w joins := []*sqlJoin{} wheres := []string{} + if seen := visited[table]; seen { + return &tableSubset{ + Joins: joins, + WhereClauses: wheres, + } + } + visited[table] = true + if condition, exists := whereClauses[table]; exists { wheres = append(wheres, condition) } @@ -352,28 +360,27 @@ func buildSubsetJoins(table string, data map[string][]*SubsetColumnConstraint, w if col.ForeignKey.Table == "" && col.ForeignKey.Columns == nil { continue } - // handle aliased table - var alias *string - joinTable := col.ForeignKey.Table - if col.ForeignKey.OriginalTable != nil && *col.ForeignKey.OriginalTable != "" { - alias = &col.ForeignKey.Table - joinTable = *col.ForeignKey.OriginalTable - } + if !visited[col.ForeignKey.Table] { + // handle aliased table + var alias *string + joinTable := col.ForeignKey.Table + if col.ForeignKey.OriginalTable != nil && *col.ForeignKey.OriginalTable != "" { + alias = &col.ForeignKey.Table + joinTable = *col.ForeignKey.OriginalTable + } - joinColMap := map[string]string{} - for idx, c := range col.ForeignKey.Columns { - joinColMap[c] = col.Columns[idx] - } - joins = append(joins, &sqlJoin{ - JoinType: innerJoin, - JoinTable: joinTable, - BaseTable: table, - Alias: alias, - JoinColumnsMap: joinColMap, - }) + joinColMap := map[string]string{} + for idx, c := range col.ForeignKey.Columns { + joinColMap[c] = col.Columns[idx] + } + joins = append(joins, &sqlJoin{ + JoinType: innerJoin, + JoinTable: joinTable, + BaseTable: table, + Alias: alias, + JoinColumnsMap: joinColMap, + }) - if !visited[col.ForeignKey.Table] { - visited[col.ForeignKey.Table] = true sub := buildSubsetJoins(col.ForeignKey.Table, data, whereClauses, visited) joins = append(joins, sub.Joins...) wheres = append(wheres, sub.WhereClauses...) @@ -581,7 +588,7 @@ func filterForeignKeysWithSubset(runConfigMap map[string][]*tabledependency.RunC for table, configs := range runConfigMap { filteredConstraints[table] = []*sql_manager.ColumnConstraint{} for _, c := range configs { - if c.RunType == tabledependency.RunTypeInsert && len(c.DependsOn) > 0 { + if c.RunType == tabledependency.RunTypeInsert { if tableConstraints, ok := constraints[table]; ok { for _, colDef := range tableConstraints { if exists := tableSubsetMap[colDef.ForeignKey.Table]; exists { diff --git a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder_test.go b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder_test.go index 2fab4166f1..511c3baabe 100644 --- a/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder_test.go +++ b/worker/pkg/workflows/datasync/activities/gen-benthos-configs/query-builder_test.go @@ -862,9 +862,139 @@ func Test_buildSelectQueryMap_CircularDependency(t *testing.T) { } expected := map[string]string{ - "public.b": `SELECT "id", "name", "a_id" FROM "public"."b" WHERE public.b.name = 'neo';`, - "public.c": `SELECT "public"."c"."id", "public"."c"."b_id" FROM "public"."c" INNER JOIN "public"."b" ON ("public"."b"."id" = "public"."c"."b_id") WHERE public.b.name = 'neo';`, "public.a": `SELECT "public"."a"."id", "public"."a"."c_id" FROM "public"."a" INNER JOIN "public"."c" ON ("public"."c"."id" = "public"."a"."c_id") INNER JOIN "public"."b" ON ("public"."b"."id" = "public"."c"."b_id") WHERE public.b.name = 'neo';`, + "public.b": `SELECT "public"."b"."id", "public"."b"."name", "public"."b"."a_id" FROM "public"."b" INNER JOIN "public"."a" ON ("public"."a"."id" = "public"."b"."a_id") INNER JOIN "public"."c" ON ("public"."c"."id" = "public"."a"."c_id") WHERE public.b.name = 'neo';`, + "public.c": `SELECT "public"."c"."id", "public"."c"."b_id" FROM "public"."c" INNER JOIN "public"."b" ON ("public"."b"."id" = "public"."c"."b_id") INNER JOIN "public"."a" ON ("public"."a"."id" = "public"."b"."a_id") WHERE public.b.name = 'neo';`, + } + sql, err := buildSelectQueryMap("postgres", mappings, sourceTableOpts, tableDependencies, dependencyConfigs, true, map[string]map[string]*sql_manager.ColumnInfo{}) + require.NoError(t, err) + require.Equal(t, expected, sql) +} + +func Test_buildSelectQueryMap_circularDependency_additional_table(t *testing.T) { + whereName := "name = 'neo'" + mappings := map[string]*tableMapping{ + "public.addresses": { + Schema: "public", + Table: "addresses", + Mappings: []*mgmtv1alpha1.JobMapping{ + { + Schema: "public", + Table: "addresses", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + { + Schema: "public", + Table: "addresses", + Column: "order_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + }, + }, + "public.customers": { + Schema: "public", + Table: "customers", + Mappings: []*mgmtv1alpha1.JobMapping{ + { + Schema: "public", + Table: "customers", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + { + Schema: "public", + Table: "customers", + Column: "address_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + }, + }, + "public.orders": { + Schema: "public", + Table: "orders", + Mappings: []*mgmtv1alpha1.JobMapping{ + { + Schema: "public", + Table: "orders", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + { + Schema: "public", + Table: "orders", + Column: "customer_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + }, + }, + "public.payments": { + Schema: "public", + Table: "payments", + Mappings: []*mgmtv1alpha1.JobMapping{ + { + Schema: "public", + Table: "payments", + Column: "id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + { + Schema: "public", + Table: "payments", + Column: "customer_id", + Transformer: &mgmtv1alpha1.JobMappingTransformer{ + Source: mgmtv1alpha1.TransformerSource_TRANSFORMER_SOURCE_GENERATE_DEFAULT, + }, + }, + }, + }, + } + sourceTableOpts := map[string]*sqlSourceTableOptions{ + "public.addresses": { + WhereClause: &whereName, + }, + } + tableDependencies := map[string][]*sql_manager.ColumnConstraint{ + "public.addresses": { + {Columns: []string{"order_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.orders", Columns: []string{"id"}}}, + }, + "public.customers": { + {Columns: []string{"address_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.addresses", Columns: []string{"id"}}}, + }, + "public.orders": { + {Columns: []string{"customer_id"}, NotNullable: []bool{false}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.customers", Columns: []string{"id"}}}, + }, + "public.payments": { + {Columns: []string{"customer_id"}, NotNullable: []bool{true}, ForeignKey: &sql_manager.ReferenceKey{Table: "public.customers", Columns: []string{"id"}}}, + }, + } + dependencyConfigs := []*tabledependency.RunConfig{ + {Table: "public.orders", RunType: tabledependency.RunTypeInsert, Columns: []string{"id"}, DependsOn: []*tabledependency.DependsOn{}}, + {Table: "public.addresses", RunType: tabledependency.RunTypeInsert, Columns: []string{"id", "order_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.orders", Columns: []string{"id"}}}}, + {Table: "public.customers", RunType: tabledependency.RunTypeInsert, Columns: []string{"id", "address_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.addresses", Columns: []string{"id"}}}}, + {Table: "public.payments", RunType: tabledependency.RunTypeInsert, Columns: []string{"id", "customer_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.customers", Columns: []string{"id"}}}}, + {Table: "public.orders", RunType: tabledependency.RunTypeUpdate, Columns: []string{"customer_id"}, DependsOn: []*tabledependency.DependsOn{{Table: "public.orders", Columns: []string{"id"}}, {Table: "public.customers", Columns: []string{"id"}}}}, + } + expected := + map[string]string{ + "public.addresses": `SELECT "public"."addresses"."id", "public"."addresses"."order_id" FROM "public"."addresses" INNER JOIN "public"."orders" ON ("public"."orders"."id" = "public"."addresses"."order_id") INNER JOIN "public"."customers" ON ("public"."customers"."id" = "public"."orders"."customer_id") WHERE public.addresses.name = 'neo';`, + "public.customers": `SELECT "public"."customers"."id", "public"."customers"."address_id" FROM "public"."customers" INNER JOIN "public"."addresses" ON ("public"."addresses"."id" = "public"."customers"."address_id") INNER JOIN "public"."orders" ON ("public"."orders"."id" = "public"."addresses"."order_id") WHERE public.addresses.name = 'neo';`, + "public.orders": `SELECT "public"."orders"."id", "public"."orders"."customer_id" FROM "public"."orders" INNER JOIN "public"."customers" ON ("public"."customers"."id" = "public"."orders"."customer_id") INNER JOIN "public"."addresses" ON ("public"."addresses"."id" = "public"."customers"."address_id") WHERE public.addresses.name = 'neo';`, + "public.payments": `SELECT "public"."payments"."id", "public"."payments"."customer_id" FROM "public"."payments" INNER JOIN "public"."customers" ON ("public"."customers"."id" = "public"."payments"."customer_id") INNER JOIN "public"."addresses" ON ("public"."addresses"."id" = "public"."customers"."address_id") INNER JOIN "public"."orders" ON ("public"."orders"."id" = "public"."addresses"."order_id") WHERE public.addresses.name = 'neo';`, } sql, err := buildSelectQueryMap("postgres", mappings, sourceTableOpts, tableDependencies, dependencyConfigs, true, map[string]map[string]*sql_manager.ColumnInfo{}) require.NoError(t, err) @@ -1997,6 +2127,76 @@ func Test_buildSelectQueryMap_shouldContinue(t *testing.T) { require.Equal(t, expected, sql) } +func Test_filterForeignKeysWithSubset_partialtables(t *testing.T) { + var emptyWhere *string + where := "id = '36f594af-6d53-4a48-a9b7-b889e2df349e'" + runConfigMap := map[string][]*tabledependency.RunConfig{ + "circle.addresses": { + { + Table: "circle.addresses", + Columns: []string{"id"}, + DependsOn: []*tabledependency.DependsOn{}, + RunType: tabledependency.RunTypeInsert, + PrimaryKeys: []string{"id"}, + WhereClause: &where, + }, + { + Table: "circle.addresses", + Columns: []string{"order_id"}, + DependsOn: []*tabledependency.DependsOn{{Table: "circle.addresses", Columns: []string{"id"}}, {Table: "circle.orders", Columns: []string{"id"}}}, + RunType: tabledependency.RunTypeUpdate, + PrimaryKeys: []string{"id"}, + WhereClause: &where, + }, + }, + "circle.customers": { + { + Table: "circle.customers", + Columns: []string{"id", "address_id"}, + DependsOn: []*tabledependency.DependsOn{{Table: "circle.addresses", Columns: []string{"id"}}}, + RunType: tabledependency.RunTypeInsert, + PrimaryKeys: []string{"id"}, + WhereClause: emptyWhere, + }, + }, + } + + constraints := map[string][]*sql_manager.ColumnConstraint{ + "circle.addresses": { + { + Columns: []string{"order_id"}, + NotNullable: []bool{false}, + ForeignKey: &sql_manager.ReferenceKey{Table: "circle.orders", Columns: []string{"id"}}, + }, + }, + "circle.customers": { + { + Columns: []string{"address_id"}, + NotNullable: []bool{false}, + ForeignKey: &sql_manager.ReferenceKey{Table: "circle.addresses", Columns: []string{"id"}}, + }, + }, + } + + whereClauses := map[string]string{ + "circle.addresses": "id = '36f594af-6d53-4a48-a9b7-b889e2df349e'", + } + + expected := map[string][]*sql_manager.ColumnConstraint{ + "circle.addresses": {}, + "circle.customers": { + { + Columns: []string{"address_id"}, + NotNullable: []bool{false}, + ForeignKey: &sql_manager.ReferenceKey{Table: "circle.addresses", Columns: []string{"id"}}, + }, + }, + } + + actual := filterForeignKeysWithSubset(runConfigMap, constraints, whereClauses) + require.Equal(t, expected, actual) +} + func Test_qualifyWhereColumnNames_mysql(t *testing.T) { tests := []struct { name string