diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index e2c71b0fb1..a5adb705cc 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -166,17 +166,20 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader) pua.RegisterTypes(ls) err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script)) + if err != nil { + return nil, fmt.Errorf("error loading script %s: %w", req.Script, err) + } + err = ls.PCall(0, 0, nil) if err != nil { return nil, fmt.Errorf("error executing script %s: %w", req.Script, err) } // discard results, for now rely on globals instead of script returning anything - ls.SetTop(0) var ok bool lfn := ls.Env.RawGetString("onRecord") fn, ok = lfn.(*lua.LFunction) if !ok { - return nil, fmt.Errorf("script should define `onRecord` as function, not %s", lfn.Type()) + return nil, fmt.Errorf("script should define `onRecord` as function, not %s", lfn) } } else { return nil, errors.New("kafka mirror must have script")