Skip to content

Commit

Permalink
[ES output] Log event fields in events log file (elastic#40512)
Browse files Browse the repository at this point in the history
The Elasticsearch output, when faced with ingestion errors was logging
the raw publisher.Event that had already been encoded, hence no event
fields were present in the logs.

This commit fixes it by adding a String method to the encodedEvent
type and using the encodedEvent in the logs instead of the
publisher.Event.
  • Loading branch information
belimawr authored Aug 14, 2024
1 parent f6b8701 commit e7732c6
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix crashes in the journald input. {pull}40061[40061]
- Fix order of configuration for EntraID entity analytics provider. {pull}40487[40487]
- Ensure Entra ID request bodies are not truncated and trace logs are rotated before 100MB. {pull}40494[40494]
- The Elasticsearch output now correctly logs the event fields to the event log file {issue}40509[40509] {pull}40512[40512]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/integration/event_log_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestEventsLoggerESOutput(t *testing.T) {
}

strData := string(data)
eventMsg := "not a number"
eventMsg := `\"int\":\"not a number\"`
if !strings.Contains(strData, eventMsg) {
t.Errorf("expecting to find '%s' on '%s'", eventMsg, eventsLogFile)
t.Errorf("Contents:\n%s", strData)
Expand Down
6 changes: 3 additions & 3 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,14 +507,14 @@ func (client *Client) applyItemStatus(
// Fatal error while sending an already-failed event to the dead letter
// index, drop.
client.pLogDeadLetter.Add()
client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event '%s' (status=%v): %s", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
stats.nonIndexable++
return false
}
if client.deadLetterIndex == "" {
// Fatal error and no dead letter index, drop.
client.pLogIndex.Add()
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, dropping event!", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
stats.nonIndexable++
return false
}
Expand All @@ -523,7 +523,7 @@ func (client *Client) applyItemStatus(
// ingestion succeeds it is counted in the "deadLetter" counter
// rather than the "acked" counter.
client.pLogIndexTryDeadLetter.Add()
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
client.log.Warnw(fmt.Sprintf("Cannot index event '%s' (status=%v): %s, trying dead letter index", encodedEvent, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage))
}

Expand Down
7 changes: 7 additions & 0 deletions libbeat/outputs/elasticsearch/event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,10 @@ func (e *encodedEvent) setDeadLetter(
}
e.encoding = []byte(deadLetterReencoding.String())
}

// String converts e.encoding to string and returns it.
// The goal of this method is to provide an easy way to log
// the event encoded.
func (e *encodedEvent) String() string {
return string(e.encoding)
}

0 comments on commit e7732c6

Please sign in to comment.