Skip to content

Commit

Permalink
feat(transform): Add UtilityControl Transform for Generating Ctrl Mes…
Browse files Browse the repository at this point in the history
…sages (#175)

* feat(transform): Add UtilityControl Transform for Generating Ctrl Messages

* fix(transform): Fix Cases Where Aggregate Transforms Emit Empty Results

* docs(examples): Add Example for Generating Ctrl Messages
  • Loading branch information
jshlbrd authored May 21, 2024
1 parent 3759ccc commit 93b64cd
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 0 deletions.
8 changes: 8 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,14 @@
},
util: $.transform.utility,
utility: {
control(settings={}): {
local default = {
batch: $.config.batch,
},

type: 'utility_control',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
delay(settings={}): {
local default = {
duration: null,
Expand Down
13 changes: 13 additions & 0 deletions examples/config/transform/utility/generate_ctrl/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// This example shows how to use the `utility_control` transform to
// generate a control (ctrl) Message based on the amount of data Messages
// received by the system. ctrl Messages overrides the settings of the
// `aggregate_to_array` transform (and any other transform that supports).
local sub = import '../../../../../build/config/substation.libsonnet';

{
transforms: [
sub.tf.utility.control({ batch: { count: 2 } }),
sub.tf.aggregate.to.array({ batch: { count: 10000 } }),
sub.tf.send.stdout(),
],
}
13 changes: 13 additions & 0 deletions examples/config/transform/utility/generate_ctrl/data.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{"a":"b"}
{"c":"d"}
{"e":"f"}
{"g":"h"}
{"i":"j"}
{"k":"l"}
{"m":"n"}
{"o":"p"}
{"q":"r"}
{"s":"t"}
{"u":"v"}
{"w":"x"}
{"y":"z"}
8 changes: 8 additions & 0 deletions transform/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func (c *aggregateArrayConfig) Decode(in interface{}) error {
}

func aggToArray(data [][]byte) []byte {
if len(data) == 0 {
return nil
}

return slices.Concat([]byte("["), bytes.Join(data, []byte(",")), []byte("]"))
}

Expand All @@ -43,6 +47,10 @@ func (c *aggregateStrConfig) Validate() error {
}

func aggToStr(data [][]byte, separator []byte) []byte {
if len(data) == 0 {
return nil
}

return bytes.Join(data, separator)
}

Expand Down
2 changes: 2 additions & 0 deletions transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint
case "time_to_unix_milli":
return newTimeToUnixMilli(ctx, cfg)
// Utility transforms.
case "utility_control":
return newUtilityControl(ctx, cfg)
case "utility_delay":
return newUtilityDelay(ctx, cfg)
case "utility_drop":
Expand Down
81 changes: 81 additions & 0 deletions transform/utility_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package transform

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/aggregate"
iconfig "github.com/brexhq/substation/internal/config"
"github.com/brexhq/substation/message"
)

type utilityControlConfig struct {
Batch iconfig.Batch `json:"batch"`
}

func (c *utilityControlConfig) Decode(in interface{}) error {
return iconfig.Decode(in, c)
}

func newUtilityControl(_ context.Context, cfg config.Config) (*utilityControl, error) {
conf := utilityControlConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: utility_control: %v", err)
}

agg, err := aggregate.New(aggregate.Config{
Count: conf.Batch.Count,
Size: conf.Batch.Size,
Duration: conf.Batch.Duration,
})
if err != nil {
return nil, fmt.Errorf("transform: utility_control: %v", err)
}

tf := utilityControl{
conf: conf,
agg: *agg,
}

return &tf, nil
}

type utilityControl struct {
conf utilityControlConfig

mu sync.Mutex
agg aggregate.Aggregate
}

func (tf *utilityControl) Transform(_ context.Context, msg *message.Message) ([]*message.Message, error) {
tf.mu.Lock()
defer tf.mu.Unlock()

if msg.IsControl() {
// If a control message is received, then the aggregation is reset
// to prevent sending duplicate control messages.
tf.agg.ResetAll()

return []*message.Message{msg}, nil
}

if ok := tf.agg.Add("", msg.Data()); ok {
return []*message.Message{msg}, nil
}

tf.agg.Reset("")
if ok := tf.agg.Add("", msg.Data()); !ok {
return nil, fmt.Errorf("transform: utility_control: %v", errSendBatchMisconfigured)
}

ctrl := message.New().AsControl()
return []*message.Message{msg, ctrl}, nil
}

func (tf *utilityControl) String() string {
b, _ := json.Marshal(tf.conf)
return string(b)
}

0 comments on commit 93b64cd

Please sign in to comment.