From f2292969b0e5cd004ed26a9226a16881bbd525e8 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 22 Apr 2024 11:31:15 +0530 Subject: [PATCH] add udfs to vschema on update Signed-off-by: Harshit Gangal --- go/vt/vtgate/schema/tracker.go | 7 +++--- go/vt/vtgate/vschema_manager.go | 6 ++++++ go/vt/vtgate/vschema_manager_test.go | 32 ++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index bc622a7ca1f..a1b2009d0e1 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -164,23 +164,22 @@ func (t *Tracker) loadUDFs(conn queryservice.QueryService, target *querypb.Targe t.mu.Lock() defer t.mu.Unlock() + var udfs []string err := conn.GetSchema(t.ctx, target, querypb.SchemaTableType_UDFS, nil, func(schemaRes *querypb.GetSchemaResponse) error { - var udfs []string for _, udf := range schemaRes.Udfs { if !udf.Aggregating { continue } udfs = append(udfs, udf.Name) } - - t.udfs[target.Keyspace] = udfs return nil }) if err != nil { log.Errorf("error fetching new UDFs for %v: %w", target.Keyspace, err) return err } - log.Infof("finished loading UDFs for keyspace %s", target.Keyspace) + t.udfs[target.Keyspace] = udfs + log.Infof("finished loading %d UDFs for keyspace %s", len(udfs), target.Keyspace) return nil } diff --git a/go/vt/vtgate/vschema_manager.go b/go/vt/vtgate/vschema_manager.go index 8b346a0274f..dbac5589ce8 100644 --- a/go/vt/vtgate/vschema_manager.go +++ b/go/vt/vtgate/vschema_manager.go @@ -204,6 +204,7 @@ func (vm *VSchemaManager) updateFromSchema(vschema *vindexes.VSchema) { for ksName, ks := range vschema.Keyspaces { vm.updateTableInfo(vschema, ks, ksName) vm.updateViewInfo(ks, ksName) + vm.updateUDFsInfo(ks, ksName) } } @@ -267,6 +268,11 @@ func (vm *VSchemaManager) updateTableInfo(vschema *vindexes.VSchema, ks *vindexe } } +// updateUDFsInfo updates the aggregate UDFs in the Vschema. +func (vm *VSchemaManager) updateUDFsInfo(ks *vindexes.KeyspaceSchema, ksName string) { + ks.AggregateUDFs = vm.schema.UDFs(ksName) +} + func markErrorIfCyclesInFk(vschema *vindexes.VSchema) { for ksName, ks := range vschema.Keyspaces { // Only check cyclic foreign keys for keyspaces that have diff --git a/go/vt/vtgate/vschema_manager_test.go b/go/vt/vtgate/vschema_manager_test.go index 230ea961437..32f83f0021a 100644 --- a/go/vt/vtgate/vschema_manager_test.go +++ b/go/vt/vtgate/vschema_manager_test.go @@ -438,6 +438,38 @@ func TestRebuildVSchema(t *testing.T) { } } +// TestVSchemaUDFsUpdate tests that the UDFs are updated in the VSchema. +func TestVSchemaUDFsUpdate(t *testing.T) { + ks := &vindexes.Keyspace{Name: "ks", Sharded: true} + + vm := &VSchemaManager{} + var vs *vindexes.VSchema + vm.subscriber = func(vschema *vindexes.VSchema, _ *VSchemaStats) { + vs = vschema + vs.ResetCreated() + } + vm.schema = &fakeSchema{udfs: []string{"udf1", "udf2"}} + vm.VSchemaUpdate(&vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "ks": {Sharded: true}, + }, + }, nil) + + utils.MustMatchFn(".globalTables", ".uniqueVindexes")(t, &vindexes.VSchema{ + RoutingRules: map[string]*vindexes.RoutingRule{}, + Keyspaces: map[string]*vindexes.KeyspaceSchema{ + "ks": { + Keyspace: ks, + ForeignKeyMode: vschemapb.Keyspace_unmanaged, + Tables: map[string]*vindexes.Table{}, + Vindexes: map[string]vindexes.Vindex{}, + AggregateUDFs: []string{"udf1", "udf2"}, + }, + }, + }, vs) + utils.MustMatch(t, vs, vm.currentVschema, "currentVschema does not match Vschema") +} + func TestMarkErrorIfCyclesInFk(t *testing.T) { ksName := "ks" keyspace := &vindexes.Keyspace{