Skip to content

Commit

Permalink
Merge pull request #127 from kaleido-io/feature/stream-openapi-support
Browse files Browse the repository at this point in the history
[ffapi] Support for Text and Binary Streams
  • Loading branch information
nguyer authored Feb 26, 2024
2 parents b875778 + 26af1df commit eb93fd6
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 24 deletions.
28 changes: 21 additions & 7 deletions pkg/ffapi/apiserver.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -19,6 +19,7 @@ package ffapi
import (
"context"
"fmt"
"io"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -86,6 +87,7 @@ type APIServerOptions[T any] struct {
type APIServerRouteExt[T any] struct {
JSONHandler func(*APIRequest, T) (output interface{}, err error)
UploadHandler func(*APIRequest, T) (output interface{}, err error)
StreamHandler func(*APIRequest, T) (output io.ReadCloser, err error)
}

// NewAPIServer makes a new server, with the specified configuration, and
Expand Down Expand Up @@ -201,13 +203,25 @@ func (as *apiServer[T]) routeHandler(hf *HandlerFactory, route *Route) http.Hand
// We extend the base ffapi functionality, with standardized DB filter support for all core resources.
// We also pass the Orchestrator context through
ext := route.Extensions.(*APIServerRouteExt[T])
route.JSONHandler = func(r *APIRequest) (output interface{}, err error) {
er, err := as.EnrichRequest(r)
if err != nil {
return nil, err
switch {
case ext.StreamHandler != nil:
route.StreamHandler = func(r *APIRequest) (output io.ReadCloser, err error) {
er, err := as.EnrichRequest(r)
if err != nil {
return nil, err
}
return ext.StreamHandler(r, er)
}
case ext.JSONHandler != nil:
route.JSONHandler = func(r *APIRequest) (output interface{}, err error) {
er, err := as.EnrichRequest(r)
if err != nil {
return nil, err
}
return ext.JSONHandler(r, er)
}
return ext.JSONHandler(r, er)
}

return hf.RouteHandler(route)
}

Expand Down Expand Up @@ -247,7 +261,7 @@ func (as *apiServer[T]) createMuxRouter(ctx context.Context) *mux.Router {
return ce.UploadHandler(r, er)
}
}
if ce.JSONHandler != nil || ce.UploadHandler != nil {
if ce.JSONHandler != nil || ce.UploadHandler != nil || ce.StreamHandler != nil {
r.HandleFunc(fmt.Sprintf("/api/v1/%s", route.Path), as.routeHandler(hf, route)).
Methods(route.Method)
}
Expand Down
51 changes: 50 additions & 1 deletion pkg/ffapi/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ffapi
import (
"context"
"fmt"
"github.com/getkin/kin-openapi/openapi3"
"io"
"net/http"
"strings"
Expand All @@ -38,6 +39,7 @@ type utManager struct {
mockEnrichErr error
calledJSONHandler string
calledUploadHandler string
calledStreamHandler string
}

type sampleInput struct {
Expand Down Expand Up @@ -80,6 +82,35 @@ var utAPIRoute1 = &Route{
},
}

var utAPIRoute2 = &Route{
Name: "utAPIRoute2",
Path: "ut/utresource/{resourceid}/getit",
Method: http.MethodGet,
Description: "random GET stream route for testing",
PathParams: []*PathParam{
{Name: "resourceid", Description: "My resource"},
},
FormParams: nil,
JSONInputValue: nil,
JSONOutputValue: nil,
JSONOutputCodes: nil,
CustomResponseRefs: map[string]*openapi3.ResponseRef{
"200": {
Value: &openapi3.Response{
Content: openapi3.Content{
"application/octet-stream": {},
},
},
},
},
Extensions: &APIServerRouteExt[*utManager]{
StreamHandler: func(r *APIRequest, um *utManager) (output io.ReadCloser, err error) {
um.calledStreamHandler = r.PP["resourceid"]
return io.NopCloser(strings.NewReader("a stream!")), nil
},
},
}

func initUTConfig() (config.Section, config.Section, config.Section) {
config.RootConfigReset()
apiConfig := config.RootSection("ut.api")
Expand All @@ -97,7 +128,7 @@ func newTestAPIServer(t *testing.T, start bool) (*utManager, *apiServer[*utManag
um := &utManager{t: t}
as := NewAPIServer(ctx, APIServerOptions[*utManager]{
MetricsRegistry: metric.NewPrometheusMetricsRegistry("ut"),
Routes: []*Route{utAPIRoute1},
Routes: []*Route{utAPIRoute1, utAPIRoute2},
EnrichRequest: func(r *APIRequest) (*utManager, error) {
// This could be some dynamic object based on extra processing in the request,
// but the most common case is you just have a "manager" that you inject into each
Expand Down Expand Up @@ -125,6 +156,24 @@ func newTestAPIServer(t *testing.T, start bool) (*utManager, *apiServer[*utManag
}
}

func TestAPIServerInvokeAPIRouteStream(t *testing.T) {
um, as, done := newTestAPIServer(t, true)
defer done()

<-as.Started()

var o sampleOutput
res, err := resty.New().R().
SetBody(nil).
SetResult(&o).
Get(fmt.Sprintf("%s/api/v1/ut/utresource/id12345/getit", as.APIPublicURL()))
assert.NoError(t, err)
assert.Equal(t, 200, res.StatusCode())
assert.Equal(t, "application/octet-stream", res.Header().Get("Content-Type"))
assert.Equal(t, "id12345", um.calledStreamHandler)
assert.Equal(t, "a stream!", string(res.Body()))
}

func TestAPIServerInvokeAPIRouteJSON(t *testing.T) {
um, as, done := newTestAPIServer(t, true)
defer done()
Expand Down
33 changes: 20 additions & 13 deletions pkg/ffapi/handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -189,7 +189,7 @@ func (hs *HandlerFactory) RouteHandler(route *Route) http.HandlerFunc {
}
}

var status = 400 // if fail parsing input
status := 400 // if fail parsing input
var output interface{}
if err == nil {
queryParams, pathParams, queryArrayParams = hs.getParams(req, route)
Expand All @@ -202,24 +202,29 @@ func (hs *HandlerFactory) RouteHandler(route *Route) http.HandlerFunc {

if err == nil {
r := &APIRequest{
Req: req,
PP: pathParams,
QP: queryParams,
QAP: queryArrayParams,
Filter: filter,
Input: jsonInput,
SuccessStatus: http.StatusOK,
Req: req,
PP: pathParams,
QP: queryParams,
QAP: queryArrayParams,
Filter: filter,
Input: jsonInput,
SuccessStatus: http.StatusOK,
AlwaysPaginate: hs.AlwaysPaginate,

// res.Header() returns a map which is a ref type so handler header edits are persisted
ResponseHeaders: res.Header(),
AlwaysPaginate: hs.AlwaysPaginate,
}
if len(route.JSONOutputCodes) > 0 {
r.SuccessStatus = route.JSONOutputCodes[0]
}
if multipart != nil {
switch {
case multipart != nil:
r.FP = multipart.formParams
r.Part = multipart.part
output, err = route.FormUploadHandler(r)
} else {
case route.StreamHandler != nil:
output, err = route.StreamHandler(r)
default:
output, err = route.JSONHandler(r)
}
status = r.SuccessStatus // Can be updated by the route
Expand Down Expand Up @@ -259,7 +264,9 @@ func (hs *HandlerFactory) handleOutput(ctx context.Context, res http.ResponseWri
res.WriteHeader(204)
case reader != nil:
defer reader.Close()
res.Header().Add("Content-Type", "application/octet-stream")
if res.Header().Get("Content-Type") == "" {
res.Header().Add("Content-Type", "application/octet-stream")
}
res.WriteHeader(status)
_, marshalErr = io.Copy(res, reader)
default:
Expand Down
68 changes: 68 additions & 0 deletions pkg/ffapi/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/getkin/kin-openapi/openapi3"
"github.com/stretchr/testify/require"
"io"
"mime/multipart"
"net/http"
Expand Down Expand Up @@ -156,6 +158,72 @@ func TestJSONHTTPNilResponseNon204(t *testing.T) {
assert.Regexp(t, "FF00164", resJSON["error"])
}

func TestStreamHttpResponsePlainText200(t *testing.T) {
text := `
some stream
of
text
!!!
`
s, _, done := newTestServer(t, []*Route{{
Name: "testRoute",
Path: "/test",
Method: "GET",
CustomResponseRefs: map[string]*openapi3.ResponseRef{
"200": {
Value: &openapi3.Response{
Content: openapi3.Content{
"text/plain": {},
},
},
},
},
StreamHandler: func(r *APIRequest) (output io.ReadCloser, err error) {
r.ResponseHeaders.Add("Content-Type", "text/plain")
return io.NopCloser(strings.NewReader(text)), nil
},
}}, "", nil)
defer done()

res, err := http.Get(fmt.Sprintf("http://%s/test", s.Addr()))
require.NoError(t, err)
assert.Equal(t, 200, res.StatusCode)
assert.Equal(t, "text/plain", res.Header.Get("Content-Type"))
b, err := io.ReadAll(res.Body)
require.NoError(t, err)
assert.Equal(t, text, string(b))
}

func TestStreamHttpResponseBinary200(t *testing.T) {
randomBytes := []byte{3, 255, 192, 201, 33, 50}
s, _, done := newTestServer(t, []*Route{{
Name: "testRoute",
Path: "/test",
Method: "GET",
CustomResponseRefs: map[string]*openapi3.ResponseRef{
"200": {
Value: &openapi3.Response{
Content: openapi3.Content{
"application/octet-stream": &openapi3.MediaType{},
},
},
},
},
StreamHandler: func(r *APIRequest) (output io.ReadCloser, err error) {
return io.NopCloser(bytes.NewReader(randomBytes)), nil
},
}}, "", nil)
defer done()

res, err := http.Get(fmt.Sprintf("http://%s/test", s.Addr()))
require.NoError(t, err)
assert.Equal(t, 200, res.StatusCode)
assert.Equal(t, "application/octet-stream", res.Header.Get("Content-Type"))
b, err := io.ReadAll(res.Body)
require.NoError(t, err)
assert.Equal(t, randomBytes, b)
}

func TestJSONHTTPDefault500Error(t *testing.T) {
s, _, done := newTestServer(t, []*Route{{
Name: "testRoute",
Expand Down
6 changes: 6 additions & 0 deletions pkg/ffapi/openapi3.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ func (sg *SwaggerGen) addOutput(ctx context.Context, doc *openapi3.T, route *Rou
},
})
}
for code, res := range route.CustomResponseRefs {
if res.Value != nil && res.Value.Description == nil {
res.Value.Description = &s
}
op.Responses.Set(code, res)
}
}

func (sg *SwaggerGen) AddParam(ctx context.Context, op *openapi3.Operation, in, name, def, example string, description i18n.MessageKey, deprecated bool, msgArgs ...interface{}) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/ffapi/openapi3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ffapi
import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"net/http"
"testing"

Expand Down Expand Up @@ -298,6 +299,36 @@ func TestFFExcludeTag(t *testing.T) {
assert.Regexp(t, "no schema", err)
}

func TestCustomResponseRefs(t *testing.T) {
routes := []*Route{
{
Name: "CustomResponseRefTest",
Path: "/test",
Method: http.MethodGet,
CustomResponseRefs: map[string]*openapi3.ResponseRef{
"200": {
Value: &openapi3.Response{
Content: openapi3.Content{
"text/plain": &openapi3.MediaType{},
},
},
},
},
},
}
swagger := NewSwaggerGen(&SwaggerGenOptions{
Title: "UnitTest",
Version: "1.0",
BaseURL: "http://localhost:12345/api/v1",
}).Generate(context.Background(), routes)
assert.Nil(t, swagger.Paths.Find("/test").Get.RequestBody)
require.NotEmpty(t, swagger.Paths.Find("/test").Get.Responses)
require.NotNil(t, swagger.Paths.Find("/test").Get.Responses.Value("200"))
require.NotNil(t, swagger.Paths.Find("/test").Get.Responses.Value("200").Value)
assert.NotNil(t, swagger.Paths.Find("/test").Get.Responses.Value("200").Value.Content.Get("text/plain"))
assert.Nil(t, swagger.Paths.Find("/test").Get.Responses.Value("201"))
}

func TestPanicOnMissingDescription(t *testing.T) {
routes := []*Route{
{
Expand Down
11 changes: 8 additions & 3 deletions pkg/ffapi/routes.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,7 @@ package ffapi

import (
"context"
"io"

"github.com/getkin/kin-openapi/openapi3"
"github.com/hyperledger/firefly-common/pkg/config"
Expand Down Expand Up @@ -61,12 +62,16 @@ type Route struct {
JSONOutputSchema func(ctx context.Context, schemaGen SchemaGenerator) (*openapi3.SchemaRef, error)
// JSONOutputValue is a function that returns a pointer to a structure to take JSON output
JSONOutputValue func() interface{}
// JSONOutputCodes is the success response code
// JSONOutputCodes is the success response codes that could be returned by the API. Error codes are explicitly not supported by the framework since they could be subject to change by the errors thrown or how errors are handled.
JSONOutputCodes []int
// JSONHandler is a function for handling JSON content type input. Input/Ouptut objects are returned by JSONInputValue/JSONOutputValue funcs
// JSONHandler is a function for handling JSON content type input. Input/Output objects are returned by JSONInputValue/JSONOutputValue funcs
JSONHandler func(r *APIRequest) (output interface{}, err error)
// FormUploadHandler takes a single file upload, and returns a JSON object
FormUploadHandler func(r *APIRequest) (output interface{}, err error)
// StreamHandler allows for custom request handling with explicit stream (io.ReadCloser) responses
StreamHandler func(r *APIRequest) (output io.ReadCloser, err error)
// CustomResponseRefs allows for specifying custom responses for a route
CustomResponseRefs map[string]*openapi3.ResponseRef
// Deprecated whether this route is deprecated
Deprecated bool
// Tag a category identifier for this route in the generated OpenAPI spec
Expand Down

0 comments on commit eb93fd6

Please sign in to comment.