diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index 636f173d98..2090c5e617 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -159,42 +159,42 @@ func (c *KafkaConnector) SyncFlowCleanup(ctx context.Context, jobName string) er } func loadScript(ctx context.Context, script string, print lua.LGFunction) (*lua.LState, error) { - if script != "" { - ls := lua.NewState(lua.Options{SkipOpenLibs: true}) - defer ls.Close() - ls.SetContext(ctx) - for _, pair := range []struct { - n string - f lua.LGFunction - }{ - {lua.LoadLibName, lua.OpenPackage}, // Must be first - {lua.BaseLibName, lua.OpenBase}, - {lua.TabLibName, lua.OpenTable}, - {lua.StringLibName, lua.OpenString}, - {lua.MathLibName, lua.OpenMath}, - } { - ls.Push(ls.NewFunction(pair.f)) - ls.Push(lua.LString(pair.n)) - err := ls.PCall(1, 0, nil) - if err != nil { - return nil, fmt.Errorf("failed to initialize Lua runtime: %w", err) - } - } - ls.PreloadModule("flatbuffers", gluaflatbuffers.Loader) - pua.RegisterTypes(ls) - ls.Env.RawSetString("print", ls.NewFunction(print)) - err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(script)) - if err != nil { - return nil, fmt.Errorf("error loading script %s: %w", script, err) - } - err = ls.PCall(0, 0, nil) + if script == "" { + return nil, errors.New("kafka mirror must have script") + } + + ls := lua.NewState(lua.Options{SkipOpenLibs: true}) + defer ls.Close() + ls.SetContext(ctx) + for _, pair := range []struct { + n string + f lua.LGFunction + }{ + {lua.LoadLibName, lua.OpenPackage}, // Must be first + {lua.BaseLibName, lua.OpenBase}, + {lua.TabLibName, lua.OpenTable}, + {lua.StringLibName, lua.OpenString}, + {lua.MathLibName, lua.OpenMath}, + } { + ls.Push(ls.NewFunction(pair.f)) + ls.Push(lua.LString(pair.n)) + err := ls.PCall(1, 0, nil) if err != nil { - return nil, fmt.Errorf("error executing script %s: %w", script, err) + return nil, fmt.Errorf("failed to initialize Lua runtime: %w", err) } - return ls, nil - } else { - return nil, errors.New("kafka mirror must have script") } + ls.PreloadModule("flatbuffers", gluaflatbuffers.Loader) + pua.RegisterTypes(ls) + ls.Env.RawSetString("print", ls.NewFunction(print)) + err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(script)) + if err != nil { + return nil, fmt.Errorf("error loading script %s: %w", script, err) + } + err = ls.PCall(0, 0, nil) + if err != nil { + return nil, fmt.Errorf("error executing script %s: %w", script, err) + } + return ls, nil } func lvalueToKafkaRecord(ls *lua.LState, value lua.LValue) (*kgo.Record, error) {