From 82c9920bb0085317c328897f69f8182e7c176247 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 8 May 2024 12:31:50 +0000 Subject: [PATCH] share code between print functions --- flow/activities/flowable_core.go | 13 +++---------- flow/connectors/eventhub/eventhub.go | 13 +++---------- flow/connectors/kafka/kafka.go | 13 +++---------- flow/connectors/pubsub/pubsub.go | 13 +++---------- flow/connectors/utils/lua.go | 13 +++++++++++++ 5 files changed, 25 insertions(+), 40 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 96cea69856..ad3dca378a 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -6,7 +6,6 @@ import ( "fmt" "log/slog" "reflect" - "strings" "sync/atomic" "time" @@ -350,15 +349,9 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, stream := model.NewQRecordStream(bufferSize) outstream := stream if config.Script != "" { - ls, err := utils.LoadScript(ctx, config.Script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - a.Alerter.LogFlowInfo(ctx, config.FlowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, config.Script, utils.LuaPrintFn(func(s string) { + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, s) + })) if err != nil { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 4f95d1007e..1182f4d413 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "strings" "sync/atomic" "time" @@ -196,15 +195,9 @@ func (c *EventHubConnector) processBatch( var fn *lua.LFunction if req.Script != "" { var err error - ls, err = utils.LoadScript(ctx, req.Script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err = utils.LoadScript(ctx, req.Script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, req.FlowJobName, s) + })) if err != nil { return 0, err } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index cfe4652598..c58da5e50f 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "fmt" "log/slog" - "strings" "sync/atomic" "time" @@ -178,15 +177,9 @@ func (c *KafkaConnector) createPool( } return utils.LuaPool(func() (*lua.LState, error) { - ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, flowJobName, s) + })) if err != nil { return nil, err } diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 54031f016d..0a8709b3b2 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "strings" "sync" "sync/atomic" "time" @@ -130,15 +129,9 @@ func (c *PubSubConnector) createPool( queueErr func(error), ) (*utils.LPool[[]PubSubMessage], error) { return utils.LuaPool(func() (*lua.LState, error) { - ls, err := utils.LoadScript(ctx, script, func(ls *lua.LState) int { - top := ls.GetTop() - ss := make([]string, top) - for i := range top { - ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() - } - _ = c.LogFlowInfo(ctx, flowJobName, strings.Join(ss, "\t")) - return 0 - }) + ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { + _ = c.LogFlowInfo(ctx, flowJobName, s) + })) if err != nil { return nil, err } diff --git a/flow/connectors/utils/lua.go b/flow/connectors/utils/lua.go index 47676721b3..f1d82f373f 100644 --- a/flow/connectors/utils/lua.go +++ b/flow/connectors/utils/lua.go @@ -3,6 +3,7 @@ package utils import ( "context" "fmt" + "strings" "github.com/yuin/gopher-lua" @@ -35,6 +36,18 @@ func LVAsStringOrNil(ls *lua.LState, v lua.LValue) (string, error) { } } +func LuaPrintFn(fn func(string)) lua.LGFunction { + return func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + fn(strings.Join(ss, "\t")) + return 0 + } +} + func LoadScript(ctx context.Context, script string, printfn lua.LGFunction) (*lua.LState, error) { ls := lua.NewState(lua.Options{SkipOpenLibs: true}) ls.SetContext(ctx)