Skip to content

Commit

Permalink
[cappl-86] feat(workflows/wasm): emit msgs to beholder (#845)
Browse files Browse the repository at this point in the history
* wip(wasm): adds Emit to Runtime interface

WIP on Runtime with panics

* refactor(wasm): separte funcs out of NewRunner

* refactor(wasm): shifts logging related funcs around

* feat(wasm): adds custom pb message

* feat(wasm): calls emit from guest runner

* refactor(workflows): splits out emitter interface + docstring

* feat(host): defines a beholder adapter for emitter

* wip(host): implement host side emit

* refactor(wasm/host): abstracts read and write to wasm

* protos wip

* feat(wasm): emits error response

* refactor(wasm/host): write all failures from wasm to memory

* feat(wasm): inject metadata into module

* feat(events+wasm): pull emit md from req md

* feat(custmsg): creates labels from map

* feat(wasm): adds tests and validates labels

* feat(wasm/host): use custmsg implementation for calling beholder

* chore(wasm+host): docstrings and lint

* chore(host): new emitter iface + private func types

* chore(multi) review comments

* chore(wasm): add id and md to config directly

* refactor(custmsg+host): adapter labeler from config for emit

* refactor(wasm): remove emitter from mod config

* refactor(custmsg+wasm): expose emitlabeler on guest

* refactor(wasm+sdk): EmitLabeler to MessageEmitter

* refactor(wasm+events): share label keys

* refactor(wasm+values): use map[string]string directly
  • Loading branch information
MStreet3 authored Oct 22, 2024
1 parent ceeb473 commit 9225bc1
Show file tree
Hide file tree
Showing 18 changed files with 1,459 additions and 186 deletions.
32 changes: 16 additions & 16 deletions pkg/capabilities/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (

const (
// Duplicates the attributes in beholder/message.go::Metadata
labelWorkflowOwner = "workflow_owner_address"
labelWorkflowID = "workflow_id"
labelWorkflowExecutionID = "workflow_execution_id"
labelWorkflowName = "workflow_name"
labelCapabilityContractAddress = "capability_contract_address"
labelCapabilityID = "capability_id"
labelCapabilityVersion = "capability_version"
labelCapabilityName = "capability_name"
LabelWorkflowOwner = "workflow_owner_address"
LabelWorkflowID = "workflow_id"
LabelWorkflowExecutionID = "workflow_execution_id"
LabelWorkflowName = "workflow_name"
LabelCapabilityContractAddress = "capability_contract_address"
LabelCapabilityID = "capability_id"
LabelCapabilityVersion = "capability_version"
LabelCapabilityName = "capability_name"
)

type EmitMetadata struct {
Expand Down Expand Up @@ -93,35 +93,35 @@ func (e EmitMetadata) attrs() []any {
a := []any{}

if e.WorkflowOwner != "" {
a = append(a, labelWorkflowOwner, e.WorkflowOwner)
a = append(a, LabelWorkflowOwner, e.WorkflowOwner)
}

if e.WorkflowID != "" {
a = append(a, labelWorkflowID, e.WorkflowID)
a = append(a, LabelWorkflowID, e.WorkflowID)
}

if e.WorkflowExecutionID != "" {
a = append(a, labelWorkflowExecutionID, e.WorkflowExecutionID)
a = append(a, LabelWorkflowExecutionID, e.WorkflowExecutionID)
}

if e.WorkflowName != "" {
a = append(a, labelWorkflowName, e.WorkflowName)
a = append(a, LabelWorkflowName, e.WorkflowName)
}

if e.CapabilityContractAddress != "" {
a = append(a, labelCapabilityContractAddress, e.CapabilityContractAddress)
a = append(a, LabelCapabilityContractAddress, e.CapabilityContractAddress)
}

if e.CapabilityID != "" {
a = append(a, labelCapabilityID, e.CapabilityID)
a = append(a, LabelCapabilityID, e.CapabilityID)
}

if e.CapabilityVersion != "" {
a = append(a, labelCapabilityVersion, e.CapabilityVersion)
a = append(a, LabelCapabilityVersion, e.CapabilityVersion)
}

if e.CapabilityName != "" {
a = append(a, labelCapabilityName, e.CapabilityName)
a = append(a, LabelCapabilityName, e.CapabilityName)
}

return a
Expand Down
40 changes: 35 additions & 5 deletions pkg/custmsg/custom_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,35 @@ func NewLabeler() Labeler {
return Labeler{labels: make(map[string]string)}
}

// WithMapLabels adds multiple key-value pairs to the CustomMessageLabeler for transmission
// With SendLogAsCustomMessage
func (l Labeler) WithMapLabels(labels map[string]string) Labeler {
newCustomMessageLabeler := NewLabeler()

// Copy existing labels from the current agent
for k, v := range l.labels {
newCustomMessageLabeler.labels[k] = v
}

// Add new key-value pairs
for k, v := range labels {
newCustomMessageLabeler.labels[k] = v
}

return newCustomMessageLabeler
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c Labeler) With(keyValues ...string) Labeler {
func (l Labeler) With(keyValues ...string) Labeler {
newCustomMessageLabeler := NewLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
return l
}

// Copy existing labels from the current agent
for k, v := range c.labels {
for k, v := range l.labels {
newCustomMessageLabeler.labels[k] = v
}

Expand All @@ -43,10 +61,22 @@ func (c Labeler) With(keyValues ...string) Labeler {
return newCustomMessageLabeler
}

func (l Labeler) Emit(msg string) error {
return sendLogAsCustomMessageW(msg, l.labels)
}

func (l Labeler) Labels() map[string]string {
copied := make(map[string]string, len(l.labels))
for k, v := range l.labels {
copied[k] = v
}
return copied
}

// SendLogAsCustomMessage emits a BaseMessage With msg and labels as data.
// any key in labels that is not part of orderedLabelKeys will not be transmitted
func (c Labeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, c.labels)
func (l Labeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, l.labels)
}

func sendLogAsCustomMessageW(msg string, labels map[string]string) error {
Expand Down
16 changes: 15 additions & 1 deletion pkg/custmsg/custom_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,19 @@ func Test_CustomMessageAgent(t *testing.T) {
cma1 := cma.With("key1", "value1")
cma2 := cma1.With("key2", "value2")

assert.NotEqual(t, cma1.labels, cma2.labels)
assert.NotEqual(t, cma1.Labels(), cma2.Labels())
}

func Test_CustomMessageAgent_With(t *testing.T) {
cma := NewLabeler()
cma = cma.With("key1", "value1")

assert.Equal(t, cma.Labels(), map[string]string{"key1": "value1"})
}

func Test_CustomMessageAgent_WithMapLabels(t *testing.T) {
cma := NewLabeler()
cma = cma.WithMapLabels(map[string]string{"key1": "value1"})

assert.Equal(t, cma.Labels(), map[string]string{"key1": "value1"})
}
2 changes: 1 addition & 1 deletion pkg/values/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func EmptyMap() *Map {
}
}

func NewMap(m map[string]any) (*Map, error) {
func NewMap[T any](m map[string]T) (*Map, error) {
mv := map[string]Value{}
for k, v := range m {
val, err := Wrap(v)
Expand Down
13 changes: 13 additions & 0 deletions pkg/workflows/sdk/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,22 @@ import (

var BreakErr = capabilities.ErrStopExecution

type MessageEmitter interface {
// Emit sends a message to the labeler's destination.
Emit(string) error

// With sets the labels for the message to be emitted. Labels are passed as key-value pairs
// and are cumulative.
With(kvs ...string) MessageEmitter
}

// Guest interface
type Runtime interface {
Logger() logger.Logger
Fetch(req FetchRequest) (FetchResponse, error)

// Emitter sends the given message and labels to the configured collector.
Emitter() MessageEmitter
}

type FetchRequest struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/workflows/sdk/testutils/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ func (nr *NoopRuntime) Logger() logger.Logger {
l, _ := logger.New()
return l
}

func (nr *NoopRuntime) Emitter() sdk.MessageEmitter {
return nil
}
Loading

0 comments on commit 9225bc1

Please sign in to comment.