Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2249] Add compression option to getQueueApplication API #757

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/scheduler/tests/restclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ func (c *RClient) GetEventsStream(count uint64) (io.ReadCloser, error) {
return nil, err
}
req.URL.RawQuery = "count=" + strconv.FormatUint(count, 10)
resp, err := http.DefaultClient.Do(req)
tr := &http.Transport{
DisableCompression: true,
}
client := &http.Client{
Transport: tr,
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
Expand Down
29 changes: 28 additions & 1 deletion pkg/webservice/webservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package webservice

import (
"compress/gzip"
"context"
"errors"
"io"
"net/http"
"strings"
"time"

"github.com/julienschmidt/httprouter"
Expand All @@ -43,7 +46,7 @@
func newRouter() *httprouter.Router {
router := httprouter.New()
for _, webRoute := range webRoutes {
handler := loggingHandler(webRoute.HandlerFunc, webRoute.Name)
handler := gzipHandler(loggingHandler(webRoute.HandlerFunc, webRoute.Name))

Check warning on line 49 in pkg/webservice/webservice.go

View check run for this annotation

Codecov / codecov/patch

pkg/webservice/webservice.go#L49

Added line #L49 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should unconditionally GZIP every response, only those that might be large. I'd suggest adding a "Compress" boolean to the route object in routes.go and use that to determine whether to add the gzip handler.

Copy link
Contributor

@craigcondit craigcondit Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, none of the /debug routes should have it (many of those generate binary data which is not compressible, and/or are simple text that wouldn't benefit). It should probably not be on anything that typically results in < 10 KB of data.

router.Handler(webRoute.Method, webRoute.Pattern, handler)
}
return router
Expand Down Expand Up @@ -94,3 +97,27 @@

return nil
}

type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
}

func (w gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}

func gzipHandler(fn http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
fn(w, r)
return
}
w.Header().Set("Content-Encoding", "gzip")
w.Header().Del("Content-Length")
gz := gzip.NewWriter(w)
defer gz.Close()
gzr := gzipResponseWriter{Writer: gz, ResponseWriter: w}
fn(gzr, r)
}
}
125 changes: 125 additions & 0 deletions pkg/webservice/webservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webservice

import (
"compress/gzip"
"encoding/json"
"io"
"net"
"net/http"
"net/url"
"testing"
"time"

"github.com/julienschmidt/httprouter"
"go.uber.org/zap"
"gotest.tools/v3/assert"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/scheduler"
)

func TestCompression(t *testing.T) {
m := NewWebApp(scheduler.NewScheduler().GetClusterContext(), nil)
// dummy route and corresponding handler
testRoute := route{
"testHelloWord",
"GET",
"/ws/v1/helloWorld",
getHelloWorld,
}
router := httprouter.New()
testHandler := gzipHandler(loggingHandler(testRoute.HandlerFunc, testRoute.Name))
router.Handler(testRoute.Method, testRoute.Pattern, testHandler)

// start simulation server
m.httpServer = &http.Server{Addr: ":9080", Handler: router, ReadHeaderTimeout: 5 * time.Second}
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reuse the code of WebService? For example, we can add variety startWebApp to accept a custom route.

func newRouter(routes []route) *httprouter.Router {
	router := httprouter.New()
	for _, webRoute := range routes {
		handler := gzipHandler(loggingHandler(webRoute.HandlerFunc, webRoute.Name))
		router.Handler(webRoute.Method, webRoute.Pattern, handler)
	}
	return router
}

func (m *WebService) StartWebApp() {
	m.startWebApp(webRoutes)
}

func (m *WebService) startWebApp(routes []route) {
	m.httpServer = &http.Server{Addr: ":9080", Handler: newRouter(routes)}

	log.Log(log.REST).Info("web-app started", zap.Int("port", 9080))
	go func() {
		httpError := m.httpServer.ListenAndServe()
		if httpError != nil && !errors.Is(httpError, http.ErrServerClosed) {
			log.Log(log.REST).Error("HTTP serving error",
				zap.Error(httpError))
		}
	}()
}

and then we call startWebApp instead of invoking thread.

	m := NewWebApp(scheduler.NewScheduler().GetClusterContext(), nil)
	// start simulation server
	m.startWebApp([]route{route{
		"testHelloWord",
		"GET",
		"/ws/v1/helloWorld",
		getHelloWorld,
	}})

httpError := m.httpServer.ListenAndServe()
if httpError != nil {
log.Log(log.REST).Error("HTTP serving error",
zap.Error(httpError))
}
}()
defer func() {
err := m.StopWebApp()
assert.NilError(t, err, "Error when closing webapp service.")
}()

err := common.WaitFor(500*time.Millisecond, 5*time.Second, func() bool {
conn, connErr := net.DialTimeout("tcp", net.JoinHostPort("127.0.0.1", "9080"), time.Second)
if connErr == nil {
conn.Close()
}
return connErr == nil
})
assert.NilError(t, err, "Web app failed to start in 5 seconds.")

u := &url.URL{
Host: "localhost:9080",
Scheme: "http",
Path: "/ws/v1/helloWorld",
}

// request without gzip compression
var buf io.ReadWriter
req, err := http.NewRequest("GET", u.String(), buf)
assert.NilError(t, err, "Create new http request failed.")
req.Header.Set("Accept", "application/json")

// prevent http.DefaultClient from automatically adding gzip header
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can merge those code into a helper function. For example:

	check := func(compression bool) {
		req, err := http.NewRequest("GET", u.String(), nil)
		assert.NilError(t, err, "Create new http request failed.")
		req.Header.Set("Accept", "application/json")
		if compression {
			req.Header.Set("Accept-Encoding", "gzip")
		} else {
			req.Header.Set("Accept-Encoding", "deflate")
		}

		resp, err := http.DefaultClient.Do(req)
		assert.NilError(t, err, "Http request failed.")
		defer resp.Body.Close()

		var reader io.Reader
		if compression {
			gzipReader, err := gzip.NewReader(resp.Body)
			assert.NilError(t, err, "Failed to create gzip reader.")
			defer gzipReader.Close()
			reader = gzipReader
		} else {
			reader = resp.Body
		}
		byteArr, err := io.ReadAll(reader)
		assert.NilError(t, err, "Failed to read body.")

		var respMsg map[string]string
		err = json.Unmarshal(byteArr, &respMsg)
		assert.NilError(t, err, unmarshalError)
		assert.Equal(t, len(respMsg), 1)
		assert.Equal(t, respMsg["data"], "hello world")
	}

	check(false)
	check(true)

req.Header.Set("Accept-Encoding", "deflate")
resp, err := http.DefaultClient.Do(req)
assert.NilError(t, err, "Http request failed.")
defer resp.Body.Close()
byteArr, err := io.ReadAll(resp.Body)
assert.NilError(t, err, "Failed to read body.")
var respMsg map[string]string
err = json.Unmarshal(byteArr, &respMsg)
assert.NilError(t, err, unmarshalError)
assert.Equal(t, respMsg["data"], "hello world")

// request with gzip compression enabled
req.Header.Set("Accept-Encoding", "gzip")
resp2, err := http.DefaultClient.Do(req)
assert.NilError(t, err, "Http request failed.")
defer resp2.Body.Close()
gzipReader, err := gzip.NewReader(resp2.Body)
assert.NilError(t, err, "Failed to create gzip reader.")
compressedData, err := io.ReadAll(gzipReader)
assert.NilError(t, err, "Failed to read body")
var respMsg2 map[string]string
err = json.Unmarshal(compressedData, &respMsg2)
assert.NilError(t, err, unmarshalError)
assert.Equal(t, respMsg2["data"], "hello world")
defer gzipReader.Close()
}

func getHelloWorld(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
result := map[string]string{
"data": "hello world",
}

if err := json.NewEncoder(w).Encode(result); err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusInternalServerError)
}
}