Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully shutdown VTGate instances #14219

Merged
merged 4 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,15 @@ func (c *Conn) IsMarkedForClose() bool {
return c.closing
}

func (c *Conn) IsShuttingDown() bool {
return c.listener.shutdown.Load()
}

// GetTestConn returns a conn for testing purpose only.
func GetTestConn() *Conn {
return newConn(testConn{})
}

func GetTestServerConn(listener *Listener) *Conn {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the function already says Test but maybe it is a good idea to add a comment something like -

// GetTestServerConn is only meant to be used for testing. It creates a server connection using a testConn and the provided listener.

And I would move this function to the conn_fake.go file. That files defines the testConn struct too and it contains all the test code we keep in the package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GuptaManan100 Thanks! I moved this and the other GetTestConn() function to conn_fake.go.

return newServerConn(testConn{}, listener)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this function (and GetTestConn above it) be moved to conn_flaky_test.go instead? So it's in a proper _test.go file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to move it but it's not able find it then. Could it be that functions in _test.go are not exported?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes functions in test files can only be used locally. They aren't exported.

From the Internet -

Files matching *_test.go are only compiled when testing the package they are part of. If you are testing package A that uses package B, you won't have access to the _test.go code from package B.

11 changes: 8 additions & 3 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
}

func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
session := vh.session(c)
if c.IsShuttingDown() && !session.InTransaction {
c.MarkForClose()
return sqlerror.NewSQLError(sqlerror.ERServerShutdown, sqlerror.SSNetError, "Server shutdown in progress")
}

ctx, cancel := context.WithCancel(context.Background())
c.UpdateCancelCtx(cancel)

Expand Down Expand Up @@ -229,7 +235,6 @@ func (vh *vtgateHandler) ComQuery(c *mysql.Conn, query string, callback func(*sq
"VTGate MySQL Connector" /* subcomponent: part of the client */)
ctx = callerid.NewContext(ctx, ef, im)

session := vh.session(c)
if !session.InTransaction {
vh.busyConnections.Add(1)
}
Expand Down Expand Up @@ -614,11 +619,11 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys

func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
if srv.tcpListener != nil {
srv.tcpListener.Close()
srv.tcpListener.Shutdown()
srv.tcpListener = nil
}
if srv.unixListener != nil {
srv.unixListener.Close()
srv.unixListener.Shutdown()
srv.unixListener = nil
}
if srv.sigChan != nil {
Expand Down
77 changes: 77 additions & 0 deletions go/vt/vtgate/plugin_mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,80 @@ func TestKillMethods(t *testing.T) {
require.EqualError(t, cancelCtx.Err(), "context canceled")
require.True(t, mysqlConn.IsMarkedForClose())
}

func TestGracefulShutdown(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)

vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected})
th := &testHandler{}
listener, err := mysql.NewListener("tcp", "127.0.0.1:", mysql.NewAuthServerNone(), th, 0, 0, false, false, 0)
require.NoError(t, err)
defer listener.Close()

// add a connection
mysqlConn := mysql.GetTestServerConn(listener)
mysqlConn.ConnectionID = 1
mysqlConn.UserData = &mysql.StaticUserData{}
vh.connections[1] = mysqlConn

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

listener.Shutdown()

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)")

require.True(t, mysqlConn.IsMarkedForClose())
}

func TestGracefulShutdownWithTransaction(t *testing.T) {
executor, _, _, _, _ := createExecutorEnv(t)

vh := newVtgateHandler(&VTGate{executor: executor, timings: timings, rowsReturned: rowsReturned, rowsAffected: rowsAffected})
th := &testHandler{}
listener, err := mysql.NewListener("tcp", "127.0.0.1:", mysql.NewAuthServerNone(), th, 0, 0, false, false, 0)
require.NoError(t, err)
defer listener.Close()

// add a connection
mysqlConn := mysql.GetTestServerConn(listener)
mysqlConn.ConnectionID = 1
mysqlConn.UserData = &mysql.StaticUserData{}
vh.connections[1] = mysqlConn

err = vh.ComQuery(mysqlConn, "BEGIN", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

listener.Shutdown()

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

err = vh.ComQuery(mysqlConn, "COMMIT", func(result *sqltypes.Result) error {
return nil
})
assert.NoError(t, err)

require.False(t, mysqlConn.IsMarkedForClose())

err = vh.ComQuery(mysqlConn, "select 1", func(result *sqltypes.Result) error {
return nil
})
require.EqualError(t, err, "Server shutdown in progress (errno 1053) (sqlstate 08S01)")

require.True(t, mysqlConn.IsMarkedForClose())
}