Skip to content

Commit

Permalink
Fix closing the body for HTTP requests (vitessio#11842) (vitessio#11854
Browse files Browse the repository at this point in the history
)

* Fix closing the body for HTTP requests (vitessio#11842)

One of the easy things to miss that can cause resource leaks is not
closing a response body for an HTTP request.

There's a linter available to make it easier to catch this, so this
enables that linter and fixes all the cases reported.

This is almost all test cases, except for one production code path case
in the throttler.

Signed-off-by: Dirkjan Bussink <[email protected]>

* Always close body for HTTP requests (vitessio#10835)

Not closing the body leads to leaking goroutines for the reader / writer
of the body.

Found when validating that tests don't leak goroutines (to ensure other
things get closed properly, but these cases where also found).

Signed-off-by: Dirkjan Bussink <[email protected]>

* Fix remaining body close issues

Signed-off-by: Dirkjan Bussink <[email protected]>

Signed-off-by: Dirkjan Bussink <[email protected]>
  • Loading branch information
dbussink authored Nov 30, 2022
1 parent 3b19639 commit 2cb7903
Show file tree
Hide file tree
Showing 32 changed files with 263 additions and 208 deletions.
4 changes: 1 addition & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@ linters:
disable-all: true
enable:
# Defaults
- deadcode
- errcheck
- govet
- ineffassign
- structcheck
- typecheck
- varcheck
- staticcheck
- gosimple

# Extras
- gofmt
- goimports
- bodyclose

# revive is a replacement for golint, but we do not run it in CI for now.
# This is only enabled as a post-commit hook
Expand Down
16 changes: 10 additions & 6 deletions go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,8 @@ func (topo *TopoProcess) IsHealthy() bool {
if err != nil {
return false
}
if resp.StatusCode == 200 {
return true
}
return false
defer resp.Body.Close()
return resp.StatusCode == 200
}

func (topo *TopoProcess) removeTopoDirectories(Cell string) {
Expand All @@ -321,11 +319,17 @@ func (topo *TopoProcess) ManageTopoDir(command string, directory string) (err er
if command == "mkdir" {
req, _ := http.NewRequest("PUT", url, payload)
req.Header.Add("content-type", "application/json")
_, err = http.DefaultClient.Do(req)
resp, err := http.DefaultClient.Do(req)
if err == nil {
defer resp.Body.Close()
}
return err
} else if command == "rmdir" {
req, _ := http.NewRequest("DELETE", url+"?dir=true", payload)
_, err = http.DefaultClient.Do(req)
resp, err := http.DefaultClient.Do(req)
if err == nil {
defer resp.Body.Close()
}
return err
} else {
return nil
Expand Down
5 changes: 1 addition & 4 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,5 @@ func (vtctlclient *VtctlClientProcess) InitTablet(tablet *Vttablet, cell string,
// shouldRetry tells us if the command should be retried based on the results/output -- meaning that it
// is likely an ephemeral or recoverable issue that is likely to succeed when retried.
func shouldRetry(cmdResults string) bool {
if strings.Contains(cmdResults, "Deadlock found when trying to get lock; try restarting transaction") {
return true
}
return false
return strings.Contains(cmdResults, "Deadlock found when trying to get lock; try restarting transaction")
}
6 changes: 2 additions & 4 deletions go/test/endtoend/cluster/vtctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ func (vtctld *VtctldProcess) IsHealthy() bool {
if err != nil {
return false
}
if resp.StatusCode == 200 {
return true
}
return false
defer resp.Body.Close()
return resp.StatusCode == 200
}

// TearDown shutdowns the running vtctld service
Expand Down
12 changes: 8 additions & 4 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,9 @@ func (vtgate *VtgateProcess) WaitForStatus() bool {
if err != nil {
return false
}
if resp.StatusCode == 200 {
return true
}
return false
defer resp.Body.Close()

return resp.StatusCode == 200
}

// GetStatusForTabletOfShard function gets status for a specific tablet of a shard in keyspace
Expand All @@ -185,6 +184,8 @@ func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name string, endPointsCou
if err != nil {
return false
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
resultMap := make(map[string]any)
respByte, _ := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -294,6 +295,8 @@ func (vtgate *VtgateProcess) GetVars() (map[string]any, error) {
if err != nil {
return nil, fmt.Errorf("error getting response from %s", vtgate.VerifyURL)
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
respByte, _ := io.ReadAll(resp.Body)
err := json.Unmarshal(respByte, &resultMap)
Expand All @@ -313,6 +316,7 @@ func (vtgate *VtgateProcess) ReadVSchema() (*interface{}, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()
res, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ func (vttablet *VttabletProcess) GetStatus() string {
if err != nil {
return ""
}
defer resp.Body.Close()
if resp.StatusCode == 200 {
respByte, _ := io.ReadAll(resp.Body)
defer resp.Body.Close()
return string(respByte)
}
return ""
Expand All @@ -178,6 +178,8 @@ func (vttablet *VttabletProcess) GetVars() map[string]any {
if err != nil {
return nil
}
defer resp.Body.Close()

if resp.StatusCode == 200 {
resultMap := make(map[string]any)
respByte, _ := io.ReadAll(resp.Body)
Expand All @@ -196,6 +198,8 @@ func (vttablet *VttabletProcess) GetStatusDetails() string {
if err != nil {
return fmt.Sprintf("Status details failed: %v", err.Error())
}
defer resp.Body.Close()

respByte, _ := io.ReadAll(resp.Body)
return string(respByte)
}
Expand Down
7 changes: 3 additions & 4 deletions go/test/endtoend/cluster/vtworker_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,8 @@ func (vtworker *VtworkerProcess) IsHealthy() bool {
if err != nil {
return false
}
if resp.StatusCode == 200 {
return true
}
return false
defer resp.Body.Close()
return resp.StatusCode == 200
}

// TearDown shutdowns the running vtworker process
Expand Down Expand Up @@ -221,6 +219,7 @@ func (vtworker *VtworkerProcess) GetVars() (map[string]any, error) {
if err != nil {
return nil, fmt.Errorf("error getting response from %s", vtworker.VerifyURL)
}
defer resp.Body.Close()
if resp.StatusCode == 200 {
respByte, _ := io.ReadAll(resp.Body)
err := json.Unmarshal(respByte, &resultMap)
Expand Down
9 changes: 5 additions & 4 deletions go/test/endtoend/clustertest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ func testURL(t *testing.T, url string, testCaseName string) {

// getStatusForUrl returns the status code for the URL
func getStatusForURL(url string) int {
resp, _ := http.Get(url)
if resp != nil {
return resp.StatusCode
resp, err := http.Get(url)
if err != nil {
return 0
}
return 0
defer resp.Body.Close()
return resp.StatusCode
}
21 changes: 12 additions & 9 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ func TestVtctldProcess(t *testing.T) {

func testTopoDataAPI(t *testing.T, url string) {
resp, err := http.Get(url)
require.Nil(t, err)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, resp.StatusCode, 200)

resultMap := make(map[string]any)
respByte, _ := io.ReadAll(resp.Body)
respByte, err := io.ReadAll(resp.Body)
require.NoError(t, err)
err = json.Unmarshal(respByte, &resultMap)
require.Nil(t, err)
require.NoError(t, err)

errorValue := reflect.ValueOf(resultMap["Error"])
assert.Empty(t, errorValue.String())
Expand All @@ -83,7 +85,7 @@ func testTopoDataAPI(t *testing.T, url string) {
func testListAllTablets(t *testing.T) {
// first w/o any filters, aside from cell
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets", clusterInstance.Cell)
require.Nil(t, err)
require.NoError(t, err)

tablets := getAllTablets()

Expand All @@ -104,7 +106,7 @@ func testListAllTablets(t *testing.T) {
"ListAllTablets", "--", "--keyspace", clusterInstance.Keyspaces[0].Name,
"--tablet_type", "primary",
clusterInstance.Cell)
require.Nil(t, err)
require.NoError(t, err)

// We should only return a single primary tablet per shard in the first keyspace
tabletsFromCMD = strings.Split(result, "\n")
Expand All @@ -115,9 +117,10 @@ func testListAllTablets(t *testing.T) {

func testTabletStatus(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("http://%s:%d", clusterInstance.Hostname, clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].HTTPPort))
require.Nil(t, err)
require.NoError(t, err)
defer resp.Body.Close()
respByte, err := io.ReadAll(resp.Body)
require.Nil(t, err)
require.NoError(t, err)
result := string(respByte)
log.Infof("Tablet status response: %v", result)
assert.True(t, strings.Contains(result, `Alias: <a href="http://localhost:`))
Expand All @@ -126,13 +129,13 @@ func testTabletStatus(t *testing.T) {

func testExecuteAsDba(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
}

func testExecuteAsApp(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
}

Expand Down
69 changes: 35 additions & 34 deletions go/test/endtoend/clustertest/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestVtgateProcess(t *testing.T) {
verifyVtgateVariables(t, clusterInstance.VtgateProcess.VerifyURL)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "insert into customer(id, email) values(1,'email1')")
Expand All @@ -52,41 +52,42 @@ func TestVtgateProcess(t *testing.T) {
}

func verifyVtgateVariables(t *testing.T, url string) {
resp, _ := http.Get(url)
if resp != nil && resp.StatusCode == 200 {
resultMap := make(map[string]any)
respByte, _ := io.ReadAll(resp.Body)
err := json.Unmarshal(respByte, &resultMap)
require.Nil(t, err)
if resultMap["VtgateVSchemaCounts"] == nil {
t.Error("Vschema count should be present in variables")
}
vschemaCountMap := getMapFromJSON(resultMap, "VtgateVSchemaCounts")
if _, present := vschemaCountMap["Reload"]; !present {
t.Error("Reload count should be present in vschemacount")
} else if object := reflect.ValueOf(vschemaCountMap["Reload"]); object.NumField() <= 0 {
t.Error("Reload count should be greater than 0")
}
if _, present := vschemaCountMap["WatchError"]; present {
t.Error("There should not be any WatchError in VschemaCount")
}
if _, present := vschemaCountMap["Parsing"]; present {
t.Error("There should not be any Parsing in VschemaCount")
}
resp, err := http.Get(url)
require.NoError(t, err)
defer resp.Body.Close()

if resultMap["HealthcheckConnections"] == nil {
t.Error("HealthcheckConnections count should be present in variables")
}
require.Equal(t, 200, resp.StatusCode)
resultMap := make(map[string]any)
respByte, err := io.ReadAll(resp.Body)
require.NoError(t, err)
err = json.Unmarshal(respByte, &resultMap)
require.NoError(t, err)
if resultMap["VtgateVSchemaCounts"] == nil {
t.Error("Vschema count should be present in variables")
}
vschemaCountMap := getMapFromJSON(resultMap, "VtgateVSchemaCounts")
if _, present := vschemaCountMap["Reload"]; !present {
t.Error("Reload count should be present in vschemacount")
} else if object := reflect.ValueOf(vschemaCountMap["Reload"]); object.NumField() <= 0 {
t.Error("Reload count should be greater than 0")
}
if _, present := vschemaCountMap["WatchError"]; present {
t.Error("There should not be any WatchError in VschemaCount")
}
if _, present := vschemaCountMap["Parsing"]; present {
t.Error("There should not be any Parsing in VschemaCount")
}

healthCheckConnection := getMapFromJSON(resultMap, "HealthcheckConnections")
if len(healthCheckConnection) <= 0 {
t.Error("Atleast one healthy tablet needs to be present")
}
if !isPrimaryTabletPresent(healthCheckConnection) {
t.Error("Atleast one PRIMARY tablet needs to be present")
}
} else {
t.Error("Vtgate api url response not found")
if resultMap["HealthcheckConnections"] == nil {
t.Error("HealthcheckConnections count should be present in variables")
}

healthCheckConnection := getMapFromJSON(resultMap, "HealthcheckConnections")
if len(healthCheckConnection) <= 0 {
t.Error("Atleast one healthy tablet needs to be present")
}
if !isPrimaryTabletPresent(healthCheckConnection) {
t.Error("Atleast one PRIMARY tablet needs to be present")
}
}

Expand Down
16 changes: 9 additions & 7 deletions go/test/endtoend/clustertest/vttablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func TestVttabletProcess(t *testing.T) {
defer cluster.PanicHandler(t)
firstTabletPort := clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].HTTPPort
testURL(t, fmt.Sprintf("http://localhost:%d/debug/vars/", firstTabletPort), "tablet debug var url")
resp, _ := http.Get(fmt.Sprintf("http://localhost:%d/debug/vars", firstTabletPort))
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/debug/vars", firstTabletPort))
require.NoError(t, err)
defer resp.Body.Close()

resultMap := make(map[string]any)
respByte, _ := io.ReadAll(resp.Body)
err := json.Unmarshal(respByte, &resultMap)
if err != nil {
panic(err)
}
respByte, err := io.ReadAll(resp.Body)
require.NoError(t, err)
err = json.Unmarshal(respByte, &resultMap)
require.NoError(t, err)
if got, want := resultMap["TabletKeyspace"], "commerce"; got != want {
t.Errorf("select:\n%v want\n%v for %s", got, want, "Keyspace of tablet should match")
}
Expand All @@ -50,5 +52,5 @@ func TestDeleteTablet(t *testing.T) {
primary := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()
require.NotNil(t, primary)
_, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("DeleteTablet", "--", "--allow_primary", primary.Alias)
require.Nil(t, err, "Error: %v", err)
require.NoError(t, err)
}
7 changes: 6 additions & 1 deletion go/test/endtoend/messaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,13 @@ func parseDebugVars(t *testing.T, output interface{}, vttablet *cluster.Vttablet
if err != nil {
t.Fatalf("failed to fetch %q: %v", debugVarURL, err)
}
defer resp.Body.Close()

respByte, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("failed to read body %q: %v", debugVarURL, err)
}

respByte, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarURL, respByte)
}
Expand Down
Loading

0 comments on commit 2cb7903

Please sign in to comment.