Skip to content

Commit

Permalink
logs: skip temporary buffer between serialization and compression
Browse files Browse the repository at this point in the history
  • Loading branch information
paulcacheux committed Feb 8, 2025
1 parent ba9f4f1 commit f061006
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 26 deletions.
38 changes: 32 additions & 6 deletions pkg/logs/sender/batch_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package sender

import (
"bytes"
"io"
"sync"
"time"

Expand Down Expand Up @@ -166,29 +168,53 @@ func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) {
}

func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan chan *message.Payload) {
serializedMessage := s.serializer.Serialize(messages)
log.Debugf("Send messages for pipeline %s (msg_count:%d, content_size=%d, avg_msg_size=%.2f)", s.pipelineName, len(messages), len(serializedMessage), float64(len(serializedMessage))/float64(len(messages)))
var encodedPayload bytes.Buffer
compressor := s.compression.NewStreamCompressor(&encodedPayload)
if compressor == nil {
compressor = &compression.NoopStreamCompressor{Writer: &encodedPayload}
}

wc := &writerCounter{Writer: compressor}

if err := s.serializer.Serialize(messages, wc); err != nil {
log.Warn("Encoding failed - dropping payload", err)
s.utilization.Stop()
return
}

encodedPayload, err := s.compression.Compress(serializedMessage)
if err != nil {
if err := compressor.Close(); err != nil {
log.Warn("Encoding failed - dropping payload", err)
s.utilization.Stop()
return
}

unencodedSize := wc.counter
log.Debugf("Send messages for pipeline %s (msg_count:%d, content_size=%d, avg_msg_size=%.2f)", s.pipelineName, len(messages), unencodedSize, float64(unencodedSize)/float64(len(messages)))

if s.serverless {
// Increment the wait group so the flush doesn't finish until all payloads are sent to all destinations
s.flushWg.Add(1)
}

p := &message.Payload{
Messages: messages,
Encoded: encodedPayload,
Encoded: encodedPayload.Bytes(),
Encoding: s.compression.ContentEncoding(),
UnencodedSize: len(serializedMessage),
UnencodedSize: unencodedSize,
}
s.utilization.Stop()
outputChan <- p
s.pipelineMonitor.ReportComponentEgress(p, "strategy")
s.pipelineMonitor.ReportComponentIngress(p, "sender")
}

type writerCounter struct {
io.Writer
counter int
}

func (wc *writerCounter) Write(b []byte) (int, error) {
n, err := wc.Writer.Write(b)
wc.counter += n
return n, err
}
38 changes: 24 additions & 14 deletions pkg/logs/sender/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package sender

import (
"bytes"
"io"

"github.com/DataDog/datadog-agent/pkg/logs/message"
)
Expand All @@ -23,7 +23,7 @@ var (
// raw []byte data from unstructured messages or turning structured
// messages into []byte data).
type Serializer interface {
Serialize(messages []*message.Message) []byte
Serialize(messages []*message.Message, writer io.Writer) error
}

// lineSerializer transforms a message array into a payload
Expand All @@ -35,15 +35,18 @@ type lineSerializer struct{}
// for example:
// "{"message":"content1"}", "{"message":"content2"}"
// returns, "{"message":"content1"}\n{"message":"content2"}"
func (s *lineSerializer) Serialize(messages []*message.Message) []byte {
var buffer bytes.Buffer
func (s *lineSerializer) Serialize(messages []*message.Message, writer io.Writer) error {
for i, message := range messages {
if i > 0 {
buffer.WriteByte('\n')
if _, err := writer.Write([]byte{'\n'}); err != nil {
return err
}
}
if _, err := writer.Write(message.GetContent()); err != nil {
return err
}
buffer.Write(message.GetContent())
}
return buffer.Bytes()
return nil
}

// arraySerializer transforms a message array into a array string payload.
Expand All @@ -53,15 +56,22 @@ type arraySerializer struct{}
// for example:
// "{"message":"content1"}", "{"message":"content2"}"
// returns, "[{"message":"content1"},{"message":"content2"}]"
func (s *arraySerializer) Serialize(messages []*message.Message) []byte {
var buffer bytes.Buffer
buffer.WriteByte('[')
func (s *arraySerializer) Serialize(messages []*message.Message, writer io.Writer) error {
if _, err := writer.Write([]byte{'['}); err != nil {
return err
}

for i, message := range messages {
if i > 0 {
buffer.WriteByte(',')
if _, err := writer.Write([]byte{','}); err != nil {
return err
}
}
if _, err := writer.Write(message.GetContent()); err != nil {
return err
}
buffer.Write(message.GetContent())
}
buffer.WriteByte(']')
return buffer.Bytes()

_, err := writer.Write([]byte{']'})
return err
}
22 changes: 16 additions & 6 deletions pkg/logs/sender/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package sender

import (
"bytes"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -19,15 +20,15 @@ func TestLineSerializer(t *testing.T) {

serializer := LineSerializer

payload = serializer.Serialize(messages)
payload = serializeToBytes(t, serializer, messages)
assert.Len(t, payload, 0)

messages = []*message.Message{message.NewMessage([]byte("a"), nil, "", 0)}
payload = serializer.Serialize(messages)
payload = serializeToBytes(t, serializer, messages)
assert.Equal(t, []byte("a"), payload)

messages = []*message.Message{message.NewMessage([]byte("a"), nil, "", 0), message.NewMessage([]byte("b"), nil, "", 0)}
payload = serializer.Serialize(messages)
payload = serializeToBytes(t, serializer, messages)
assert.Equal(t, []byte("a\nb"), payload)
}

Expand All @@ -37,14 +38,23 @@ func TestArraySerializer(t *testing.T) {

serializer := ArraySerializer

payload = serializer.Serialize(messages)
payload = serializeToBytes(t, serializer, messages)
assert.Equal(t, []byte("[]"), payload)

messages = []*message.Message{message.NewMessage([]byte("a"), nil, "", 0)}
payload = serializer.Serialize(messages)
payload = serializeToBytes(t, serializer, messages)
assert.Equal(t, []byte("[a]"), payload)

messages = []*message.Message{message.NewMessage([]byte("a"), nil, "", 0), message.NewMessage([]byte("b"), nil, "", 0)}
payload = serializer.Serialize(messages)
payload = serializeToBytes(t, serializer, messages)
assert.Equal(t, []byte("[a,b]"), payload)
}

func serializeToBytes(t *testing.T, s Serializer, messages []*message.Message) []byte {
t.Helper()

var payload bytes.Buffer
err := s.Serialize(messages, &payload)
assert.NoError(t, err)
return payload.Bytes()
}
18 changes: 18 additions & 0 deletions pkg/util/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,21 @@ type StreamCompressor interface {

// ZstdCompressionLevel is a wrapper type over int for the compression level for zstd compression, if that is selected.
type ZstdCompressionLevel int

// NoopStreamCompressor is a no-op implementation of StreamCompressor
type NoopStreamCompressor struct {
io.Writer
}

// Close closes the underlying writer
func (n *NoopStreamCompressor) Close() error {
if c, ok := n.Writer.(io.Closer); ok {
return c.Close()
}
return nil
}

// Flush is a no-op
func (n *NoopStreamCompressor) Flush() error {
return nil
}

0 comments on commit f061006

Please sign in to comment.