Skip to content

Commit

Permalink
Fix: endpoint throughput improvements (#816)
Browse files Browse the repository at this point in the history
- Move endpoint task serialization to the point right before we actually
run the task
- Migrate json marshal/unmarshal to faster library
  • Loading branch information
luke-lombardi authored Dec 27, 2024
1 parent 6253105 commit bc1a50e
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 51 deletions.
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/hanwen/go-fuse/v2 v2.5.1
github.com/jmoiron/sqlx v1.3.5
github.com/json-iterator/go v1.1.12
github.com/knadh/koanf/parsers/json v0.1.0
github.com/knadh/koanf/parsers/yaml v0.1.0
github.com/knadh/koanf/providers/file v0.1.0
github.com/knadh/koanf/providers/rawbytes v0.1.0
github.com/knadh/koanf/v2 v2.0.1
github.com/labstack/echo-contrib v0.17.1
github.com/labstack/echo/v4 v4.12.0
github.com/labstack/echo/v4 v4.13.3
github.com/lib/pq v1.10.9
github.com/mholt/archiver/v3 v3.5.1
github.com/mitchellh/hashstructure/v2 v2.0.2
Expand All @@ -58,7 +59,7 @@ require (
github.com/sashabaranov/go-openai v1.35.7
github.com/shirou/gopsutil/v4 v4.24.6
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
github.com/tj/assert v0.0.3
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/vishvananda/netns v0.0.4
Expand All @@ -73,9 +74,9 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/net v0.30.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/net v0.33.0
golang.org/x/sync v0.10.0
golang.org/x/sys v0.28.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.2
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -172,7 +173,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/josharian/native v1.1.1-0.20230202152459-5c7d0dd6ab86 // indirect
github.com/jsimonetti/rtnetlink v1.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect
Expand Down Expand Up @@ -251,12 +251,12 @@ require (
go.uber.org/zap v1.27.0 // indirect
go4.org/mem v0.0.0-20220726221520-4f986261bf13 // indirect
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
golang.zx2c4.com/wireguard/windows v0.5.3 // indirect
Expand Down
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ github.com/labstack/echo-contrib v0.17.1 h1:7I/he7ylVKsDUieaGRZ9XxxTYOjfQwVzHzUY
github.com/labstack/echo-contrib v0.17.1/go.mod h1:SnsCZtwHBAZm5uBSAtQtXQHI3wqEA73hvTn0bYMKnZA=
github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0=
github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM=
github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaafY=
github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
Expand Down Expand Up @@ -559,6 +561,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tailscale/certstore v0.1.1-0.20231202035212-d3fa0460f47e h1:PtWT87weP5LWHEY//SWsYkSO3RWRZo4OSWagh3YD2vQ=
github.com/tailscale/certstore v0.1.1-0.20231202035212-d3fa0460f47e/go.mod h1:XrBNfAFN+pwoWuksbFS9Ccxnopa15zJGgXRFN90l3K4=
github.com/tailscale/go-winio v0.0.0-20231025203758-c4f33415bf55 h1:Gzfnfk2TWrk8Jj4P4c1a3CtQyMaTVCznlkLZI++hok4=
Expand Down Expand Up @@ -675,6 +679,8 @@ golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
Expand All @@ -700,6 +706,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
Expand All @@ -712,6 +720,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -743,15 +753,23 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
Expand Down
58 changes: 35 additions & 23 deletions pkg/abstractions/endpoint/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package endpoint
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/beam-cloud/beta9/pkg/common"
"github.com/beam-cloud/beta9/pkg/network"
"github.com/beam-cloud/beta9/pkg/repository"
"github.com/beam-cloud/beta9/pkg/task"
"github.com/beam-cloud/beta9/pkg/types"
)

Expand All @@ -32,7 +32,6 @@ const (

type request struct {
ctx echo.Context
payload *types.TaskPayload
task *EndpointTask
done chan bool
processed bool
Expand Down Expand Up @@ -131,13 +130,12 @@ func (rb *RequestBuffer) handleHeartbeatEvents() {
}
}

func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask, payload *types.TaskPayload) error {
func (rb *RequestBuffer) ForwardRequest(ctx echo.Context, task *EndpointTask) error {
done := make(chan bool)
req := &request{
ctx: ctx,
done: done,
payload: payload,
task: task,
ctx: ctx,
done: done,
task: task,
}
rb.buffer.Push(req, false)

Expand Down Expand Up @@ -175,7 +173,6 @@ func (rb *RequestBuffer) processRequests() {

if req.ctx.Request().Context().Err() != nil {
rb.cancelInFlightTask(req.task)
req.payload = nil
continue
}

Expand Down Expand Up @@ -419,20 +416,39 @@ func (rb *RequestBuffer) handleWSRequest(req *request, c container) {
func (rb *RequestBuffer) handleHttpRequest(req *request, c container) {
request := req.ctx.Request()

requestBody := request.Body
var requestBody io.ReadCloser = request.Body
if !rb.isASGI {
b, err := json.Marshal(req.payload)
payload, err := task.SerializeHttpPayload(req.ctx)
if err != nil {
if req.ctx.Request().Context().Err() == context.Canceled {
rb.cancelInFlightTask(req.task)
return
}

req.ctx.JSON(http.StatusBadRequest, map[string]interface{}{
"error": err.Error(),
})
return
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
req.ctx.JSON(http.StatusBadRequest, map[string]interface{}{
"error": err.Error(),
})
return
}
requestBody = io.NopCloser(bytes.NewReader(b))

requestBody = io.NopCloser(bytes.NewReader(payloadBytes))
}

httpClient, err := rb.getHttpClient(c.address)
if err != nil {
req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Internal server error",
})
return
}

containerUrl := fmt.Sprintf("http://%s/%s", c.address, req.ctx.Param("subPath"))

// Forward query params to the container if ASGI
Expand Down Expand Up @@ -463,23 +479,20 @@ func (rb *RequestBuffer) handleHttpRequest(req *request, c container) {

resp, err := httpClient.Do(httpReq)
if err != nil {
req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Internal server error",
})
if req.ctx.Request().Context().Err() == context.Canceled {
rb.cancelInFlightTask(req.task)
}
return
}

defer resp.Body.Close()

// Write response headers
// Set response headers and status code before writing the body
for key, values := range resp.Header {
for _, value := range values {
req.ctx.Response().Writer.Header().Add(key, value)
req.ctx.Response().Header().Add(key, value)
}
}

// Write status code header
req.ctx.Response().Writer.WriteHeader(resp.StatusCode)
req.ctx.Response().WriteHeader(resp.StatusCode)

// Check if we can stream the response
streamingSupported := true
Expand All @@ -501,7 +514,7 @@ func (rb *RequestBuffer) handleHttpRequest(req *request, c container) {
}

if err != nil {
if err != io.EOF {
if err != io.EOF && err != context.Canceled {
req.ctx.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Internal server error",
})
Expand Down Expand Up @@ -537,7 +550,6 @@ func (rb *RequestBuffer) heartBeat(req *request, containerId string) {
func (rb *RequestBuffer) afterRequest(req *request, containerId string) {
defer func() {
req.done <- true
req.payload = nil
}()

defer rb.releaseRequestToken(containerId, req.task.msg.TaskId)
Expand Down
17 changes: 5 additions & 12 deletions pkg/abstractions/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package endpoint

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

jsoniter "github.com/json-iterator/go"

"github.com/labstack/echo/v4"

abstractions "github.com/beam-cloud/beta9/pkg/abstractions/common"
Expand All @@ -21,6 +22,8 @@ import (
pb "github.com/beam-cloud/beta9/proto"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

type EndpointService interface {
pb.EndpointServiceServer
StartEndpointServe(in *pb.StartEndpointServeRequest, stream pb.EndpointService_StartEndpointServeServer) error
Expand Down Expand Up @@ -184,16 +187,6 @@ func (es *HttpEndpointService) forwardRequest(
})
}

payload := &types.TaskPayload{}
if !instance.isASGI {
payload, err = task.SerializeHttpPayload(ctx)
if err != nil {
return ctx.JSON(http.StatusBadRequest, map[string]interface{}{
"error": err.Error(),
})
}
}

// Needed for backwards compatibility
ttl := instance.StubConfig.TaskPolicy.TTL
if ttl == 0 {
Expand All @@ -209,7 +202,7 @@ func (es *HttpEndpointService) forwardRequest(
return err
}

return task.Execute(ctx.Request().Context(), ctx, payload)
return task.Execute(ctx.Request().Context(), ctx)
}

func (es *HttpEndpointService) InstanceFactory(stubId string, options ...func(abstractions.IAutoscaledInstance)) (abstractions.IAutoscaledInstance, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/abstractions/endpoint/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type EndpointTask struct {
func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) error {
var err error = nil
echoCtx := options[0].(echo.Context)
payload := options[1].(*types.TaskPayload)

instance, err := t.es.getOrCreateEndpointInstance(ctx, t.msg.StubId)
if err != nil {
Expand All @@ -32,7 +31,7 @@ func (t *EndpointTask) Execute(ctx context.Context, options ...interface{}) erro
return err
}

return instance.buffer.ForwardRequest(echoCtx, t, payload)
return instance.buffer.ForwardRequest(echoCtx, t)
}

func (t *EndpointTask) Retry(ctx context.Context) error {
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ func (d *Dispatcher) RetryTask(ctx context.Context, task types.TaskInterface) er

// Hit retry limit, cancel task and resolve
if taskMessage.Retries >= taskMessage.Policy.MaxRetries {
log.Info().Str("task_id", taskMessage.TaskId).Str("stub_id", taskMessage.StubId).Msg("dispatcher hit retry limit, not reinserting task")
if taskMessage.Policy.MaxRetries > 0 {
log.Info().Str("task_id", taskMessage.TaskId).Str("stub_id", taskMessage.StubId).Msg("dispatcher hit retry limit, not reinserting task")
}

err = task.Cancel(ctx, types.TaskExceededRetryLimit)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/task/serialize.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package task

import (
"encoding/json"
"errors"
"io"
"net/url"
"strconv"

"github.com/beam-cloud/beta9/pkg/types"
jsoniter "github.com/json-iterator/go"
"github.com/labstack/echo/v4"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func SerializeHttpPayload(ctx echo.Context) (*types.TaskPayload, error) {
defer ctx.Request().Body.Close()

// Create a JSON decoder
decoder := json.NewDecoder(ctx.Request().Body)

// Decode the JSON directly from the reader
payload := map[string]interface{}{}
if err := decoder.Decode(&payload); err != nil {
if err != io.EOF {
Expand Down

0 comments on commit bc1a50e

Please sign in to comment.