Skip to content

Commit

Permalink
Router fixes (#1013)
Browse files Browse the repository at this point in the history
PR #982 changed how the router works. It fixes some issues, but
introduces a bug: "PUT _bulk" is treated as "PUT :index".

In this PR :
1. Revert router changes (#982)
2. Fix async handling 
3. Fix mapping handling 
4. Fix "GET :index" handling  (it was always redirected to the elastic)
5. Handle Kibana internal indexes in an explicit way
  • Loading branch information
nablaone authored Nov 18, 2024
1 parent 864c8ba commit 0b40e2c
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 65 deletions.
3 changes: 3 additions & 0 deletions quesma/elasticsearch/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ func IsIndexPattern(index string) bool {
func IsInternalIndex(index string) bool {
return strings.HasPrefix(index, internalIndexPrefix)
}

// InternalPaths is a list of paths that are considered internal and should not handled by Quesma
var InternalPaths = []string{"/_nodes", "/_xpack"}
14 changes: 8 additions & 6 deletions quesma/quesma/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,21 @@ func (p *PathRouter) Matches(req *Request) (Handler, *table_resolver.Decision) {
}
}

func (p *PathRouter) findHandler(req *Request) (handler Handler, decision *table_resolver.Decision) {
func (p *PathRouter) findHandler(req *Request) (Handler, *table_resolver.Decision) {
path := strings.TrimSuffix(req.Path, "/")
for _, m := range p.mappings {
if pathData, pathMatches := m.compiledPath.Match(path); pathMatches {
req.Params = pathData.Params
meta, match := m.compiledPath.Match(path)
if match {
req.Params = meta.Params
predicateResult := m.predicate.Matches(req)
decision = predicateResult.Decision
if predicateResult.Matched {
handler = m.handler
return m.handler, predicateResult.Decision
} else {
return nil, predicateResult.Decision
}
}
}
return handler, decision
return nil, nil
}

type httpMethodPredicate struct {
Expand Down
153 changes: 94 additions & 59 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
and := mux.And

router := mux.NewPathRouter()

// These are the endpoints that are not supported by Quesma
// These will redirect to the elastic cluster.
for _, path := range elasticsearch.InternalPaths {
router.Register(path, mux.Never(), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { return nil, nil })
}

// These are the endpoints that are supported by Quesma

// Warning:
// The first handler that matches the path will be considered to use.
// If the predicate returns false it will be redirected to the elastic cluster.
// If the predicate returns true, the handler will be used.
//
// So, if you add multiple handlers with the same path, the first one will be used, the rest will be redirected to the elastic cluster.
// This is current limitation of the router.

router.Register(routes.ClusterHealthPath, method("GET"), func(_ context.Context, req *mux.Request) (*mux.Result, error) {
return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil
})
Expand Down Expand Up @@ -200,33 +217,38 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.IndexMappingPath, and(method("PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]
router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
}
switch req.Method {

columns := elasticsearch.ParseMappings("", body)
case "GET":
index := req.Params["index"]

sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
foundSchema, found := sr.FindSchema(schema.TableName(index))
if !found {
return &mux.Result{StatusCode: http.StatusNotFound}, nil
}

return putIndexResult(index)
})
hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

router.Register(routes.IndexMappingPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]
return getIndexMappingResult(index, mappings)

foundSchema, found := sr.FindSchema(schema.TableName(index))
if !found {
return &mux.Result{StatusCode: http.StatusNotFound}, nil
case "PUT":
index := req.Params["index"]

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
}

columns := elasticsearch.ParseMappings("", body)
sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
return putIndexResult(index)
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
return nil, errors.New("unsupported method")

return getIndexMappingResult(index, mappings)
})

router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
Expand All @@ -237,21 +259,27 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.AsyncSearchIdPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})
router.Register(routes.AsyncSearchIdPath, and(method("GET", "DELETE"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

router.Register(routes.AsyncSearchIdPath, and(method("DELETE"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"])
if err != nil {
return nil, err
switch req.Method {

case "GET":
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil

case "DELETE":
responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil

return nil, errors.New("unsupported method")
})

router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
Expand Down Expand Up @@ -307,42 +335,49 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.IndexPath, and(method("PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]
if req.Body == "" {
logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index)
return putIndexResult(index)
}
router.Register(routes.IndexPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
}
switch req.Method {

mappings, ok := body["mappings"]
if !ok {
logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body)
return putIndexResult(index)
}
columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{}))
case "GET":
index := req.Params["index"]

foundSchema, found := sr.FindSchema(schema.TableName(index))
if !found {
return &mux.Result{StatusCode: http.StatusNotFound}, nil
}

sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})
hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

return putIndexResult(index)
})
return getIndexResult(index, mappings)

router.Register(routes.IndexPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
index := req.Params["index"]
case "PUT":

foundSchema, found := sr.FindSchema(schema.TableName(index))
if !found {
return &mux.Result{StatusCode: http.StatusNotFound}, nil
}
index := req.Params["index"]
if req.Body == "" {
logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index)
return putIndexResult(index)
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
}

mappings, ok := body["mappings"]
if !ok {
logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body)
return putIndexResult(index)
}
columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{}))

sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns})

return putIndexResult(index)
}

return getIndexResult(index, mappings)
return nil, errors.New("unsupported method")
})

router.Register(routes.QuesmaTableResolverPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
Expand Down

0 comments on commit 0b40e2c

Please sign in to comment.