Skip to content

Commit

Permalink
Fix: add render object strict (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethfoo committed Nov 7, 2022
1 parent 46bc60e commit 3bded25
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 21 deletions.
24 changes: 9 additions & 15 deletions pkg/core/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package event

import (
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"strings"
"sync"

Expand All @@ -35,6 +37,8 @@ const (
Body = "body"
)

var ErrorDropEvent = errors.New("drop event")

type DefaultMeta struct {
Properties map[string]interface{} `json:"properties"`
}
Expand Down Expand Up @@ -123,22 +127,12 @@ func (de *DefaultEvent) Release() {

func (de *DefaultEvent) String() string {
var sb strings.Builder
sb.WriteString("meta:")
if de.M != nil {
sb.WriteString(de.M.String())
}
sb.WriteString(";")
sb.WriteString("header:")
sb.WriteString("{")
for k, v := range de.Header() {
sb.WriteString(k)
sb.WriteString(" : ")
sb.WriteString(fmt.Sprintf("%+v", v))
sb.WriteString(", ")
}
sb.WriteString("}; body:{")
sb.WriteString(`header:`)
header, _ := jsoniter.Marshal(de.Header())
sb.Write(header)
sb.WriteString(`, body:"`)
sb.WriteString(string(de.Body()))
sb.WriteString("}")
sb.WriteString(`"`)
return sb.String()
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/sink/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
eventer "github.com/loggie-io/loggie/pkg/core/event"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/util/pattern"
"strings"

Expand Down Expand Up @@ -88,9 +90,10 @@ func (c *ClientSet) BulkIndex(ctx context.Context, batch api.Batch) error {
headerObj := runtime.NewObject(event.Header())

// select index
idx, err := c.indexPattern.WithObject(headerObj).Render()
idx, err := c.indexPattern.WithObject(headerObj).RenderWithStrict()
if err != nil {
return errors.WithMessagef(err, "select index pattern error")
log.Error("render index pattern err: %v; event is: %s", err, event.String())
continue
}

data, err := c.codec.Encode(event)
Expand All @@ -115,6 +118,11 @@ func (c *ClientSet) BulkIndex(ctx context.Context, batch api.Batch) error {

req.Add(bulkIndexRequest)
}

if req.NumberOfActions() == 0 {
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
}

ret, err := req.Do(ctx)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package elasticsearch
import (
"context"
"fmt"
eventer "github.com/loggie-io/loggie/pkg/core/event"
"github.com/pkg/errors"

"github.com/loggie-io/loggie/pkg/util/pattern"
Expand Down Expand Up @@ -103,6 +104,9 @@ func (s *Sink) Consume(batch api.Batch) api.Result {

err := s.cli.BulkIndex(context.TODO(), batch)
if err != nil {
if errors.Is(err, eventer.ErrorDropEvent) {
return result.DropWith(err)
}
return result.Fail(errors.WithMessage(err, "send events to elasticsearch"))
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/elasticsearch/pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
sink:
type: elasticsearch
hosts: ["localhost:9200"]
index: "log-${fields.topic}-${+YYYY.MM.DD}"
4 changes: 4 additions & 0 deletions pkg/sink/kafka/pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
sink:
type: kafka
brokers: ["127.0.0.1:6400"]
topic: "log-${fields.topic}"
11 changes: 7 additions & 4 deletions pkg/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka
import (
"context"
"fmt"

"github.com/loggie-io/loggie/pkg/util/pattern"
"github.com/loggie-io/loggie/pkg/util/runtime"
"github.com/pkg/errors"
Expand Down Expand Up @@ -127,8 +126,8 @@ func (s *Sink) Consume(batch api.Batch) api.Result {
for _, e := range events {
topic, err := s.selectTopic(e)
if err != nil {
log.Error("select kafka topic error: %+v", err)
return result.Fail(err)
log.Error("select kafka topic error: %v; event is: %s", err, e.String())
continue
}

msg, err := s.cod.Encode(e)
Expand All @@ -143,6 +142,10 @@ func (s *Sink) Consume(batch api.Batch) api.Result {
})
}

if len(km) == 0 {
return result.DropWith(errors.New("send to kafka message batch is null"))
}

if s.writer != nil {
err := s.writer.WriteMessages(context.Background(), km...)
if err != nil {
Expand All @@ -156,5 +159,5 @@ func (s *Sink) Consume(batch api.Batch) api.Result {
}

func (s *Sink) selectTopic(e api.Event) (string, error) {
return s.topicPattern.WithObject(runtime.NewObject(e.Header())).Render()
return s.topicPattern.WithObject(runtime.NewObject(e.Header())).RenderWithStrict()
}
18 changes: 18 additions & 0 deletions pkg/util/pattern/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pattern
import (
"github.com/loggie-io/loggie/pkg/util/runtime"
"github.com/loggie-io/loggie/pkg/util/time"
"github.com/pkg/errors"
"os"
"regexp"
"strings"
Expand All @@ -39,6 +40,8 @@ const (
kindObject = "object"
)

var ErrEmptyMatcher = errors.New("render matcher is empty")

type Pattern struct {
Raw string
isConstVal bool
Expand Down Expand Up @@ -141,7 +144,18 @@ func makeMatch(m []string) matcher {
return item
}

// RenderWithStrict any placeholder rendering empty will return an error
func (p *Pattern) RenderWithStrict() (string, error) {
return p.render(true)
}

func (p *Pattern) Render() (string, error) {
return p.render(false)
}

// Render to actual results based on placeholders
// If `strict` is set to true, any placeholder rendering empty will return an error.
func (p *Pattern) render(strict bool) (string, error) {
if p.isConstVal || len(p.matcher) == 0 {
return p.Raw, nil
}
Expand All @@ -164,6 +178,10 @@ func (p *Pattern) Render() (string, error) {
alt = p.K8sMatcherRender(m.key)
}

if alt == "" && strict {
return "", errors.WithMessagef(ErrEmptyMatcher, "with %s", m.keyWrap)
}

// add old
oldNew = append(oldNew, m.keyWrap)
// add new
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/pattern/pattern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,43 @@ func TestObjectPattern(t *testing.T) {
}
}

func TestObjectPatternWithStrict(t *testing.T) {
type args struct {
pattern string
obj *runtime.Object
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "got object fields",
args: args{
pattern: "${a.b}-${a.none}",
obj: runtime.NewObject(map[string]interface{}{
"a": map[string]interface{}{
"b": "c",
}}),
},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, err := Init(tt.args.pattern)
if err != nil {
t.Errorf("init pattern error: %v", err)
}

_, err = p.WithObject(tt.args.obj).RenderWithStrict()
assert.ErrorIs(t, err, ErrEmptyMatcher)

})
}
}

func TestK8sPattern(t *testing.T) {

testpod := &corev1.Pod{}
Expand Down

0 comments on commit 3bded25

Please sign in to comment.