Skip to content

Commit

Permalink
[filebeat][websocket] - Added runtime URL modification support based …
Browse files Browse the repository at this point in the history
…on state and cursor values (#39997)

* Till now it was not possible to modify the Websocket URL dynamically via the CEL program as the Websocket logic exists outside the scope of the CEL engine instance. To make this action possible a new instance of the CEL engine was added as a one time execution process to prime the URL before the main CEL program executes. This new CEL instance shares the same state hence its possible to modify and use cursor and state values and in turn modify the URL based off of these values.
  • Loading branch information
ShourieG authored Jun 25, 2024
1 parent e3a8223 commit 45825bb
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Introduce log message for not supported annotations for Hints based autodiscover {pull}38213[38213]
- Add persistent volume claim name to volume if available {pull}38839[38839]
- Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767]
- Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997]

*Auditbeat*

Expand Down
24 changes: 24 additions & 0 deletions x-pack/filebeat/docs/inputs/input-websocket.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ program: |
})
----

[[input-url-program-websocket]]
[float]
==== `url_program`

If present, this CEL program is executed before the websocket connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.

["source","yaml",subs="attributes"]
----
url: ws://testapi:443/v1/streamresults
state:
initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
},
"cursor": {
"since": inner_body.timestamp
}
})
----

[[state-websocket]]
[float]
==== `state`
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

type config struct {
// URLProgram is the CEL program to be run once before to prep the url.
URLProgram string `config:"url_program"`
// Program is the CEL program to be run for each polling.
Program string `config:"program"`
// Regexps is the set of regular expression to be made
Expand Down
75 changes: 74 additions & 1 deletion x-pack/filebeat/input/websocket/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"reflect"
"time"

Expand Down Expand Up @@ -97,9 +98,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
state["cursor"] = cursor
}

// initialize the input url with the help of the url_program.
url, err := i.getURL(ctx, state, log)
if err != nil {
metrics.errorsTotal.Inc()
return err
}

// websocket client
headers := formHeader(cfg)
url := cfg.URL.String()
c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers)
if resp != nil && resp.Body != nil {
log.Debugw("websocket connection response", "body", resp.Body)
Expand Down Expand Up @@ -150,6 +157,72 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
}
}

// getURL initializes the input URL with the help of the url_program.
func (i input) getURL(ctx context.Context, state map[string]interface{}, log *logp.Logger) (string, error) {
var (
url string
err error
)
cfg := i.cfg
if cfg.URLProgram != "" {
state["url"] = cfg.URL.String()
// CEL program which is used to prime/initialize the input url
url_prg, ast, err := newProgram(ctx, cfg.URLProgram, root, nil, log)
if err != nil {
return url, err
}

log.Debugw("cel engine state before url_eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: cfg.Redact})
start := i.now().In(time.UTC)
url, err = evalURLWith(ctx, url_prg, ast, state, start)
log.Debugw("url_eval result", logp.Namespace("websocket"), "modified_url", url)
if err != nil {
log.Errorw("failed url evaluation", "error", err)
return url, err
}
} else {
url = cfg.URL.String()
}
return url, err
}

func evalURLWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (string, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{
// Replace global program "now" with current time. This is necessary
// as the lib.Time now global is static at program instantiation time
// which will persist over multiple evaluations. The lib.Time behaviour
// is correct for mito where CEL program instances live for only a
// single evaluation. Rather than incurring the cost of creating a new
// cel.Program for each evaluation, shadow lib.Time's now with a new
// value for each eval. We retain the lib.Time now global for
// compatibility between CEL programs developed in mito with programs
// run in the input.
"now": now,
root: state,
})
if err != nil {
err = lib.DecoratedError{AST: ast, Err: err}
}
if e := ctx.Err(); e != nil {
err = e
}
if err != nil {
return "", fmt.Errorf("failed eval: %w", err)
}
v, err := out.ConvertToNative(reflect.TypeOf(""))
if err != nil {
return "", fmt.Errorf("failed type conversion: %w", err)
}
switch v := v.(type) {
case string:
_, err = url.Parse(v)
return v, err
default:
// This should never happen.
return "", fmt.Errorf("unexpected native conversion type: %T", v)
}
}

// processAndPublishData processes the data in state, updates the cursor and publishes it to the publisher.
// the CEL program here only executes a single time, since the websocket connection is persistent and events are received and processed in real time.
func (i *input) processAndPublishData(ctx context.Context, metrics *inputMetrics, prg cel.Program, ast *cel.Ast,
Expand Down
99 changes: 98 additions & 1 deletion x-pack/filebeat/input/websocket/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package websocket

import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

//nolint:gosec // These are test tokens and are not used in production code.
const (
basicToken = "dXNlcjpwYXNz"
bearerToken = "BXNlcjpwYVVz"
Expand Down Expand Up @@ -399,6 +401,101 @@ var inputTests = []struct {
},
}

var urlEvalTests = []struct {
name string
config map[string]interface{}
time func() time.Time
want string
}{
{
name: "cursor based url modification",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `has(state.cursor) && has(state.cursor.since) ? state.url+"?since="+ state.cursor.since : state.url`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"since": "2017-08-17T14:54:12",
},
},
},
want: "ws://testapi/getresults?since=2017-08-17T14:54:12",
},
{
name: "cursor based url modification using simplified query",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `state.url + "?since=" + state.?cursor.since.orValue(state.url)`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"since": "2017-08-17T14:54:12",
},
},
},
want: "ws://testapi/getresults?since=2017-08-17T14:54:12",
},
{
name: "url modification with no cursor",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `has(state.cursor) && has(state.cursor.since) ? state.url+"?since="+ state.cursor.since: state.url+"?since="+ state.initial_start_time`,
"state": map[string]interface{}{
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z",
},
{
name: "url modification with no cursor, using simplified query",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)`,
"state": map[string]interface{}{
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z",
},
}

func TestURLEval(t *testing.T) {
logp.TestingSetup()
for _, test := range urlEvalTests {
t.Run(test.name, func(t *testing.T) {

cfg := conf.MustNewConfigFrom(test.config)

conf := config{}
conf.Redact = &redact{}
err := cfg.Unpack(&conf)
if err != nil {
t.Fatalf("unexpected error unpacking config: %v", err)
}

name := input{}.Name()
if name != "websocket" {
t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var state map[string]interface{}
if conf.State == nil {
state = make(map[string]interface{})
} else {
state = conf.State
}

response, err := input{test.time, conf}.getURL(ctx, state, logp.NewLogger("websocket_url_eval_test"))
if err != nil && !errors.Is(err, context.Canceled) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, nil)
}

assert.Equal(t, test.want, response)
})
}
}

func TestInput(t *testing.T) {
// tests will ignore context cancelled errors, since they are expected
ctxCancelledError := fmt.Errorf("context canceled")
Expand Down Expand Up @@ -432,7 +529,7 @@ func TestInput(t *testing.T) {
t.Fatalf("unexpected error running test: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

v2Ctx := v2.Context{
Expand Down

0 comments on commit 45825bb

Please sign in to comment.