From 7db0833d735851da11277677e0a955d95055bf83 Mon Sep 17 00:00:00 2001 From: Eric L <100242256+splunkericl@users.noreply.github.com> Date: Mon, 25 Sep 2023 10:09:01 -0700 Subject: [PATCH] Extract time query parameter in splunkhecreceiver (#27008) This change adds a new feature in splunk hec receiver allowing users to specify a time query parameter. This brings the splunk hec receiver to parity with the splunk HEC raw endpoint. Note: the validation response is slightly different from what splunk returns. If invalid input is provided for time: - splunk returns 400 response with this error message: ``` { "text": "Error in handling indexed fields", "code": 15, "invalid-event-number": 0 } ``` - however this doesn't make sense for splunk hec receiver as it is not indexing anything. Instead, splunk hec receiver will return: ``` {"text":"Invalid data format","code":6} ``` Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27006 --------- Co-authored-by: Antoine Toulme --- .../splunkhecreceiver-add-time-param.yaml | 27 +++++++++++++ receiver/splunkhecreceiver/receiver.go | 17 ++++++++- receiver/splunkhecreceiver/receiver_test.go | 38 +++++++++++++++++++ .../splunkhecreceiver/splunk_to_logdata.go | 6 ++- .../splunk_to_logdata_test.go | 13 ++++++- 5 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 .chloggen/splunkhecreceiver-add-time-param.yaml diff --git a/.chloggen/splunkhecreceiver-add-time-param.yaml b/.chloggen/splunkhecreceiver-add-time-param.yaml new file mode 100644 index 000000000000..17de07992453 --- /dev/null +++ b/.chloggen/splunkhecreceiver-add-time-param.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecreceiver + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Update splunk hec receiver to extract time query parameter if it is provided + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27006] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/splunkhecreceiver/receiver.go b/receiver/splunkhecreceiver/receiver.go index 4e44ad1f0115..c363d92777fb 100644 --- a/receiver/splunkhecreceiver/receiver.go +++ b/receiver/splunkhecreceiver/receiver.go @@ -11,6 +11,7 @@ import ( "io" "net" "net/http" + "strconv" "strings" "sync" "time" @@ -265,7 +266,21 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques } resourceCustomizer := r.createResourceCustomizer(req) - ld, slLen, err := splunkHecRawToLogData(bodyReader, req.URL.Query(), resourceCustomizer, r.config) + query := req.URL.Query() + var timestamp pcommon.Timestamp + if query.Has(queryTime) { + t, err := strconv.ParseInt(query.Get(queryTime), 10, 64) + if t < 0 { + err = errors.New("time cannot be less than 0") + } + if err != nil { + r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, err) + return + } + timestamp = pcommon.NewTimestampFromTime(time.Unix(t, 0)) + } + + ld, slLen, err := splunkHecRawToLogData(bodyReader, 
query, resourceCustomizer, r.config, timestamp) if err != nil { r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, err) return diff --git a/receiver/splunkhecreceiver/receiver_test.go b/receiver/splunkhecreceiver/receiver_test.go index b716ab4ceb7b..0aa2fefad7de 100644 --- a/receiver/splunkhecreceiver/receiver_test.go +++ b/receiver/splunkhecreceiver/receiver_test.go @@ -996,6 +996,44 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) { assert.Equal(t, responseErrGzipReader, body) }, }, + { + name: "raw_endpoint_bad_time_negative_number", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + + req := httptest.NewRequest("POST", "http://localhost/service/collector/raw", bytes.NewReader(msgBytes)) + + q := req.URL.Query() + q.Add(queryTime, "-5") + req.URL.RawQuery = q.Encode() + + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, responseInvalidDataFormat, body) + }, + }, + { + name: "raw_endpoint_bad_time_not_a_number", + req: func() *http.Request { + msgBytes, err := json.Marshal(splunkMsg) + require.NoError(t, err) + + req := httptest.NewRequest("POST", "http://localhost/service/collector/raw", bytes.NewReader(msgBytes)) + + q := req.URL.Query() + q.Add(queryTime, "notANumber") + req.URL.RawQuery = q.Encode() + + return req + }(), + assertResponse: func(t *testing.T, status int, body string) { + assert.Equal(t, http.StatusBadRequest, status) + assert.Equal(t, responseInvalidDataFormat, body) + }, + }, } for _, tt := range tests { diff --git a/receiver/splunkhecreceiver/splunk_to_logdata.go b/receiver/splunkhecreceiver/splunk_to_logdata.go index b78fb910e6cc..1ebeef6652a8 100644 --- a/receiver/splunkhecreceiver/splunk_to_logdata.go +++ b/receiver/splunkhecreceiver/splunk_to_logdata.go @@ -23,6 +23,7 @@ const ( source = "source" sourcetype = "sourcetype" host = "host" + 
queryTime = "time" ) var ( @@ -76,9 +77,10 @@ func splunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust } // splunkHecRawToLogData transforms raw splunk event into log -func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCustomizer func(pcommon.Resource), config *Config) (plog.Logs, int, error) { +func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCustomizer func(pcommon.Resource), config *Config, timestamp pcommon.Timestamp) (plog.Logs, int, error) { ld := plog.NewLogs() rl := ld.ResourceLogs().AppendEmpty() + appendSplunkMetadata(rl, config.HecToOtelAttrs, query.Get(host), query.Get(source), query.Get(sourcetype), query.Get(index)) if resourceCustomizer != nil { resourceCustomizer(rl.Resource()) @@ -91,12 +93,14 @@ func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCusto } logRecord := sl.LogRecords().AppendEmpty() logRecord.Body().SetStr(string(b)) + logRecord.SetTimestamp(timestamp) } else { sc := bufio.NewScanner(bodyReader) for sc.Scan() { logRecord := sl.LogRecords().AppendEmpty() logLine := sc.Text() logRecord.Body().SetStr(logLine) + logRecord.SetTimestamp(timestamp) } } diff --git a/receiver/splunkhecreceiver/splunk_to_logdata_test.go b/receiver/splunkhecreceiver/splunk_to_logdata_test.go index 8e81cc7d176a..836963e37178 100644 --- a/receiver/splunkhecreceiver/splunk_to_logdata_test.go +++ b/receiver/splunkhecreceiver/splunk_to_logdata_test.go @@ -7,6 +7,7 @@ import ( "bytes" "io" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -341,6 +342,9 @@ func Test_SplunkHecToLogData(t *testing.T) { } func Test_SplunkHecRawToLogData(t *testing.T) { + const ( + testTimestampVal = 1695146885 + ) hecConfig := &Config{ HecToOtelAttrs: splunk.HecToOtelAttrs{ Source: "mysource", @@ -355,6 +359,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) { query map[string][]string assertResource func(t *testing.T, got plog.Logs, slLen int) 
config *Config + time pcommon.Timestamp }{ { name: "all_mapping", @@ -369,6 +374,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) { m[sourcetype] = k m[source] = k m[index] = k + m[queryTime] = []string{"1695146885"} return m }(), assertResource: func(t *testing.T, got plog.Logs, slLen int) { @@ -395,8 +401,10 @@ func Test_SplunkHecRawToLogData(t *testing.T) { } else { assert.Fail(t, "index is not added to attributes") } + assert.Equal(t, time.Unix(testTimestampVal, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix()) }, config: hecConfig, + time: pcommon.NewTimestampFromTime(time.Unix(testTimestampVal, 0)), }, { name: "some_mapping", @@ -425,6 +433,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) { } else { assert.Fail(t, "sourcetype is not added to attributes") } + assert.Equal(t, time.Unix(0, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix()) }, config: hecConfig, }, @@ -439,12 +448,14 @@ func Test_SplunkHecRawToLogData(t *testing.T) { }(), assertResource: func(t *testing.T, got plog.Logs, slLen int) { assert.Equal(t, 1, got.LogRecordCount()) + assert.Equal(t, time.Unix(testTimestampVal, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix()) }, config: func() *Config { return &Config{ Splitting: SplittingStrategyNone, } }(), + time: pcommon.NewTimestampFromTime(time.Unix(testTimestampVal, 0)), }, { name: "line splitting", @@ -466,7 +477,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result, slLen, err := splunkHecRawToLogData(tt.sc, tt.query, func(resource pcommon.Resource) {}, tt.config) + result, slLen, err := splunkHecRawToLogData(tt.sc, tt.query, func(resource pcommon.Resource) {}, tt.config, tt.time) require.NoError(t, err) tt.assertResource(t, result, slLen) })