Skip to content

Commit

Permalink
share code between print functions
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 8, 2024
1 parent 9e56e46 commit 82c9920
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 40 deletions.
13 changes: 3 additions & 10 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"log/slog"
"reflect"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -350,15 +349,9 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
stream := model.NewQRecordStream(bufferSize)
outstream := stream
if config.Script != "" {
ls, err := utils.LoadScript(ctx, config.Script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, strings.Join(ss, "\t"))
return 0
})
ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) {
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s)
}))
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return err
Expand Down
13 changes: 3 additions & 10 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -196,15 +195,9 @@ func (c *EventHubConnector) processBatch(
var fn *lua.LFunction
if req.Script != "" {
var err error
ls, err = utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t"))
return 0
})
ls, err = utils.LoadScript(ctx, req.Script, utils.LuaPrintFn(func(s string) {
_ = c.LogFlowInfo(ctx, req.FlowJobName, s)
}))
if err != nil {
return 0, err
}
Expand Down
13 changes: 3 additions & 10 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -178,15 +177,9 @@ func (c *KafkaConnector) createPool(
}

return utils.LuaPool(func() (*lua.LState, error) {
ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t"))
return 0
})
ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) {
_ = c.LogFlowInfo(ctx, flowJobName, s)
}))
if err != nil {
return nil, err
}
Expand Down
13 changes: 3 additions & 10 deletions flow/connectors/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -130,15 +129,9 @@ func (c *PubSubConnector) createPool(
queueErr func(error),
) (*utils.LPool[[]PubSubMessage], error) {
return utils.LuaPool(func() (*lua.LState, error) {
ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t"))
return 0
})
ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) {
_ = c.LogFlowInfo(ctx, flowJobName, s)
}))
if err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"context"
"fmt"
"strings"

"github.com/yuin/gopher-lua"

Expand Down Expand Up @@ -35,6 +36,18 @@ func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error) {
}
}

func LuaPrintFn(fn func(string)) lua.LGFunction {
return func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
fn(strings.Join(ss, "\t"))
return 0
}
}

func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error) {
ls := lua.NewState(lua.Options{SkipOpenLibs: true})
ls.SetContext(ctx)
Expand Down

0 comments on commit 82c9920

Please sign in to comment.