Skip to content

Commit

Permalink
need to actually call function loader returns..
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 12, 2024
1 parent 7301480 commit 149e834
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,20 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader)
pua.RegisterTypes(ls)
err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script))
if err != nil {
return nil, fmt.Errorf("error loading script %s: %w", req.Script, err)
}
err = ls.PCall(0, 0, nil)
if err != nil {
return nil, fmt.Errorf("error executing script %s: %w", req.Script, err)
}
// discard results, for now rely on globals instead of script returning anything
ls.SetTop(0)

var ok bool
lfn := ls.Env.RawGetString("onRecord")
fn, ok = lfn.(*lua.LFunction)
if !ok {
return nil, fmt.Errorf("script should define `onRecord` as function, not %s", lfn.Type())
return nil, fmt.Errorf("script should define `onRecord` as function, not %s", lfn)
}
} else {
return nil, errors.New("kafka mirror must have script")
Expand Down

0 comments on commit 149e834

Please sign in to comment.