From 2cb7903891379772b80447bb36452a711bbaaa7d Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Wed, 30 Nov 2022 16:39:37 +0100 Subject: [PATCH] Fix closing the body for HTTP requests (#11842) (#11854) * Fix closing the body for HTTP requests (#11842) One of the easy things to miss that can cause resource leaks is not closing a response body for an HTTP request. There's a linter available to make it easier to catch this, so this enables that linter and fixes all the cases reported. This is almost all test cases, except for one production code path case in the throttler. Signed-off-by: Dirkjan Bussink * Always close body for HTTP requests (#10835) Not closing the body leads to leaking goroutines for the reader / writer of the body. Found when validating that tests don't leak goroutines (to ensure other things get closed properly, but these cases where also found). Signed-off-by: Dirkjan Bussink * Fix remaining body close issues Signed-off-by: Dirkjan Bussink Signed-off-by: Dirkjan Bussink --- .golangci.yml | 4 +- go/test/endtoend/cluster/topo_process.go | 16 +++-- .../endtoend/cluster/vtctlclient_process.go | 5 +- go/test/endtoend/cluster/vtctld_process.go | 6 +- go/test/endtoend/cluster/vtgate_process.go | 12 ++-- go/test/endtoend/cluster/vttablet_process.go | 6 +- go/test/endtoend/cluster/vtworker_process.go | 7 +- go/test/endtoend/clustertest/main_test.go | 9 +-- go/test/endtoend/clustertest/vtctld_test.go | 21 +++--- go/test/endtoend/clustertest/vtgate_test.go | 69 ++++++++++--------- go/test/endtoend/clustertest/vttablet_test.go | 16 +++-- go/test/endtoend/messaging/message_test.go | 7 +- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 28 ++++---- go/test/endtoend/sharding/base_sharding.go | 3 +- .../buffer/buffer_test_helpers.go | 9 ++- go/test/endtoend/tabletgateway/vtgate_test.go | 10 +-- .../tabletmanager/primary/tablet_test.go | 35 +++++----- .../tabletmanager/tablet_health_test.go | 1 + .../tabletmanager/throttler/throttler_test.go | 43 ++++++++---- .../throttler_custom_config/throttler_test.go | 23 ++++--- go/test/endtoend/topoconncache/main_test.go | 9 +-- go/test/endtoend/vreplication/helper_test.go | 10 ++- .../vreplication/vreplication_test.go | 30 ++++---- .../endtoend/vtcombo/vttest_sample_test.go | 31 +++++---- .../restarttablet/schema_restart_test.go | 3 +- go/test/endtoend/vtorc/utils/utils.go | 5 +- go/test/endtoend/worker/worker_test.go | 17 +++-- go/vt/vtctld/api_test.go | 24 +++---- .../tabletserver/throttle/throttler.go | 1 + go/vt/workflow/long_polling_test.go | 4 +- go/vt/workflow/websocket_test.go | 3 +- test.go | 4 +- 32 files changed, 263 insertions(+), 208 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 7d2d0d65f45..4fbe33d436a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -18,19 +18,17 @@ linters: disable-all: true enable: # Defaults - - deadcode - errcheck - govet - ineffassign - - structcheck - typecheck - - varcheck - staticcheck - gosimple # Extras - gofmt - goimports + - bodyclose # revive is a replacement for golint, but we do not run it in CI for now. # This is only enabled as a post-commit hook diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go index accc8de2de6..88bffd66752 100644 --- a/go/test/endtoend/cluster/topo_process.go +++ b/go/test/endtoend/cluster/topo_process.go @@ -299,10 +299,8 @@ func (topo *TopoProcess) IsHealthy() bool { if err != nil { return false } - if resp.StatusCode == 200 { - return true - } - return false + defer resp.Body.Close() + return resp.StatusCode == 200 } func (topo *TopoProcess) removeTopoDirectories(Cell string) { @@ -321,11 +319,17 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er if command == "mkdir" { req, _ := http.NewRequest("PUT", url, payload) req.Header.Add("content-type", "application/json") - _, err = http.DefaultClient.Do(req) + resp, err := http.DefaultClient.Do(req) + if err == nil { + defer resp.Body.Close() + } return err } else if command == "rmdir" { req, _ := http.NewRequest("DELETE", url+"?dir=true", payload) - _, err = http.DefaultClient.Do(req) + resp, err := http.DefaultClient.Do(req) + if err == nil { + defer resp.Body.Close() + } return err } else { return nil diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index 7004fbd97d8..aa61507d452 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -258,8 +258,5 @@ func (vtctlclient *VtctlClientProcess) InitTablet(tablet *Vttablet, cell string, // shouldRetry tells us if the command should be retried based on the results/output -- meaning that it // is likely an ephemeral or recoverable issue that is likely to succeed when retried. func shouldRetry(cmdResults string) bool { - if strings.Contains(cmdResults, "Deadlock found when trying to get lock; try restarting transaction") { - return true - } - return false + return strings.Contains(cmdResults, "Deadlock found when trying to get lock; try restarting transaction") } diff --git a/go/test/endtoend/cluster/vtctld_process.go b/go/test/endtoend/cluster/vtctld_process.go index b379a7e8dc1..074234fb97c 100644 --- a/go/test/endtoend/cluster/vtctld_process.go +++ b/go/test/endtoend/cluster/vtctld_process.go @@ -118,10 +118,8 @@ func (vtctld *VtctldProcess) IsHealthy() bool { if err != nil { return false } - if resp.StatusCode == 200 { - return true - } - return false + defer resp.Body.Close() + return resp.StatusCode == 200 } // TearDown shutdowns the running vtctld service diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 2250bd41b66..fa0c1d54d91 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -172,10 +172,9 @@ func (vtgate *VtgateProcess) WaitForStatus() bool { if err != nil { return false } - if resp.StatusCode == 200 { - return true - } - return false + defer resp.Body.Close() + + return resp.StatusCode == 200 } // GetStatusForTabletOfShard function gets status for a specific tablet of a shard in keyspace @@ -185,6 +184,8 @@ func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name string, endPointsCou if err != nil { return false } + defer resp.Body.Close() + if resp.StatusCode == 200 { resultMap := make(map[string]any) respByte, _ := io.ReadAll(resp.Body) @@ -294,6 +295,8 @@ func (vtgate *VtgateProcess) GetVars() (map[string]any, error) { if err != nil { return nil, fmt.Errorf("error getting response from %s", vtgate.VerifyURL) } + defer resp.Body.Close() + if resp.StatusCode == 200 { respByte, _ := io.ReadAll(resp.Body) err := json.Unmarshal(respByte, &resultMap) @@ -313,6 +316,7 @@ func (vtgate *VtgateProcess) ReadVSchema() (*interface{}, error) { if err != nil { return nil, err } + defer resp.Body.Close() res, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, err diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 9f1be9255fd..46f55f579e3 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -164,9 +164,9 @@ func (vttablet *VttabletProcess) GetStatus() string { if err != nil { return "" } + defer resp.Body.Close() if resp.StatusCode == 200 { respByte, _ := io.ReadAll(resp.Body) - defer resp.Body.Close() return string(respByte) } return "" @@ -178,6 +178,8 @@ func (vttablet *VttabletProcess) GetVars() map[string]any { if err != nil { return nil } + defer resp.Body.Close() + if resp.StatusCode == 200 { resultMap := make(map[string]any) respByte, _ := io.ReadAll(resp.Body) @@ -196,6 +198,8 @@ func (vttablet *VttabletProcess) GetStatusDetails() string { if err != nil { return fmt.Sprintf("Status details failed: %v", err.Error()) } + defer resp.Body.Close() + respByte, _ := io.ReadAll(resp.Body) return string(respByte) } diff --git a/go/test/endtoend/cluster/vtworker_process.go b/go/test/endtoend/cluster/vtworker_process.go index bbd41e4b8b6..e20c277aeb6 100644 --- a/go/test/endtoend/cluster/vtworker_process.go +++ b/go/test/endtoend/cluster/vtworker_process.go @@ -114,10 +114,8 @@ func (vtworker *VtworkerProcess) IsHealthy() bool { if err != nil { return false } - if resp.StatusCode == 200 { - return true - } - return false + defer resp.Body.Close() + return resp.StatusCode == 200 } // TearDown shutdowns the running vtworker process @@ -221,6 +219,7 @@ func (vtworker *VtworkerProcess) GetVars() (map[string]any, error) { if err != nil { return nil, fmt.Errorf("error getting response from %s", vtworker.VerifyURL) } + defer resp.Body.Close() if resp.StatusCode == 200 { respByte, _ := io.ReadAll(resp.Body) err := json.Unmarshal(respByte, &resultMap) diff --git a/go/test/endtoend/clustertest/main_test.go b/go/test/endtoend/clustertest/main_test.go index 0be66d56af8..35da40a3edb 100644 --- a/go/test/endtoend/clustertest/main_test.go +++ b/go/test/endtoend/clustertest/main_test.go @@ -107,9 +107,10 @@ func testURL(t *testing.T, url string, testCaseName string) { // getStatusForUrl returns the status code for the URL func getStatusForURL(url string) int { - resp, _ := http.Get(url) - if resp != nil { - return resp.StatusCode + resp, err := http.Get(url) + if err != nil { + return 0 } - return 0 + defer resp.Body.Close() + return resp.StatusCode } diff --git a/go/test/endtoend/clustertest/vtctld_test.go b/go/test/endtoend/clustertest/vtctld_test.go index 36fcb51d97d..7533ce4a24e 100644 --- a/go/test/endtoend/clustertest/vtctld_test.go +++ b/go/test/endtoend/clustertest/vtctld_test.go @@ -62,13 +62,15 @@ func TestVtctldProcess(t *testing.T) { func testTopoDataAPI(t *testing.T, url string) { resp, err := http.Get(url) - require.Nil(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, resp.StatusCode, 200) resultMap := make(map[string]any) - respByte, _ := io.ReadAll(resp.Body) + respByte, err := io.ReadAll(resp.Body) + require.NoError(t, err) err = json.Unmarshal(respByte, &resultMap) - require.Nil(t, err) + require.NoError(t, err) errorValue := reflect.ValueOf(resultMap["Error"]) assert.Empty(t, errorValue.String()) @@ -83,7 +85,7 @@ func testTopoDataAPI(t *testing.T, url string) { func testListAllTablets(t *testing.T) { // first w/o any filters, aside from cell result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets", clusterInstance.Cell) - require.Nil(t, err) + require.NoError(t, err) tablets := getAllTablets() @@ -104,7 +106,7 @@ func testListAllTablets(t *testing.T) { "ListAllTablets", "--", "--keyspace", clusterInstance.Keyspaces[0].Name, "--tablet_type", "primary", clusterInstance.Cell) - require.Nil(t, err) + require.NoError(t, err) // We should only return a single primary tablet per shard in the first keyspace tabletsFromCMD = strings.Split(result, "\n") @@ -115,9 +117,10 @@ func testListAllTablets(t *testing.T) { func testTabletStatus(t *testing.T) { resp, err := http.Get(fmt.Sprintf("http://%s:%d", clusterInstance.Hostname, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].HTTPPort)) - require.Nil(t, err) + require.NoError(t, err) + defer resp.Body.Close() respByte, err := io.ReadAll(resp.Body) - require.Nil(t, err) + require.NoError(t, err) result := string(respByte) log.Infof("Tablet status response: %v", result) assert.True(t, strings.Contains(result, `Alias: 400) } else { @@ -234,11 +235,11 @@ func checkHealth(t *testing.T, port int, shouldError bool) { func checkTabletType(t *testing.T, tabletAlias string, typeWant string) { result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", tabletAlias) - require.Nil(t, err) + require.NoError(t, err) var tablet topodatapb.Tablet err = json2.Unmarshal([]byte(result), &tablet) - require.Nil(t, err) + require.NoError(t, err) actualType := tablet.GetType() got := fmt.Sprintf("%d", actualType) diff --git a/go/test/endtoend/tabletmanager/tablet_health_test.go b/go/test/endtoend/tabletmanager/tablet_health_test.go index dc83eb58b66..5261c417fd1 100644 --- a/go/test/endtoend/tabletmanager/tablet_health_test.go +++ b/go/test/endtoend/tabletmanager/tablet_health_test.go @@ -202,6 +202,7 @@ func checkHealth(t *testing.T, port int, shouldError bool) { url := fmt.Sprintf("http://localhost:%d/healthz", port) resp, err := http.Get(url) require.NoError(t, err) + defer resp.Body.Close() if shouldError { assert.True(t, resp.StatusCode > 400) } else { diff --git a/go/test/endtoend/tabletmanager/throttler/throttler_test.go b/go/test/endtoend/tabletmanager/throttler/throttler_test.go index 4b03e624d18..d87f4d3abaa 100644 --- a/go/test/endtoend/tabletmanager/throttler/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler/throttler_test.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var ( @@ -162,7 +163,8 @@ func TestThrottlerBeforeMetricsCollected(t *testing.T) { // {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"} { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusNotFound, resp.StatusCode) } } @@ -171,8 +173,9 @@ func warmUpHeartbeat(t *testing.T) (respStatus int) { // because we run with -heartbeat_on_demand_duration=5s, the heartbeat is "cold" right now. // Let's warm it up. resp, err := throttleCheck(primaryTablet) + require.NoError(t, err) + defer resp.Body.Close() time.Sleep(time.Second) - assert.NoError(t, err) return resp.StatusCode } @@ -188,23 +191,27 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { time.Sleep(time.Second) { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, body, err := throttledApps(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) assert.Contains(t, body, "always-throttled-app") } { resp, err := throttleCheckSelf(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, err := throttleCheckSelf(replicaTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } } @@ -221,18 +228,21 @@ func TestLag(t *testing.T) { // {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"} { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) } { resp, err := throttleCheckSelf(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() // self (on primary) is unaffected by replication lag assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, err := throttleCheckSelf(replicaTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) } } @@ -248,17 +258,20 @@ func TestLag(t *testing.T) { time.Sleep(time.Second) { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, err := throttleCheckSelf(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, err := throttleCheckSelf(replicaTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } } @@ -276,7 +289,8 @@ func TestNoReplicas(t *testing.T) { respStatus := warmUpHeartbeat(t) assert.Equal(t, http.StatusOK, respStatus) resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { @@ -288,7 +302,8 @@ func TestNoReplicas(t *testing.T) { respStatus := warmUpHeartbeat(t) assert.NotEqual(t, http.StatusOK, respStatus) resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } } diff --git a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go index e1ac38c5d6f..3c21c4d9c5e 100644 --- a/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_custom_config/throttler_test.go @@ -160,7 +160,8 @@ func TestThrottlerThresholdOK(t *testing.T) { { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } } @@ -173,12 +174,14 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { // {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""} { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, err := throttleCheckSelf(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } } @@ -196,12 +199,14 @@ func TestThreadsRunning(t *testing.T) { // {"StatusCode":429,"Value":2,"Threshold":2,"Message":"Threshold exceeded"} { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) } { resp, err := throttleCheckSelf(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode) } }) @@ -210,12 +215,14 @@ func TestThreadsRunning(t *testing.T) { // Restore { resp, err := throttleCheck(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } { resp, err := throttleCheckSelf(primaryTablet) - assert.NoError(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) } }) @@ -226,7 +233,7 @@ func vtgateExec(t *testing.T, query string, expectError string) *sqltypes.Result ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) - require.Nil(t, err) + require.NoError(t, err) defer conn.Close() qr, err := conn.ExecuteFetch(query, 1000, true) diff --git a/go/test/endtoend/topoconncache/main_test.go b/go/test/endtoend/topoconncache/main_test.go index 4cdb1ec2398..c8d71027bcf 100644 --- a/go/test/endtoend/topoconncache/main_test.go +++ b/go/test/endtoend/topoconncache/main_test.go @@ -239,9 +239,10 @@ func testURL(t *testing.T, url string, testCaseName string) { // getStatusForUrl returns the status code for the URL func getStatusForURL(url string) int { - resp, _ := http.Get(url) - if resp != nil { - return resp.StatusCode + resp, err := http.Get(url) + if err != nil { + return 0 } - return 0 + defer resp.Body.Close() + return resp.StatusCode } diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 264c748a5bb..5d37a7ff2c2 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -97,10 +97,8 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri func checkHealth(t *testing.T, url string) bool { resp, err := http.Get(url) require.NoError(t, err) - if err != nil || resp.StatusCode != 200 { - return false - } - return true + defer resp.Body.Close() + return resp.StatusCode == 200 } func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query string, want string) { @@ -129,9 +127,9 @@ func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess timer := time.NewTimer(defaultTimeout) defer timer.Stop() for { - _, output, err := throttlerCheckSelf(tablet, appName) + output, err := throttlerCheckSelf(tablet, appName) require.NoError(t, err) - require.NotNil(t, output) + gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode") require.NoError(t, err) if wantCode == gotCode { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index a748ecede57..bcdda81ac8a 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -81,34 +81,36 @@ func init() { defaultReplicas = 1 } -func throttleResponse(tablet *cluster.VttabletProcess, path string) (resp *http.Response, respBody string, err error) { +func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody string, err error) { apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.TabletHostname, tablet.Port, path) - resp, err = httpClient.Get(apiURL) + resp, err := httpClient.Get(apiURL) if err != nil { - return resp, respBody, err + return "", err } + defer resp.Body.Close() b, err := io.ReadAll(resp.Body) respBody = string(b) - return resp, respBody, err + return respBody, err } -func throttleApp(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) { +func throttleApp(tablet *cluster.VttabletProcess, app string) (string, error) { return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)) } -func unthrottleApp(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) { +func unthrottleApp(tablet *cluster.VttabletProcess, app string) (string, error) { return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app)) } -func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http.Response, respBody string, err error) { +func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (respBody string, err error) { apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app) - resp, err = httpClient.Get(apiURL) + resp, err := httpClient.Get(apiURL) if err != nil { - return resp, respBody, err + return "", err } + defer resp.Body.Close() b, err := io.ReadAll(resp.Body) respBody = string(b) - return resp, respBody, err + return respBody, err } func TestVreplicationCopyThrottling(t *testing.T) { @@ -882,7 +884,7 @@ func materializeProduct(t *testing.T) { t.Run("throttle-app-product", func(t *testing.T) { // Now, throttle the streamer on source tablets, insert some rows for _, tab := range productTablets { - _, body, err := throttleApp(tab, sourceThrottlerAppName) + body, err := throttleApp(tab, sourceThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, sourceThrottlerAppName) @@ -900,7 +902,7 @@ func materializeProduct(t *testing.T) { t.Run("unthrottle-app-product", func(t *testing.T) { // unthrottle on source tablets, and expect the rows to show up for _, tab := range productTablets { - _, body, err := unthrottleApp(tab, sourceThrottlerAppName) + body, err := unthrottleApp(tab, sourceThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, sourceThrottlerAppName) // give time for unthrottling to take effect and for target to fetch data @@ -915,7 +917,7 @@ func materializeProduct(t *testing.T) { // Now, throttle vreplication (vcopier/vapplier) on target tablets, and // insert some more rows. for _, tab := range customerTablets { - _, body, err := throttleApp(tab, targetThrottlerAppName) + body, err := throttleApp(tab, targetThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, targetThrottlerAppName) // Wait for throttling to take effect (caching will expire by this time): @@ -933,7 +935,7 @@ func materializeProduct(t *testing.T) { t.Run("unthrottle-app-customer", func(t *testing.T) { // unthrottle on target tablets, and expect the rows to show up for _, tab := range customerTablets { - _, body, err := unthrottleApp(tab, targetThrottlerAppName) + body, err := unthrottleApp(tab, targetThrottlerAppName) assert.NoError(t, err) assert.Contains(t, body, targetThrottlerAppName) } diff --git a/go/test/endtoend/vtcombo/vttest_sample_test.go b/go/test/endtoend/vtcombo/vttest_sample_test.go index 8e359b7849a..4ff578530f7 100644 --- a/go/test/endtoend/vtcombo/vttest_sample_test.go +++ b/go/test/endtoend/vtcombo/vttest_sample_test.go @@ -30,7 +30,7 @@ import ( "strings" "testing" - mysql "github.com/go-sql-driver/mysql" + "github.com/go-sql-driver/mysql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -123,12 +123,13 @@ func TestMain(m *testing.M) { func TestStandalone(t *testing.T) { // validate debug vars resp, err := http.Get(fmt.Sprintf("http://%s/debug/vars", vtctldAddr)) - require.Nil(t, err) + require.NoError(t, err) + defer resp.Body.Close() require.Equal(t, 200, resp.StatusCode) resultMap := make(map[string]any) respByte, _ := io.ReadAll(resp.Body) err = json.Unmarshal(respByte, &resultMap) - require.Nil(t, err) + require.NoError(t, err) cmd := resultMap["cmdline"] require.NotNil(t, cmd, "cmdline is not available in debug vars") tmp, _ := cmd.([]any) @@ -136,7 +137,7 @@ func TestStandalone(t *testing.T) { ctx := context.Background() conn, err := vtgateconn.Dial(ctx, grpcAddress) - require.Nil(t, err) + require.NoError(t, err) defer conn.Close() cfg := mysql.NewConfig() @@ -155,9 +156,9 @@ func TestStandalone(t *testing.T) { assertTabletsPresent(t) err = localCluster.TearDown() - require.Nil(t, err) + require.NoError(t, err) err = localCluster.Setup() - require.Nil(t, err) + require.NoError(t, err) assertInsertedRowsExist(ctx, t, conn, idStart, rowCount) assertTabletsPresent(t) @@ -169,7 +170,7 @@ func assertInsertedRowsExist(ctx context.Context, t *testing.T, conn *vtgateconn "id_start": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(idStart), 10))}, } res, err := cur.Execute(ctx, "select * from test_table where id >= :id_start", bindVariables) - require.Nil(t, err) + require.NoError(t, err) assert.Equal(t, rowCount, len(res.Rows)) @@ -178,7 +179,7 @@ func assertInsertedRowsExist(ctx context.Context, t *testing.T, conn *vtgateconn "id_start": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(idStart), 10))}, } res, err = cur.Execute(ctx, "select * from test_table where id = :id_start", bindVariables) - require.Nil(t, err) + require.NoError(t, err) require.Equal(t, 1, len(res.Rows)) assert.Equal(t, "VARCHAR(\"test1000\")", res.Rows[0][1].String()) } @@ -199,7 +200,7 @@ func assertRouting(ctx context.Context, t *testing.T, db *sql.DB) { func assertCanInsertRow(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn) { cur := conn.Session(ks1+":80-@primary", nil) _, err := cur.Execute(ctx, "begin", nil) - require.Nil(t, err) + require.NoError(t, err) i := 0x810000000000000 bindVariables := map[string]*querypb.BindVariable{ @@ -209,10 +210,10 @@ func assertCanInsertRow(ctx context.Context, t *testing.T, conn *vtgateconn.VTGa } query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)" _, err = cur.Execute(ctx, query, bindVariables) - require.Nil(t, err) + require.NoError(t, err) _, err = cur.Execute(ctx, "commit", nil) - require.Nil(t, err) + require.NoError(t, err) } func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateConn, idStart, rowCount int) { @@ -220,7 +221,7 @@ func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateCo query := "insert into test_table (id, msg, keyspace_id) values (:id, :msg, :keyspace_id)" _, err := cur.Execute(ctx, "begin", nil) - require.Nil(t, err) + require.NoError(t, err) for i := idStart; i < idStart+rowCount; i++ { bindVariables := map[string]*querypb.BindVariable{ @@ -229,11 +230,11 @@ func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateCo "keyspace_id": {Type: querypb.Type_UINT64, Value: []byte(strconv.FormatInt(int64(i), 10))}, } _, err = cur.Execute(ctx, query, bindVariables) - require.Nil(t, err) + require.NoError(t, err) } _, err = cur.Execute(ctx, "commit", nil) - require.Nil(t, err) + require.NoError(t, err) } func assertTabletsPresent(t *testing.T) { @@ -242,7 +243,7 @@ func assertTabletsPresent(t *testing.T) { log.Infof("Running vtctlclient with command: %v", tmpCmd.Args) output, err := tmpCmd.CombinedOutput() - require.Nil(t, err) + require.NoError(t, err) numPrimary, numReplica, numRdonly, numDash80, num80Dash, numRouted := 0, 0, 0, 0, 0, 0 lines := strings.Split(string(output), "\n") diff --git a/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go b/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go index 9d3de02f846..6990aa01f88 100644 --- a/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go +++ b/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go @@ -158,7 +158,8 @@ func TestVSchemaTrackerKeyspaceReInit(t *testing.T) { func readVSchema(t *testing.T, vtgate *cluster.VtgateProcess, results *any) { httpClient := &http.Client{Timeout: 5 * time.Second} resp, err := httpClient.Get(vtgate.VSchemaURL) - require.Nil(t, err) + require.NoError(t, err) + defer resp.Body.Close() assert.Equal(t, 200, resp.StatusCode) json.NewDecoder(resp.Body).Decode(results) } diff --git a/go/test/endtoend/vtorc/utils/utils.go b/go/test/endtoend/vtorc/utils/utils.go index 9d83636d825..b6020918b6a 100644 --- a/go/test/endtoend/vtorc/utils/utils.go +++ b/go/test/endtoend/vtorc/utils/utils.go @@ -726,6 +726,7 @@ func MakeAPICall(t *testing.T, url string) (status int, response string) { t.Helper() res, err := http.Get(url) require.NoError(t, err) + defer res.Body.Close() bodyBytes, err := ioutil.ReadAll(res.Body) require.NoError(t, err) body := string(bodyBytes) @@ -799,7 +800,7 @@ func SetupNewClusterSemiSync(t *testing.T) *VtOrcClusterInfo { vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory) - out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, fmt.Sprintf("--durability-policy=semi_sync")) + out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") require.NoError(t, err, out) // create topo server connection @@ -877,7 +878,7 @@ func AddSemiSyncKeyspace(t *testing.T, clusterInfo *VtOrcClusterInfo) { } vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInfo.ClusterInstance.VtctldProcess.GrpcPort, clusterInfo.ClusterInstance.TmpDirectory) - out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceSemiSyncName, fmt.Sprintf("--durability-policy=semi_sync")) + out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceSemiSyncName, "--durability-policy=semi_sync") require.NoError(t, err, out) } diff --git a/go/test/endtoend/worker/worker_test.go b/go/test/endtoend/worker/worker_test.go index cb4dc6aa79e..4d9bed8c756 100644 --- a/go/test/endtoend/worker/worker_test.go +++ b/go/test/endtoend/worker/worker_test.go @@ -125,6 +125,9 @@ func TestWebInterface(t *testing.T) { startTime := time.Now() for { resp, err := http.Get(baseURL + "/status") + if err == nil { + resp.Body.Close() + } if err != nil && !time.Now().After(startTime.Add(10*time.Second)) { time.Sleep(10 * time.Millisecond) continue @@ -142,30 +145,34 @@ func TestWebInterface(t *testing.T) { return http.ErrUseLastResponse } resp, err := http.Post(baseURL+"/Debugging/Ping", "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) - assert.Nil(t, err) + require.NoError(t, err) + resp.Body.Close() assert.Equal(t, 307, resp.StatusCode) // Wait for the Ping command to finish. pollForVars(t, "done") // Verify that the command logged something and it's available at /status. resp, err = http.Get(baseURL + "/status") - assert.Nil(t, err) + require.NoError(t, err) if resp.StatusCode == 200 { respByte, _ := io.ReadAll(resp.Body) respStr := string(respByte) assert.Contains(t, respStr, "Ping command was called with message: 'pong'", fmt.Sprintf("Command did not log output to /status: %s", respStr)) } + resp.Body.Close() // Reset the job. - _, err = http.Get(baseURL + "/reset") - assert.Nil(t, err) + resp, err = http.Get(baseURL + "/reset") + require.NoError(t, err) + resp.Body.Close() resp, err = http.Get(baseURL + "/status") - assert.Nil(t, err) + require.NoError(t, err) if resp.StatusCode == 200 { respByte, _ := io.ReadAll(resp.Body) statusAfterReset := string(respByte) assert.Contains(t, statusAfterReset, "This worker is idle.", "/status does not indicate that the reset was successful") } + resp.Body.Close() i++ } diff --git a/go/vt/vtctld/api_test.go b/go/vt/vtctld/api_test.go index 904367c99df..f4265b5c088 100644 --- a/go/vt/vtctld/api_test.go +++ b/go/vt/vtctld/api_test.go @@ -26,6 +26,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/wrangler" @@ -464,29 +466,19 @@ func TestAPI(t *testing.T) { switch in.method { case "GET": resp, err = http.Get(server.URL + apiPrefix + in.path) + require.NoError(t, err) + defer resp.Body.Close() case "POST": resp, err = http.Post(server.URL+apiPrefix+in.path, "application/json", strings.NewReader(in.body)) + require.NoError(t, err) + defer resp.Body.Close() default: t.Fatalf("[%v] unknown method: %v", in.path, in.method) - return - } - - if err != nil { - t.Fatalf("[%v] http error: %v", in.path, err) - return } body, err := io.ReadAll(resp.Body) - resp.Body.Close() - - if err != nil { - t.Fatalf("[%v] io.ReadAll(resp.Body) error: %v", in.path, err) - return - } - - if resp.StatusCode != in.statusCode { - t.Fatalf("[%v] got unexpected status code %d, want %d", in.path, resp.StatusCode, in.statusCode) - } + require.NoError(t, err) + require.Equal(t, in.statusCode, resp.StatusCode) got := compactJSON(body) want := compactJSON([]byte(in.want)) diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 4c75e2fa5af..a1a33a2d493 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -491,6 +491,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, mySQLThrottleMetric.Err = err return mySQLThrottleMetric } + defer resp.Body.Close() b, err := io.ReadAll(resp.Body) if err != nil { mySQLThrottleMetric.Err = err diff --git a/go/vt/workflow/long_polling_test.go b/go/vt/workflow/long_polling_test.go index 85dbabd3f28..a1705dd2346 100644 --- a/go/vt/workflow/long_polling_test.go +++ b/go/vt/workflow/long_polling_test.go @@ -94,9 +94,11 @@ func TestLongPolling(t *testing.T) { u.Path = "/workflow/action/1" message := `{"path":"/uuid1","name":"button1"}` buf := bytes.NewReader([]byte(message)) - if _, err := http.Post(u.String(), "application/json; charset=utf-8", buf); err != nil { + pResp, err := http.Post(u.String(), "application/json; charset=utf-8", buf) + if err != nil { t.Fatalf("/action/1 post failed: %v", err) } + pResp.Body.Close() for timeout := 0; ; timeout++ { // This is an asynchronous action, need to take the lock. tw.mu.Lock() diff --git a/go/vt/workflow/websocket_test.go b/go/vt/workflow/websocket_test.go index e47b730e9ad..4a08422f532 100644 --- a/go/vt/workflow/websocket_test.go +++ b/go/vt/workflow/websocket_test.go @@ -46,10 +46,11 @@ func TestWebSocket(t *testing.T) { // Start a client websocket. u := url.URL{Scheme: "ws", Host: listener.Addr().String(), Path: "/workflow"} - c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + c, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { t.Fatalf("WebSocket dial failed: %v", err) } + defer resp.Body.Close() // Read the original full dump. _, tree, err := c.ReadMessage() diff --git a/test.go b/test.go index 5a4f9ea30db..aabe8fab363 100755 --- a/test.go +++ b/test.go @@ -619,9 +619,11 @@ type TestStats struct { func sendStats(values url.Values) { if *remoteStats != "" { log.Printf("Sending remote stats to %v", *remoteStats) - if _, err := http.PostForm(*remoteStats, values); err != nil { + resp, err := http.PostForm(*remoteStats, values) + if err != nil { log.Printf("Can't send remote stats: %v", err) } + defer resp.Body.Close() } }