Skip to content

Commit

Permalink
perf(transform): Improve AggregateToArray Throughput (#150)
Browse files Browse the repository at this point in the history
* perf(transform): Increase aggregateToArray Throughput

* build(config): Update JSON Lines Pattern
  • Loading branch information
jshlbrd authored Mar 21, 2024
1 parent 5ca6807 commit d730cc6
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 27 deletions.
3 changes: 1 addition & 2 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -1148,8 +1148,7 @@
condition=$.cnd.meta.negate({ inspector: $.cnd.fmt.json() }),
transform=$.tf.util.drop(),
),
$.tf.agg.to.arr(),
$.tf.arr.join({ separator: '\n' }),
$.tf.agg.to.string({ separator: '\n' }),
$.tf.str.append({ suffix: '\n' }),
],
},
Expand Down
15 changes: 3 additions & 12 deletions transform/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package transform
import (
"bytes"
"fmt"
"slices"

iconfig "github.com/brexhq/substation/internal/config"
"github.com/brexhq/substation/internal/errors"
"github.com/brexhq/substation/message"
)

type aggregateArrayConfig struct {
Expand All @@ -18,17 +18,8 @@ func (c *aggregateArrayConfig) Decode(in interface{}) error {
return iconfig.Decode(in, c)
}

func aggToArray(data [][]byte) ([]byte, error) {
msg := message.New()

for _, d := range data {
if err := msg.SetValue("array.-1", d); err != nil {
return nil, err
}
}

b := msg.GetValue("array")
return b.Bytes(), nil
func aggToArray(data [][]byte) []byte {
return slices.Concat([]byte("["), bytes.Join(data, []byte(",")), []byte("]"))
}

type aggregateStrConfig struct {
Expand Down
19 changes: 6 additions & 13 deletions transform/aggregate_to_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,19 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message)
tf.mu.Lock()
defer tf.mu.Unlock()

//nolint: nestif // ignore nesting complexity
if msg.IsControl() {
var output []*message.Message

for _, items := range tf.agg.GetAll() {
agg, err := aggToArray(items.Get())
if err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
}
array := aggToArray(items.Get())

outMsg := message.New()
if tf.hasObjTrg {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, agg); err != nil {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
}
} else {
outMsg.SetData(agg)
outMsg.SetData(array)
}

output = append(output, outMsg)
Expand All @@ -80,18 +76,15 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message)
return nil, nil
}

agg, err := aggToArray(tf.agg.Get(key))
if err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
}
array := aggToArray(tf.agg.Get(key))

outMsg := message.New()
if tf.hasObjTrg {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, agg); err != nil {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
}
} else {
outMsg.SetData(agg)
outMsg.SetData(array)
}

// If data cannot be added after reset, then the batch is misconfgured.
Expand Down

0 comments on commit d730cc6

Please sign in to comment.