Skip to content

Commit

Permalink
feat: log and ignore the error in reading udfs
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 committed Aug 22, 2024
1 parent 538dd4c commit 36b02fa
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
7 changes: 7 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,13 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu
db.patternData[queryPattern] = exprResult{queryPattern: queryPattern, expr: expr, result: &result}
}

// RemoveQueryPattern removes a query pattern that was previously added.
func (db *DB) RemoveQueryPattern(queryPattern string) {
db.mu.Lock()
defer db.mu.Unlock()
delete(db.patternData, queryPattern)
}

// RejectQueryPattern allows a query pattern to be rejected with an error
func (db *DB) RejectQueryPattern(queryPattern, error string) {
expr := regexp.MustCompile("(?is)^" + queryPattern + "$")
Expand Down
5 changes: 5 additions & 0 deletions go/vt/logutil/throttled.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ var (
errorDepth = log.ErrorDepth
)

// GetLastLogTime gets the last log time for the throttled logger.
func (tl *ThrottledLogger) GetLastLogTime() time.Time {
return tl.lastlogTime
}

func (tl *ThrottledLogger) log(logF logFunc, format string, v ...any) {
now := time.Now()

Expand Down
13 changes: 8 additions & 5 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/servenv"
Expand Down Expand Up @@ -85,9 +86,10 @@ type Engine struct {

historian *historian

conns *connpool.Pool
ticks *timer.Timer
reloadTimeout time.Duration
conns *connpool.Pool
ticks *timer.Timer
reloadTimeout time.Duration
throttledLogger *logutil.ThrottledLogger

// dbCreationFailed is for preventing log spam.
dbCreationFailed bool
Expand All @@ -109,7 +111,8 @@ func NewEngine(env tabletenv.Env) *Engine {
Size: 3,
IdleTimeout: env.Config().OltpReadPool.IdleTimeout,
}),
ticks: timer.NewTimer(reloadTime),
ticks: timer.NewTimer(reloadTime),
throttledLogger: logutil.NewThrottledLogger("schema-tracker", 1*time.Minute),
}
se.schemaCopy = env.Config().SignalWhenSchemaChange
_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
Expand Down Expand Up @@ -448,7 +451,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {

udfsChanged, err := getChangedUserDefinedFunctions(ctx, conn.Conn, shouldUseDatabase)
if err != nil {
return err
se.throttledLogger.Errorf("error in getting changed UDFs: %v", err)
}

rec := concurrency.AllErrorRecorder{}
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,8 @@ func TestEngineReload(t *testing.T) {
}

// adding query pattern for udfs
db.AddQueryPattern("SELECT name.*", &sqltypes.Result{})
udfQueryPattern := "SELECT name.*"
db.AddQueryPattern(udfQueryPattern, &sqltypes.Result{})

// Verify the list of created, altered and dropped tables seen.
se.RegisterNotifier("test", func(full map[string]*Table, created, altered, dropped []*Table, _ bool) {
Expand All @@ -1344,6 +1345,16 @@ func TestEngineReload(t *testing.T) {
err = se.reload(context.Background(), false)
require.NoError(t, err)
require.NoError(t, db.LastError())
require.Zero(t, se.throttledLogger.GetLastLogTime())

// Now if we remove the query pattern for udfs, schema engine shouldn't fail.
// Instead we should see a log message with the error.
db.RemoveQueryPattern(udfQueryPattern)
se.UnregisterNotifier("test")
err = se.reload(context.Background(), false)
require.NoError(t, err)
// Check for the udf error being logged. The last log time should be less than a second.
require.Less(t, time.Since(se.throttledLogger.GetLastLogTime()), 1*time.Second)
}

// TestEngineReload tests the vreplication specific GetTableForPos function to ensure
Expand Down

0 comments on commit 36b02fa

Please sign in to comment.