Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enhance encoder interface #2054

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/pipeline/extensions/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,22 @@ type Encoder interface {
//
// drivers: sls, influxdb, ...
type EncoderV1 interface {
EncodeV1(*protocol.LogGroup) ([][]byte, error)
EncodeBatchV1([]*protocol.LogGroup) ([][]byte, error)
EncodeV1(*protocol.LogGroup, []string) ([][]byte, []map[string]string, error)
EncodeBatchV1([]*protocol.LogGroup, []string) ([][]byte, []map[string]string, error)
}

// EncoderV2 supports v2 pipeline plugin interface,
// encodes data of v2 model into bytes.
//
// drivers: raw, influxdb, prometheus, ...
type EncoderV2 interface {
EncodeV2(*models.PipelineGroupEvents) ([][]byte, error)
EncodeBatchV2([]*models.PipelineGroupEvents) ([][]byte, error)
EncodeV2(*models.PipelineGroupEvents, []string) ([][]byte, []map[string]string, error)
EncodeBatchV2([]*models.PipelineGroupEvents, []string) ([][]byte, []map[string]string, error)
}

// BufferRecycler is implemented by encoders that can reclaim the byte
// buffers they previously returned from Encode calls, e.g. so callers can
// hand a buffer back for reuse once it has been flushed.
type BufferRecycler interface {
Recycle([]byte)
}

type EncoderExtension interface {
Expand Down
34 changes: 21 additions & 13 deletions pkg/protocol/encoder/prometheus/encoder_prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package prometheus
import (
"context"
"errors"
"fmt"

"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
Expand Down Expand Up @@ -50,22 +51,23 @@ func newPromEncoder(seriesLimit int) *Encoder {
}
}

func (p *Encoder) EncodeV1(logGroups *protocol.LogGroup) ([][]byte, error) {
func (p *Encoder) EncodeV1(logGroups *protocol.LogGroup, targetFields []string) ([][]byte, []map[string]string, error) {
// TODO implement me
return nil, nil
return nil, nil, fmt.Errorf("not implemented")
}

func (p *Encoder) EncodeBatchV1(logGroups []*protocol.LogGroup) ([][]byte, error) {
func (p *Encoder) EncodeBatchV1(logGroups []*protocol.LogGroup, targetFields []string) ([][]byte, []map[string]string, error) {
// TODO implement me
return nil, nil
return nil, nil, fmt.Errorf("not implemented")
}

func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents) ([][]byte, error) {
func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents, targetFields []string) ([][]byte, []map[string]string, error) {
if groupEvents == nil || len(groupEvents.Events) == 0 {
return nil, errNilOrZeroGroupEvents
return nil, nil, errNilOrZeroGroupEvents
}

var res [][]byte
var varValues []map[string]string

wr := getWriteRequest(p.SeriesLimit)
defer putWriteRequest(wr)
Expand Down Expand Up @@ -99,28 +101,34 @@ func (p *Encoder) EncodeV2(groupEvents *models.PipelineGroupEvents) ([][]byte, e
wr.Timeseries = wr.Timeseries[:0]
}

return res, nil
return res, varValues, nil
}

func (p *Encoder) EncodeBatchV2(groupEventsSlice []*models.PipelineGroupEvents) ([][]byte, error) {
func (p *Encoder) EncodeBatchV2(groupEventsSlice []*models.PipelineGroupEvents, targetFields []string) ([][]byte, []map[string]string, error) {
if len(groupEventsSlice) == 0 {
return nil, errNilOrZeroGroupEvents
return nil, nil, errNilOrZeroGroupEvents
}

var res [][]byte
var varValues []map[string]string

for _, groupEvents := range groupEventsSlice {
bytes, err := p.EncodeV2(groupEvents)
bytes, values, err := p.EncodeV2(groupEvents, targetFields)
if err != nil {
continue
}

res = append(res, bytes...)
varValues = append(varValues, values...)
}

if res == nil {
return nil, errNilOrZeroGroupEvents
if len(res) == 0 {
return nil, nil, errNilOrZeroGroupEvents
}

return res, nil
return res, varValues, nil
}

// Recycle implements the extensions.BufferRecycler interface.
// It is intentionally a no-op: the Prometheus Encoder allocates fresh
// buffers on each Encode call and cannot currently reuse them, so there
// is nothing to reclaim here.
func (p *Encoder) Recycle(buf []byte) {}
35 changes: 21 additions & 14 deletions pkg/protocol/encoder/prometheus/encoder_prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func BenchmarkV2Encode(b *testing.B) {

for i := 0; i < b.N; i++ {
for _, groupEvents := range groupEventsSlice {
p.EncodeV2(groupEvents)
p.EncodeV2(groupEvents, nil)
}
}
})
Expand All @@ -62,7 +62,7 @@ func BenchmarkV2Encode(b *testing.B) {
b.ResetTimer()

for i := 0; i < b.N; i++ {
p.EncodeBatchV2(groupEventsSlice)
p.EncodeBatchV2(groupEventsSlice, nil)
}
})
assert.Equal(b, want, groupEventsSlice)
Expand All @@ -82,12 +82,13 @@ func TestV2Encode_ShouldReturnNoError_GivenNormalDataOfPipelineGroupEvents(t *te

// when
// then
data1, err1 := p.EncodeBatchV2(groupEventsSlice1)
data1, varValues1, err1 := p.EncodeBatchV2(groupEventsSlice1, nil)
assert.NoError(t, err1)
data2, err2 := p.EncodeBatchV2(groupEventsSlice2)
data2, varValues2, err2 := p.EncodeBatchV2(groupEventsSlice2, nil)
assert.NoError(t, err2)

assert.Equal(t, len(data2), len(data1))
assert.Equal(t, len(varValues2), len(varValues1))
}

// Scenario: functional test of the V2 Encode interface (abnormal data, not entirely nil or zero values)
Expand All @@ -108,25 +109,27 @@ func TestV2Encode_ShouldReturnError_GivenAbNormalDataOfPipelineGroupEvents(t *te
// then
t.Run("Test EncodeV2 with abnormal data input", func(t *testing.T) {
for i, groupEvents := range groupEventsSlice1 {
data1, err1 := p.EncodeV2(groupEvents)
data2, err2 := p.EncodeV2(groupEventsSlice2[i])
data1, varValues1, err1 := p.EncodeV2(groupEvents, nil)
data2, varValues2, err2 := p.EncodeV2(groupEventsSlice2[i], nil)
if err1 != nil {
assert.Error(t, err2)
assert.Equal(t, err1, err2)
} else {
assert.NoError(t, err2)
assert.Equal(t, len(data2), len(data1))
assert.Equal(t, len(varValues1), len(varValues2))
}
}
})

t.Run("Test EncodeBatchV2 with abnormal data input", func(t *testing.T) {
data1, err1 := p.EncodeBatchV2(groupEventsSlice1)
data1, varValues1, err1 := p.EncodeBatchV2(groupEventsSlice1, nil)
assert.NoError(t, err1)
data2, err2 := p.EncodeBatchV2(groupEventsSlice2)
data2, varValues2, err2 := p.EncodeBatchV2(groupEventsSlice2, nil)
assert.NoError(t, err2)

assert.Equal(t, len(data2), len(data1))
assert.Equal(t, len(varValues1), len(varValues2))
})
}

Expand Down Expand Up @@ -154,17 +157,19 @@ func TestV2Encode_ShouldReturnError_GivenNilOrZeroDataOfPipelineGroupEvents(t *t
// then
t.Run("Test EncodeV2 with nil or zero data input", func(t *testing.T) {
for _, input := range nilOrZeroGroupEventsSlices {
data, err := p.EncodeV2(input)
data, varValues, err := p.EncodeV2(input, nil)
assert.Error(t, err)
assert.Nil(t, data)
assert.Nil(t, varValues)
}
})

t.Run("Test EncodeBatchV2 with nil or zero data input", func(t *testing.T) {
for _, input := range nilOrZeroGroupEventsSlicesEx {
data, err := p.EncodeBatchV2(input)
data, varValues, err := p.EncodeBatchV2(input, nil)
assert.Error(t, err)
assert.Nil(t, data)
assert.Nil(t, varValues)
}
})
}
Expand All @@ -186,12 +191,13 @@ func TestEncoderBatchV2_ShouldReturnNoErrorAndEqualData_GivenNormalDataOfDataOfP

// when
// then
data1, err1 := p.EncodeBatchV2(groupEventsSlice1)
data1, varValues1, err1 := p.EncodeBatchV2(groupEventsSlice1, nil)
assert.NoError(t, err1)
data2, err2 := p.EncodeBatchV2(groupEventsSlice2)
data2, varValues2, err2 := p.EncodeBatchV2(groupEventsSlice2, nil)
assert.NoError(t, err2)

assert.Equal(t, data2, data1)
assert.Equal(t, varValues2, varValues1)
}

// Scenario: functional test of the V2 Encode interface
Expand All @@ -211,14 +217,15 @@ func TestEncoderBatchV2_ShouldDecodeSuccess_GivenNormalDataOfDataOfPipelineGroup
n := seriesLimit
wantGroupEventsSlice := genPipelineGroupEventsSliceSingleTag(n)
p := NewPromEncoder(seriesLimit)
data, err := p.EncodeBatchV2(wantGroupEventsSlice)
data, varValues, err := p.EncodeBatchV2(wantGroupEventsSlice, nil)
assert.NoError(t, err)

// when
// then
gotGroupEventsSlice, err := DecodeBatchV2(data)
assert.NoError(t, err)
assert.Equal(t, wantGroupEventsSlice, gotGroupEventsSlice)
assert.LessOrEqual(t, len(varValues), len(data))
}

// Scenario: functional test of the V2 Encode interface
Expand All @@ -241,7 +248,7 @@ func TestEncoderBatchV2_ShouldDecodeSuccess_GivenNormalDataOfDataOfPipelineGroup
wantGroupEventsSlice := genPipelineGroupEventsSliceSingleTag(n)
assert.Equal(t, n, len(wantGroupEventsSlice))
p := NewPromEncoder(seriesLimit)
data, err := p.EncodeBatchV2(wantGroupEventsSlice)
data, _, err := p.EncodeBatchV2(wantGroupEventsSlice, nil)
assert.NoError(t, err)
expectedLen := func(limit, length int) int {
// make sure limit > 0 && length > 0
Expand Down
34 changes: 29 additions & 5 deletions plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type FlusherHTTP struct {
QueueCapacity int // capacity of channel
DropEventWhenQueueFull bool // If true, pipeline events will be dropped when the queue is full
Compression string // Compression type, support gzip and snappy at this moment.
IgnoreRespBody bool // If true, senders directly close the resp without reading the body.

varKeys []string

Expand Down Expand Up @@ -386,14 +387,17 @@ func (f *FlusherHTTP) runFlushTask() {

func (f *FlusherHTTP) encodeAndFlush(event any) error {
defer f.countDownTask()
if event == nil {
return nil
}

var data [][]byte
var varValues []map[string]string
var err error

switch v := event.(type) {
case *models.PipelineGroupEvents:
data, err = f.encoder.EncodeV2(v)

data, varValues, err = f.encoder.EncodeV2(v, f.varKeys)
default:
return errors.New("unsupported event type")
}
Expand All @@ -402,8 +406,12 @@ func (f *FlusherHTTP) encodeAndFlush(event any) error {
return fmt.Errorf("http flusher encode event data fail, error: %w", err)
}

for _, shard := range data {
if err = f.flushWithRetry(shard, nil); err != nil {
for i, shard := range data {
var values map[string]string
if i < len(varValues) {
values = varValues[i]
}
if err = f.flushWithRetry(shard, values); err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM",
"http flusher failed flush data after retry, data dropped, error", err,
"remote url", f.RemoteURL)
Expand Down Expand Up @@ -471,6 +479,14 @@ func (f *FlusherHTTP) flushWithRetry(data []byte, varValues map[string]string) e
err = e
<-time.After(f.getNextRetryDelay(i))
}

if f.encoder != nil {
if recycler, ok := f.encoder.(extensions.BufferRecycler); ok {
recycler.Recycle(data)
return err
}
}

converter.PutPooledByteBuf(&data)
return err
}
Expand Down Expand Up @@ -572,7 +588,15 @@ func (f *FlusherHTTP) flush(data []byte, varValues map[string]string) (ok, retry
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALRAM", "http flusher send request fail, error", err)
return false, retry, err
}
body, err := io.ReadAll(response.Body)
var body []byte
if !f.IgnoreRespBody {
body, err = io.ReadAll(response.Body)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALRAM", "http flusher read response fail, error", err)
return false, false, err
}
}

if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALRAM", "http flusher read response fail, error", err)
return false, false, err
Expand Down
Loading