Skip to content

Commit

Permalink
analyze dml statement for fk in semanitc analysis phase
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Oct 10, 2023
1 parent e5eea9c commit 001f201
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 20 deletions.
5 changes: 5 additions & 0 deletions go/vt/schemadiff/semantics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/semantics"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -55,6 +56,10 @@ func (si *declarativeSchemaInformation) ConnCollation() collations.ID {
return 45
}

func (si *declarativeSchemaInformation) ForeignKeyMode(keyspace string) (vschemapb.Keyspace_ForeignKeyMode, error) {
return vschemapb.Keyspace_FK_UNMANAGED, nil
}

// addTable adds a fake table with an empty column list
func (si *declarativeSchemaInformation) addTable(tableName string) {
tbl := &vindexes.Table{
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtgate/semantics/FakeSI.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)
Expand All @@ -41,6 +42,10 @@ func (s *FakeSI) FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Tab
return nil, s.VindexTables[sqlparser.String(tablename)], "", 0, nil, nil
}

func (FakeSI) ConnCollation() collations.ID {
func (*FakeSI) ConnCollation() collations.ID {
return 45
}

func (s *FakeSI) ForeignKeyMode(keyspace string) (vschemapb.Keyspace_ForeignKeyMode, error) {
return vschemapb.Keyspace_FK_UNMANAGED, nil
}
171 changes: 152 additions & 19 deletions go/vt/vtgate/semantics/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package semantics

import (
"vitess.io/vitess/go/mysql/collations"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

// analyzer controls the flow of the analysis.
Expand Down Expand Up @@ -74,9 +76,7 @@ func Analyze(statement sqlparser.Statement, currentDb string, si SchemaInformati
}

// Creation of the semantic table
semTable := analyzer.newSemTable(statement, si.ConnCollation())

return semTable, nil
return analyzer.newSemTable(statement, si.ConnCollation())
}

// AnalyzeStrict analyzes the parsed query, and fails the analysis for any possible errors
Expand All @@ -96,7 +96,7 @@ func AnalyzeStrict(statement sqlparser.Statement, currentDb string, si SchemaInf
return st, nil
}

func (a *analyzer) newSemTable(statement sqlparser.Statement, coll collations.ID) *SemTable {
func (a *analyzer) newSemTable(statement sqlparser.Statement, coll collations.ID) (*SemTable, error) {
var comments *sqlparser.ParsedComments
commentedStmt, isCommented := statement.(sqlparser.Commented)
if isCommented {
Expand All @@ -107,22 +107,29 @@ func (a *analyzer) newSemTable(statement sqlparser.Statement, coll collations.ID
columns[union] = info.exprs
}

return &SemTable{
Recursive: a.binder.recursive,
Direct: a.binder.direct,
ExprTypes: a.typer.exprTypes,
Tables: a.tables.Tables,
NotSingleRouteErr: a.projErr,
NotUnshardedErr: a.unshardedErr,
Warning: a.warning,
Comments: comments,
ColumnEqualities: map[columnName][]sqlparser.Expr{},
Collation: coll,
ExpandedColumns: a.rewriter.expandedColumns,
columns: columns,
StatementIDs: a.scoper.statementIDs,
QuerySignature: a.sig,
childFks, parentFks, err := a.getInvolvedForeignKeys(statement)
if err != nil {
return nil, err
}

return &SemTable{
Recursive: a.binder.recursive,
Direct: a.binder.direct,
ExprTypes: a.typer.exprTypes,
Tables: a.tables.Tables,
NotSingleRouteErr: a.projErr,
NotUnshardedErr: a.unshardedErr,
Warning: a.warning,
Comments: comments,
ColumnEqualities: map[columnName][]sqlparser.Expr{},
Collation: coll,
ExpandedColumns: a.rewriter.expandedColumns,
columns: columns,
StatementIDs: a.scoper.statementIDs,
QuerySignature: a.sig,
ChildForeignKeysInvolved: childFks,
ParentForeignKeysInvolved: parentFks,
}, nil
}

func (a *analyzer) setError(err error) {
Expand Down Expand Up @@ -312,6 +319,132 @@ func (a *analyzer) noteQuerySignature(node sqlparser.SQLNode) {
}
}

// TODO: comment
func (a *analyzer) getInvolvedForeignKeys(statement sqlparser.Statement) (map[TableSet][]vindexes.ChildFKInfo, map[TableSet][]vindexes.ParentFKInfo, error) {
switch stmt := statement.(type) {
case *sqlparser.Insert, *sqlparser.Delete:
return a.getAllManagedForeignKeys()
case *sqlparser.Update:
allChildFks, allParentFks, err := a.getAllManagedForeignKeys()
if err != nil {
return nil, nil, err
}

if len(allChildFks) == 0 && len(allParentFks) == 0 {
return nil, nil, nil
}

pFksRequired := make(map[TableSet][]bool, len(allParentFks))
cFksRequired := make(map[TableSet][]bool, len(allChildFks))
for ts, fks := range allParentFks {
pFksRequired[ts] = make([]bool, len(fks))
}
for ts, fks := range allChildFks {
cFksRequired[ts] = make([]bool, len(fks))
}

UpdExprToTableSet := make(map[*sqlparser.ColName]TableSet)

// Go over all the update expressions
for _, updateExpr := range stmt.Exprs {
deps := a.binder.direct.dependencies(updateExpr.Name)
if deps.NumberOfTables() != 1 {
panic("expected to have single table dependency")
}
UpdExprToTableSet[updateExpr.Name] = deps
childFks := allChildFks[deps]
parentFKs := allParentFks[deps]

// Any foreign key to a child table for a column that has been updated
// will require the cascade operations or restrict verification to happen, so we include all such foreign keys.
for idx, childFk := range childFks {
if childFk.ParentColumns.FindColumn(updateExpr.Name.Name) >= 0 {
cFksRequired[deps][idx] = true
}
}
// If we are setting a column to NULL, then we don't need to verify the existance of an
// equivalent row in the parent table, even if this column was part of a foreign key to a parent table.
if sqlparser.IsNull(updateExpr.Expr) {
continue
}
// We add all the possible parent foreign key constraints that need verification that an equivalent row
// exists, given that this column has changed.
for idx, parentFk := range parentFKs {
if parentFk.ChildColumns.FindColumn(updateExpr.Name.Name) >= 0 {
pFksRequired[deps][idx] = true
}
}
}
// For the parent foreign keys, if any of the columns part of the fk is set to NULL,
// then, we don't care for the existence of an equivalent row in the parent table.
for _, updateExpr := range stmt.Exprs {
if !sqlparser.IsNull(updateExpr.Expr) {
continue
}
ts := UpdExprToTableSet[updateExpr.Name]
parentFKs := allParentFks[ts]
for idx, parentFk := range parentFKs {
if parentFk.ChildColumns.FindColumn(updateExpr.Name.Name) >= 0 {
pFksRequired[ts][idx] = false
}
}
}

// Get the filtered lists and return them.
pFksNeedsHandling := map[TableSet][]vindexes.ParentFKInfo{}
cFksNeedsHandling := map[TableSet][]vindexes.ChildFKInfo{}
for ts, parentFks := range allParentFks {
var pFKNeeded []vindexes.ParentFKInfo
for idx, fk := range parentFks {
if pFksRequired[ts][idx] {
pFKNeeded = append(pFKNeeded, fk)
}
}
pFksNeedsHandling[ts] = pFKNeeded

}
for ts, childFks := range allChildFks {
var cFKNeeded []vindexes.ChildFKInfo
for idx, fk := range childFks {
if cFksRequired[ts][idx] {
cFKNeeded = append(cFKNeeded, fk)
}
}
cFksNeedsHandling[ts] = cFKNeeded

}
return cFksNeedsHandling, pFksNeedsHandling, nil
default:
return nil, nil, nil
}
}

// TODO: comment
func (a *analyzer) getAllManagedForeignKeys() (map[TableSet][]vindexes.ChildFKInfo, map[TableSet][]vindexes.ParentFKInfo, error) {
allChildFKs := make(map[TableSet][]vindexes.ChildFKInfo)
allParentFKs := make(map[TableSet][]vindexes.ParentFKInfo)

for idx, table := range a.tables.Tables {
vi := table.GetVindexTable()
if vi == nil || vi.Keyspace == nil {
// If is not a real table, so should be skipped.
continue
}
fkMode, err := a.tables.si.ForeignKeyMode(vi.Keyspace.Name)
if err != nil {
return nil, nil, err
}
if fkMode != vschemapb.Keyspace_FK_MANAGED {
continue
}

ts := SingleTableSet(idx)
allChildFKs[ts] = vi.ChildForeignKeys
allParentFKs[ts] = vi.ParentForeignKeys
}
return allChildFKs, allParentFKs, nil
}

// ProjError is used to mark an error as something that should only be returned
// if the planner fails to merge everything down to a single route
type ProjError struct {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/semantics/info_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -1712,3 +1713,7 @@ func (i *infoSchemaWithColumns) FindTableOrVindex(tbl sqlparser.TableName) (*vin
func (i *infoSchemaWithColumns) ConnCollation() collations.ID {
return i.inner.ConnCollation()
}

func (i *infoSchemaWithColumns) ForeignKeyMode(keyspace string) (vschemapb.Keyspace_ForeignKeyMode, error) {
return i.inner.ForeignKeyMode(keyspace)
}
7 changes: 7 additions & 0 deletions go/vt/vtgate/semantics/semantic_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -125,6 +126,10 @@ type (

// QuerySignature is used to identify shortcuts in the planning process
QuerySignature QuerySignature

// TODO - comments
ChildForeignKeysInvolved map[TableSet][]vindexes.ChildFKInfo
ParentForeignKeysInvolved map[TableSet][]vindexes.ParentFKInfo
}

columnName struct {
Expand All @@ -136,6 +141,8 @@ type (
SchemaInformation interface {
FindTableOrVindex(tablename sqlparser.TableName) (*vindexes.Table, vindexes.Vindex, string, topodatapb.TabletType, key.Destination, error)
ConnCollation() collations.ID
// ForeignKeyMode returns the foreign_key flag value
ForeignKeyMode(keyspace string) (vschemapb.Keyspace_ForeignKeyMode, error)
}
)

Expand Down

0 comments on commit 001f201

Please sign in to comment.