Skip to content

Commit

Permalink
feat(transform): Add ID Settings Field (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
jshlbrd authored Jun 10, 2024
1 parent 18b2b44 commit ab1a4ff
Show file tree
Hide file tree
Showing 88 changed files with 983 additions and 474 deletions.
379 changes: 255 additions & 124 deletions build/config/substation.libsonnet

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/config/config.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ local sub = import '../../build/config/substation.libsonnet';
// Substation application configs always contain an array named `transforms`.
transforms: [
// Each transform function is defined in the `substation` library.
sub.transform.object.insert({ object: { target_key: 'a' }, value: 'z' }),
sub.transform.object.insert({ id: 'insert-z', object: { target_key: 'a' }, value: 'z' }),
// Transform functions can be conditionally applied using the
// `meta.switch` function.
sub.transform.meta.switch({ cases: [
Expand Down
2 changes: 2 additions & 0 deletions transform/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
)

type aggregateArrayConfig struct {
ID string `json:"id"`
Object iconfig.Object `json:"object"`
Batch iconfig.Batch `json:"batch"`
}
Expand All @@ -30,6 +31,7 @@ type aggregateStrConfig struct {
// Separator is the string that is used to join and split data.
Separator string `json:"separator"`

ID string `json:"id"`
Object iconfig.Object `json:"object"`
Batch iconfig.Batch `json:"batch"`
}
Expand Down
12 changes: 8 additions & 4 deletions transform/aggregate_from_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import (
func newAggregateFromArray(_ context.Context, cfg config.Config) (*aggregateFromArray, error) {
conf := aggregateArrayConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: aggregate_from_array: %v", err)
return nil, fmt.Errorf("transform aggregate_from_array: %v", err)
}

if conf.ID == "" {
conf.ID = "aggregate_from_array"
}

tf := aggregateFromArray{
Expand Down Expand Up @@ -43,7 +47,7 @@ func (tf *aggregateFromArray) Transform(ctx context.Context, msg *message.Messag
if tf.hasObjSrc {
value = msg.GetValue(tf.conf.Object.SourceKey)
if err := msg.DeleteValue(tf.conf.Object.SourceKey); err != nil {
return nil, err
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}
} else {
value = bytesToValue(msg.Data())
Expand All @@ -59,14 +63,14 @@ func (tf *aggregateFromArray) Transform(ctx context.Context, msg *message.Messag
if tf.hasObjSrc {
for key, val := range msg.GetValue("@this").Map() {
if err := outMsg.SetValue(key, val.Value()); err != nil {
return nil, err
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}
}
}

if tf.hasObjTrg {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, res); err != nil {
return nil, err
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

output = append(output, outMsg)
Expand Down
8 changes: 6 additions & 2 deletions transform/aggregate_from_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ import (
func newAggregateFromString(_ context.Context, cfg config.Config) (*aggregateFromString, error) {
conf := aggregateStrConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: aggregate_from_string: %v", err)
return nil, fmt.Errorf("transform aggregate_from_string: %v", err)
}

if conf.ID == "" {
conf.ID = "aggregate_from_string"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: aggregate_from_string: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

tf := aggregateFromString{
Expand Down
14 changes: 9 additions & 5 deletions transform/aggregate_to_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import (
func newAggregateToArray(_ context.Context, cfg config.Config) (*aggregateToArray, error) {
conf := aggregateArrayConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
return nil, fmt.Errorf("transform aggregate_to_array: %v", err)
}

if conf.ID == "" {
conf.ID = "aggregate_to_array"
}

tf := aggregateToArray{
Expand All @@ -28,7 +32,7 @@ func newAggregateToArray(_ context.Context, cfg config.Config) (*aggregateToArra
Duration: conf.Batch.Duration,
})
if err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}
tf.agg = *agg

Expand Down Expand Up @@ -56,7 +60,7 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message)
outMsg := message.New()
if tf.hasObjTrg {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}
} else {
outMsg.SetData(array)
Expand All @@ -81,7 +85,7 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message)
outMsg := message.New()
if tf.hasObjTrg {
if err := outMsg.SetValue(tf.conf.Object.TargetKey, array); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_array: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}
} else {
outMsg.SetData(array)
Expand All @@ -90,7 +94,7 @@ func (tf *aggregateToArray) Transform(ctx context.Context, msg *message.Message)
// If data cannot be added after reset, then the batch is misconfgured.
tf.agg.Reset(key)
if ok := tf.agg.Add(key, msg.Data()); !ok {
return nil, fmt.Errorf("transform: send_stdout: %v", errSendBatchMisconfigured)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, errSendBatchMisconfigured)
}

return []*message.Message{outMsg}, nil
Expand Down
12 changes: 8 additions & 4 deletions transform/aggregate_to_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
func newAggregateToString(_ context.Context, cfg config.Config) (*aggregateToString, error) {
conf := aggregateStrConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_string: %v", err)
return nil, fmt.Errorf("transform aggregate_to_string: %v", err)
}

if conf.ID == "" {
conf.ID = "aggregate_to_string"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: aggregate_to_string: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

tf := aggregateToString{
Expand All @@ -32,7 +36,7 @@ func newAggregateToString(_ context.Context, cfg config.Config) (*aggregateToStr
Duration: conf.Batch.Duration,
})
if err != nil {
return nil, fmt.Errorf("transform: aggregate_to_string: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}
tf.agg = agg

Expand Down Expand Up @@ -80,7 +84,7 @@ func (tf *aggregateToString) Transform(ctx context.Context, msg *message.Message
// If data cannot be added after reset, then the batch is misconfgured.
tf.agg.Reset(key)
if ok := tf.agg.Add(key, msg.Data()); !ok {
return nil, fmt.Errorf("transform: send_stdout: %v", errSendBatchMisconfigured)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, errSendBatchMisconfigured)
}

return []*message.Message{outMsg}, nil
Expand Down
11 changes: 8 additions & 3 deletions transform/array_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type arrayJoinConfig struct {
// Separator is the string that is used to join data.
Separator string `json:"separator"`

ID string `json:"id"`
Object iconfig.Object `json:"object"`
}

Expand All @@ -38,11 +39,15 @@ func (c *arrayJoinConfig) Validate() error {
func newArrayJoin(_ context.Context, cfg config.Config) (*arrayJoin, error) {
conf := arrayJoinConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: array_join: %v", err)
return nil, fmt.Errorf("transform array_join: %v", err)
}

if conf.ID == "" {
conf.ID = "array_join"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: array_join: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

tf := arrayJoin{
Expand Down Expand Up @@ -92,7 +97,7 @@ func (tf *arrayJoin) Transform(ctx context.Context, msg *message.Message) ([]*me

if tf.hasObjectSetKey {
if err := msg.SetValue(tf.conf.Object.TargetKey, str); err != nil {
return nil, fmt.Errorf("transform: array_join: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

return []*message.Message{msg}, nil
Expand Down
11 changes: 8 additions & 3 deletions transform/array_zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

type arrayZipConfig struct {
ID string `json:"id"`
Object iconfig.Object `json:"object"`
}

Expand All @@ -34,11 +35,15 @@ func (c *arrayZipConfig) Validate() error {
func newArrayZip(_ context.Context, cfg config.Config) (*arrayZip, error) {
conf := arrayZipConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: array_zip: %v", err)
return nil, fmt.Errorf("transform array_zip: %v", err)
}

if conf.ID == "" {
conf.ID = "array_zip"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: array_zip: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

tf := arrayZip{
Expand Down Expand Up @@ -86,7 +91,7 @@ func (tf *arrayZip) Transform(ctx context.Context, msg *message.Message) ([]*mes

if tf.hasObjDst {
if err := msg.SetValue(tf.conf.Object.TargetKey, b); err != nil {
return nil, fmt.Errorf("transform: array_zip: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

return []*message.Message{msg}, nil
Expand Down
1 change: 1 addition & 0 deletions transform/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
const enrichHTTPInterp = `${DATA}`

type enrichDNSConfig struct {
ID string `json:"id"`
Object iconfig.Object `json:"object"`
Request iconfig.Request `json:"request"`
}
Expand Down
15 changes: 10 additions & 5 deletions transform/enrich_aws_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type enrichAWSDynamoDBConfig struct {
// This is optional and defaults to true.
ScanIndexForward bool `json:"scan_index_forward"`

ID string `json:"id"`
Object iconfig.Object `json:"object"`
AWS iconfig.AWS `json:"aws"`
Retry iconfig.Retry `json:"retry"`
Expand Down Expand Up @@ -71,11 +72,15 @@ func (c *enrichAWSDynamoDBConfig) Validate() error {
func newEnrichAWSDynamoDB(_ context.Context, cfg config.Config) (*enrichAWSDynamoDB, error) {
conf := enrichAWSDynamoDBConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: enrich_aws_dynamodb: %v", err)
return nil, fmt.Errorf("transform enrich_aws_dynamodb: %v", err)
}

if conf.ID == "" {
conf.ID = "enrich_aws_dynamodb"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: enrich_aws_dynamodb: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

tf := enrichAWSDynamoDB{
Expand Down Expand Up @@ -114,7 +119,7 @@ func (tf *enrichAWSDynamoDB) Transform(ctx context.Context, msg *message.Message
}

if !json.Valid(tmp.Data()) {
return nil, fmt.Errorf("transform: enrich_aws_dynamodb: %v", errMsgInvalidObject)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, errMsgInvalidObject)
}

pk := tmp.GetValue(tf.conf.PartitionKey)
Expand All @@ -125,7 +130,7 @@ func (tf *enrichAWSDynamoDB) Transform(ctx context.Context, msg *message.Message
sk := tmp.GetValue(tf.conf.SortKey)
value, err := tf.dynamodb(ctx, pk.String(), sk.String())
if err != nil {
return nil, fmt.Errorf("transform: enrich_aws_dynamodb: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

// No match.
Expand All @@ -134,7 +139,7 @@ func (tf *enrichAWSDynamoDB) Transform(ctx context.Context, msg *message.Message
}

if err := msg.SetValue(tf.conf.Object.TargetKey, value); err != nil {
return nil, fmt.Errorf("transform: enrich_aws_dynamodb: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

return []*message.Message{msg}, nil
Expand Down
17 changes: 11 additions & 6 deletions transform/enrich_aws_lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type enrichAWSLambdaConfig struct {
// FunctionName is the AWS Lambda function to synchronously invoke.
FunctionName string `json:"function_name"`

ID string `json:"id"`
Object iconfig.Object `json:"object"`
AWS iconfig.AWS `json:"aws"`
Retry iconfig.Retry `json:"retry"`
Expand Down Expand Up @@ -46,11 +47,15 @@ func (c *enrichAWSLambdaConfig) Validate() error {
func newEnrichAWSLambda(_ context.Context, cfg config.Config) (*enrichAWSLambda, error) {
conf := enrichAWSLambdaConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: enrich_aws_lambda: %v", err)
return nil, fmt.Errorf("transform enrich_aws_lambda: %v", err)
}

if conf.ID == "" {
conf.ID = "enrich_aws_lambda"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: enrich_aws_lambda: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

tf := enrichAWSLambda{
Expand Down Expand Up @@ -86,21 +91,21 @@ func (tf *enrichAWSLambda) Transform(ctx context.Context, msg *message.Message)
}

if !json.Valid(value.Bytes()) {
return nil, fmt.Errorf("transform: enrich_aws_lambda: %v", errMsgInvalidObject)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, errMsgInvalidObject)
}

resp, err := tf.client.Invoke(ctx, tf.conf.FunctionName, value.Bytes())
if err != nil {
return nil, fmt.Errorf("transform: enrich_aws_lambda: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

if resp.FunctionError != nil {
resErr := gjson.GetBytes(resp.Payload, "errorMessage").String()
return nil, fmt.Errorf("transform: enrich_aws_lambda: %v", resErr)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, resErr)
}

if err := msg.SetValue(tf.conf.Object.TargetKey, resp.Payload); err != nil {
return nil, fmt.Errorf("transform: enrich_aws_lambda: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

return []*message.Message{msg}, nil
Expand Down
16 changes: 10 additions & 6 deletions transform/enrich_dns_domain_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ import (
func newEnrichDNSDomainLookup(_ context.Context, cfg config.Config) (*enrichDNSDomainLookup, error) {
conf := enrichDNSConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: enrich_dns_domain_lookup: %v", err)
return nil, fmt.Errorf("transform enrich_dns_domain_lookup: %v", err)
}

if conf.ID == "" {
conf.ID = "enrich_dns_domain_lookup"
}

if err := conf.Validate(); err != nil {
return nil, fmt.Errorf("transform: enrich_dns_domain_lookup: %v", err)
return nil, fmt.Errorf("transform %s: %v", conf.ID, err)
}

dur, err := time.ParseDuration(conf.Request.Timeout)
if err != nil {
return nil, fmt.Errorf("transform: enrich_dns_domain_lookup: duration: %v", err)
return nil, fmt.Errorf("transform %s: duration: %v", conf.ID, err)
}

tf := enrichDNSDomainLookup{
Expand Down Expand Up @@ -59,7 +63,7 @@ func (tf *enrichDNSDomainLookup) Transform(ctx context.Context, msg *message.Mes
str := string(msg.Data())
names, err := tf.resolver.LookupHost(resolverCtx, str)
if err != nil {
return nil, fmt.Errorf("transform: enrich_dns_domain_lookup: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

// Return the first name.
Expand All @@ -75,11 +79,11 @@ func (tf *enrichDNSDomainLookup) Transform(ctx context.Context, msg *message.Mes

names, err := tf.resolver.LookupHost(resolverCtx, value.String())
if err != nil {
return nil, fmt.Errorf("transform: enrich_dns_domain_lookup: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

if err := msg.SetValue(tf.conf.Object.TargetKey, names); err != nil {
return nil, fmt.Errorf("transform: enrich_dns_domain_lookup: %v", err)
return nil, fmt.Errorf("transform %s: %v", tf.conf.ID, err)
}

return []*message.Message{msg}, nil
Expand Down
Loading

0 comments on commit ab1a4ff

Please sign in to comment.