Skip to content

Commit

Permalink
refactor: pull ingress handler out of Controller (#1020)
Browse files Browse the repository at this point in the history
Add some basic e2e tests for the ingress handler.
  • Loading branch information
alecthomas authored Mar 6, 2024
1 parent c995fa4 commit 335d952
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 5,651 deletions.
122 changes: 1 addition & 121 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package controller

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -179,10 +178,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
return svc, nil
}

// ServeHTTP handles ingress routes.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := log.FromContext(r.Context())
logger.Debugf("%s %s", r.Method, r.URL.Path)
routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
Expand All @@ -192,93 +188,17 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
route, err := ingress.GetIngressRoute(routes, r.Method, r.URL.Path)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
http.NotFound(w, r)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

sch, err := s.getActiveSchema(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

body, err := ingress.BuildRequestBody(route, r, sch)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

creq := connect.NewRequest(&ftlv1.CallRequest{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.VerbRef{Module: route.Module, Name: route.Verb},
Body: body,
})

requestName, err := s.dal.CreateIngressRequest(r.Context(), fmt.Sprintf("%s %s", r.Method, r.URL.Path), r.RemoteAddr)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
headers.SetRequestName(creq.Header(), requestName)
resp, err := s.Call(r.Context(), creq)
if err != nil {
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
http.Error(w, err.Error(), connectCodeToHTTP(connectErr.Code()))
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
switch msg := resp.Msg.Response.(type) {
case *ftlv1.CallResponse_Body:
verb := sch.ResolveVerbRef(&schema.VerbRef{Name: route.Verb, Module: route.Module})
if verb == nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

var responseBody []byte

if metadata, ok := verb.GetMetadataIngress().Get(); ok && metadata.Type == "http" {
var response ingress.HTTPResponse
if err := json.Unmarshal(msg.Body, &response); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

var responseHeaders http.Header
responseBody, responseHeaders, err = ingress.ResponseForVerb(sch, verb, response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

for k, v := range responseHeaders {
w.Header()[k] = v
}

if response.Status != 0 {
w.WriteHeader(response.Status)
}
} else {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
responseBody = msg.Body
}
_, err = w.Write(responseBody)
if err != nil {
logger.Errorf(err, "Could not write response body")
}

case *ftlv1.CallResponse_Error_:
http.Error(w, msg.Error.Message, http.StatusInternalServerError)
}
ingress.Handle(sch, requestName, routes, w, r, s.Call)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -1153,46 +1073,6 @@ func (s *Service) getActiveSchema(ctx context.Context) (*schema.Schema, error) {
})
}

// Copied from the Apache-licensed connect-go source.
func connectCodeToHTTP(code connect.Code) int {
switch code {
case connect.CodeCanceled:
return 408
case connect.CodeUnknown:
return 500
case connect.CodeInvalidArgument:
return 400
case connect.CodeDeadlineExceeded:
return 408
case connect.CodeNotFound:
return 404
case connect.CodeAlreadyExists:
return 409
case connect.CodePermissionDenied:
return 403
case connect.CodeResourceExhausted:
return 429
case connect.CodeFailedPrecondition:
return 412
case connect.CodeAborted:
return 409
case connect.CodeOutOfRange:
return 400
case connect.CodeUnimplemented:
return 404
case connect.CodeInternal:
return 500
case connect.CodeUnavailable:
return 503
case connect.CodeDataLoss:
return 500
case connect.CodeUnauthenticated:
return 401
default:
return 500 // same as CodeUnknown
}
}

func runWithRetries(ctx context.Context, success, failure time.Duration, fn func(ctx context.Context) error) {
name := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
name = name[strings.LastIndex(name, ".")+1:]
Expand Down
142 changes: 142 additions & 0 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package ingress

import (
"context"
"encoding/json"
"errors"
"net/http"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/controller/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc/headers"
)

// Handle HTTP ingress routes.
func Handle(
sch *schema.Schema,
requestName model.RequestName,
routes []dal.IngressRoute,
w http.ResponseWriter,
r *http.Request,
call func(context.Context, *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error),
) {
logger := log.FromContext(r.Context())
logger.Debugf("%s %s", r.Method, r.URL.Path)
route, err := GetIngressRoute(routes, r.Method, r.URL.Path)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
http.NotFound(w, r)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

body, err := BuildRequestBody(route, r, sch)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

creq := connect.NewRequest(&ftlv1.CallRequest{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.VerbRef{Module: route.Module, Name: route.Verb},
Body: body,
})

headers.SetRequestName(creq.Header(), requestName)
resp, err := call(r.Context(), creq)
if err != nil {
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
http.Error(w, err.Error(), connectCodeToHTTP(connectErr.Code()))
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
switch msg := resp.Msg.Response.(type) {
case *ftlv1.CallResponse_Body:
verb := sch.ResolveVerbRef(&schema.VerbRef{Name: route.Verb, Module: route.Module})
var responseBody []byte

if metadata, ok := verb.GetMetadataIngress().Get(); ok && metadata.Type == "http" {
var response HTTPResponse
if err := json.Unmarshal(msg.Body, &response); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

var responseHeaders http.Header
responseBody, responseHeaders, err = ResponseForVerb(sch, verb, response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

for k, v := range responseHeaders {
w.Header()[k] = v
}

if response.Status != 0 {
w.WriteHeader(response.Status)
}
} else {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
responseBody = msg.Body
}
_, err = w.Write(responseBody)
if err != nil {
logger.Errorf(err, "Could not write response body")
}

case *ftlv1.CallResponse_Error_:
http.Error(w, msg.Error.Message, http.StatusInternalServerError)
}
}

// Copied from the Apache-licensed connect-go source.
func connectCodeToHTTP(code connect.Code) int {
switch code {
case connect.CodeCanceled:
return 408
case connect.CodeUnknown:
return 500
case connect.CodeInvalidArgument:
return 400
case connect.CodeDeadlineExceeded:
return 408
case connect.CodeNotFound:
return 404
case connect.CodeAlreadyExists:
return 409
case connect.CodePermissionDenied:
return 403
case connect.CodeResourceExhausted:
return 429
case connect.CodeFailedPrecondition:
return 412
case connect.CodeAborted:
return 409
case connect.CodeOutOfRange:
return 400
case connect.CodeUnimplemented:
return 404
case connect.CodeInternal:
return 500
case connect.CodeUnavailable:
return 503
case connect.CodeDataLoss:
return 500
case connect.CodeUnauthenticated:
return 401
default:
return 500 // same as CodeUnknown
}
}
Loading

0 comments on commit 335d952

Please sign in to comment.