diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index f007922f46..5bcfe839a6 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -22,6 +22,7 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/pua" + "github.com/PeerDB-io/peer-flow/pua/flatbuffers" ) type KafkaConnector struct { @@ -137,7 +138,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords var fn *lua.LFunction var ls *lua.LState if req.Script != "" { - ls = lua.NewState(lua.Options{SkipOpenLibs: true, IncludeGoStackTrace: true}) + ls = lua.NewState(lua.Options{SkipOpenLibs: true}) defer ls.Close() ls.SetContext(wgCtx) for _, pair := range []struct { @@ -157,7 +158,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords return nil, fmt.Errorf("failed to initialize Lua runtime: %w", err) } } - ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader) + ls.PreloadModule("flatbuffers", pua_flatbuffers.Loader) pua.RegisterTypes(ls) ls.Env.RawSetString("print", ls.NewFunction(func(ls *lua.LState) int { top := ls.GetTop() @@ -191,7 +192,6 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords if err := wgCtx.Err(); err != nil { return nil, err } - topic := record.GetDestinationTableName() ls.Push(fn) ls.Push(pua.LuaRecord.New(ls, record)) err := ls.PCall(1, 1, nil) @@ -199,14 +199,16 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords return nil, fmt.Errorf("script failed: %w", err) } value := ls.Get(-1) + ls.SetTop(0) if value != lua.LNil { lstr, ok := value.(lua.LString) if !ok { - return nil, fmt.Errorf("script returned non-nil non-string: %v", value) + return nil, fmt.Errorf("script returned non-nil non-string: %s", value) } + wg.Add(1) + topic := record.GetDestinationTableName() c.client.Produce(wgCtx, &kgo.Record{Topic: topic, Value: []byte(lstr)}, produceCb) - numRecords += 1 tableNameRowsMapping[topic] += 1 } diff --git a/flow/pua/flatbuffers.go b/flow/pua/flatbuffers/flatbuffers.go similarity index 93% rename from flow/pua/flatbuffers.go rename to flow/pua/flatbuffers/flatbuffers.go index c27431007a..2163417d30 100644 --- a/flow/pua/flatbuffers.go +++ b/flow/pua/flatbuffers/flatbuffers.go @@ -1,10 +1,10 @@ -package pua +package pua_flatbuffers import ( "github.com/yuin/gopher-lua" ) -func FlatBuffers_Loader(ls *lua.LState) int { +func Loader(ls *lua.LState) int { ls.PreloadModule("flatbuffers.binaryarray", FlatBuffers_BinaryArray_Loader) ls.PreloadModule("flatbuffers.builder", FlatBuffers_Builder_Loader) ls.PreloadModule("flatbuffers.numTypes", FlatBuffers_N_Loader) diff --git a/flow/pua/flatbuffers_binaryarray.go b/flow/pua/flatbuffers/flatbuffers_binaryarray.go similarity index 74% rename from flow/pua/flatbuffers_binaryarray.go rename to flow/pua/flatbuffers/flatbuffers_binaryarray.go index 635e8f7f55..caa087c7fc 100644 --- a/flow/pua/flatbuffers_binaryarray.go +++ b/flow/pua/flatbuffers/flatbuffers_binaryarray.go @@ -1,14 +1,12 @@ -package pua +package pua_flatbuffers import ( "github.com/yuin/gopher-lua" -) -type BinaryArray struct { - data []byte -} + "github.com/PeerDB-io/peer-flow/pua" +) -var LuaBinaryArray = LuaUserDataType[BinaryArray]{Name: "flatbuffers_binaryarray"} +var LuaBinaryArray = pua.LuaUserDataType[[]byte]{Name: "flatbuffers_binaryarray"} func FlatBuffers_BinaryArray_Loader(ls *lua.LState) int { m := ls.NewTable() @@ -28,16 +26,12 @@ func FlatBuffers_BinaryArray_Loader(ls *lua.LState) int { func BinaryArrayNew(ls *lua.LState) int { lval := ls.Get(1) - var ba BinaryArray + var ba []byte switch val := lval.(type) { case lua.LString: - ba = BinaryArray{ - data: []byte(val), - } + ba = []byte(val) case lua.LNumber: - ba = BinaryArray{ - data: make([]byte, int(val)), - } + ba = make([]byte, int(val)) default: ls.RaiseError("Expect a integer size value or string to construct a binary array") return 0 @@ -48,7 +42,7 @@ func BinaryArrayNew(ls *lua.LState) int { func BinaryArrayLen(ls *lua.LState) int { ba := LuaBinaryArray.StartMeta(ls) - ls.Push(lua.LNumber(len(ba.data))) + ls.Push(lua.LNumber(len(ba))) return 1 } @@ -56,9 +50,9 @@ func BinaryArrayIndex(ls *lua.LState) int { ba, key := LuaBinaryArray.StartIndex(ls) switch key { case "size": - ls.Push(lua.LNumber(len(ba.data))) + ls.Push(lua.LNumber(len(ba))) case "str": - ls.Push(lua.LString(ba.data)) + ls.Push(lua.LString(ba)) case "data": ls.RaiseError("BinaryArray data property inaccessible") return 0 @@ -77,29 +71,28 @@ func BinaryArraySlice(ls *lua.LState) int { startPos = 0 } if luaEndPos, ok := ls.Get(3).(lua.LNumber); ok { - endPos = min(int(luaEndPos), len(ba.data)) + endPos = min(int(luaEndPos), len(ba)) } else { - endPos = len(ba.data) + endPos = len(ba) } - ls.Push(lua.LString(ba.data[startPos:endPos])) + ls.Push(lua.LString(ba[startPos:endPos])) return 1 } func BinaryArrayGrow(ls *lua.LState) int { baud, ba := LuaBinaryArray.Check(ls, 1) newsize := int(ls.CheckNumber(2)) - if newsize > len(ba.data) { + if newsize > len(ba) { newdata := make([]byte, newsize) - copy(newdata[newsize-len(ba.data):], ba.data) - ba.data = newdata - baud.Value = ba + copy(newdata[newsize-len(ba):], ba) + baud.Value = newdata } return 0 } -func (ba *BinaryArray) Pad(n int, start int) { +func Pad(ba []byte, n int, start int) { for i := range n { - ba.data[start+i] = 0 + ba[start+i] = 0 } } @@ -107,7 +100,7 @@ func BinaryArrayPad(ls *lua.LState) int { ba := LuaBinaryArray.StartMeta(ls) n := int(ls.CheckNumber(2)) startPos := int(ls.CheckNumber(3)) - ba.Pad(n, startPos) + Pad(ba, n, startPos) return 0 } @@ -116,10 +109,10 @@ func BinaryArraySet(ls *lua.LState) int { idx := int(ls.CheckNumber(3)) value := ls.Get(2) if num, ok := value.(lua.LNumber); ok { - ba.data[idx] = byte(num) + ba[idx] = byte(num) } if str, ok := value.(lua.LString); ok { - ba.data[idx] = str[0] + ba[idx] = str[0] } return 0 } diff --git a/flow/pua/flatbuffers_builder.go b/flow/pua/flatbuffers/flatbuffers_builder.go similarity index 94% rename from flow/pua/flatbuffers_builder.go rename to flow/pua/flatbuffers/flatbuffers_builder.go index 4cde881345..76c4902c76 100644 --- a/flow/pua/flatbuffers_builder.go +++ b/flow/pua/flatbuffers/flatbuffers_builder.go @@ -1,15 +1,17 @@ -package pua +package pua_flatbuffers import ( "slices" "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/pua" ) const VtableMetadataFields int = 2 type Builder struct { - ba BinaryArray + ba []byte vtables []int currentVT []int head int @@ -30,24 +32,24 @@ func (b *Builder) EndVector(ls *lua.LState, vectorSize int) int { } func (b *Builder) Offset() int { - return len(b.ba.data) - b.head + return len(b.ba) - b.head } func (b *Builder) Pad(pad int) { if pad > 0 { b.head -= pad - b.ba.Pad(pad, b.head) + Pad(b.ba, pad, b.head) } } func (b *Builder) Place(ls *lua.LState, x lua.LValue, n N) { b.head -= int(n.width) - n.Pack(ls, b.ba.data[b.head:], x) + n.Pack(ls, b.ba[b.head:], x) } func (b *Builder) PlaceU64(u64 uint64, n N) { b.head -= int(n.width) - n.PackU64(b.ba.data[b.head:], u64) + n.PackU64(b.ba[b.head:], u64) } func (b *Builder) Prep(width uint8, additional int) { @@ -60,12 +62,12 @@ func (b *Builder) Prep(width uint8, additional int) { space := alignsize + int(width) + additional if b.head < space { - oldlen := len(b.ba.data) - newdata := slices.Grow(b.ba.data, space) + oldlen := len(b.ba) + newdata := slices.Grow(b.ba, space) newdata = newdata[:cap(newdata)] copy(newdata[len(newdata)-oldlen:], newdata[:oldlen]) b.head += len(newdata) - oldlen - b.ba.data = newdata + b.ba = newdata } b.Pad(alignsize) @@ -164,9 +166,9 @@ func (b *Builder) WriteVtable(ls *lua.LState) int { var existingVtable int for i := len(b.vtables) - 1; i >= 0; i -= 1 { vt2Offset := b.vtables[i] - vt2Start := len(b.ba.data) - vt2Offset - vt2Len := uint16n.UnpackU64(b.ba.data[vt2Start:]) - vt2 := b.ba.data[vt2Start+VtableMetadataFields*2 : vt2Start+int(vt2Len)] + vt2Start := len(b.ba) - vt2Offset + vt2Len := uint16n.UnpackU64(b.ba[vt2Start:]) + vt2 := b.ba[vt2Start+VtableMetadataFields*2 : vt2Start+int(vt2Len)] if vtableEqual(b.currentVT, objectOffset, vt2) { existingVtable = vt2Offset break @@ -187,11 +189,11 @@ func (b *Builder) WriteVtable(ls *lua.LState) int { b.PrependVOffsetT(uint16(len(b.currentVT)+VtableMetadataFields) * 2) newOffset := b.Offset() - int32n.PackU64(b.ba.data[len(b.ba.data)-objectOffset:], uint64(newOffset-objectOffset)) + int32n.PackU64(b.ba[len(b.ba)-objectOffset:], uint64(newOffset-objectOffset)) b.vtables = append(b.vtables, newOffset) } else { - b.head = len(b.ba.data) - objectOffset - int32n.PackU64(b.ba.data[b.head:], uint64(existingVtable-objectOffset)) + b.head = len(b.ba) - objectOffset + int32n.PackU64(b.ba[b.head:], uint64(existingVtable-objectOffset)) } if len(b.currentVT) != 0 { @@ -200,7 +202,7 @@ func (b *Builder) WriteVtable(ls *lua.LState) int { return objectOffset } -var LuaBuilder = LuaUserDataType[*Builder]{Name: "flatbuffers_builder"} +var LuaBuilder = pua.LuaUserDataType[*Builder]{Name: "flatbuffers_builder"} func FlatBuffers_Builder_Loader(ls *lua.LState) int { m := ls.NewTable() @@ -269,7 +271,7 @@ func BuilderNew(ls *lua.LState) int { initialSize := int(ls.CheckNumber(1)) ls.Push(LuaBuilder.New(ls, &Builder{ - ba: BinaryArray{data: make([]byte, initialSize)}, + ba: make([]byte, initialSize), vtables: make([]int, 0, 4), currentVT: make([]int, 0, 4), head: initialSize, @@ -291,16 +293,16 @@ func BuilderClear(ls *lua.LState) int { } b.currentVT = b.currentVT[:0] b.objectEnd = 0 - b.head = len(b.ba.data) + b.head = len(b.ba) return 0 } func BuilderOutput(ls *lua.LState) int { b := LuaBuilder.StartMeta(ls) if lua.LVIsFalse(ls.Get(2)) { - ls.Push(lua.LString(b.ba.data[b.head:])) + ls.Push(lua.LString(b.ba[b.head:])) } else { - ls.Push(lua.LString(b.ba.data)) + ls.Push(lua.LString(b.ba)) } return 1 } @@ -419,7 +421,7 @@ func createBytesHelper(ls *lua.LState, addnul bool) int { b.Prep(4, lens) } b.head -= lens - copy(b.ba.data[b.head:], s) + copy(b.ba[b.head:], s) ls.Push(lua.LNumber(b.EndVector(ls, lens))) return 1 diff --git a/flow/pua/flatbuffers_numtypes.go b/flow/pua/flatbuffers/flatbuffers_numtypes.go similarity index 74% rename from flow/pua/flatbuffers_numtypes.go rename to flow/pua/flatbuffers/flatbuffers_numtypes.go index 32a23fc205..9d62f197aa 100644 --- a/flow/pua/flatbuffers_numtypes.go +++ b/flow/pua/flatbuffers/flatbuffers_numtypes.go @@ -1,4 +1,4 @@ -package pua +package pua_flatbuffers import ( "encoding/binary" @@ -6,6 +6,8 @@ import ( "strconv" "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/pua" ) // Minimal API implemented for generated code @@ -146,9 +148,9 @@ func (n *N) Unpack(ls *lua.LState, buf []byte) lua.LValue { case 8: u64 := binary.LittleEndian.Uint64(buf) if n.signed { - return LuaI64.New(ls, int64(u64)) + return pua.LuaI64.New(ls, int64(u64)) } else { - return LuaU64.New(ls, u64) + return pua.LuaU64.New(ls, u64) } } case tyfloat: @@ -165,62 +167,57 @@ func (n *N) Unpack(ls *lua.LState, buf []byte) lua.LValue { panic("invalid numeric metatype") } -var LuaN = LuaUserDataType[N]{Name: "flatbuffers_n"} +var LuaN = pua.LuaUserDataType[N]{Name: "flatbuffers_n"} func FlatBuffers_N_Loader(ls *lua.LState) int { - m := ls.NewTable() - + mtidx := ls.CreateTable(0, 1) + mtidx.RawSetString("Unpack", ls.NewFunction(NUnpack)) mt := LuaView.NewMetatable(ls) - ls.SetField(mt, "__index", ls.NewFunction(NIndex)) + mt.RawSetString("__index", mtidx) uint16ud := LuaN.New(ls, uint16n) uint32ud := LuaN.New(ls, uint32n) int32ud := LuaN.New(ls, int32n) - ls.SetField(m, "Uint8", LuaN.New(ls, uint8n)) - ls.SetField(m, "Uint16", uint16ud) - ls.SetField(m, "Uint32", uint32ud) - ls.SetField(m, "Uint64", LuaN.New(ls, uint64n)) - ls.SetField(m, "Int8", LuaN.New(ls, int8n)) - ls.SetField(m, "Int16", LuaN.New(ls, int16n)) - ls.SetField(m, "Int32", int32ud) - ls.SetField(m, "Int64", LuaN.New(ls, int64n)) - ls.SetField(m, "Float32", LuaN.New(ls, float32n)) - ls.SetField(m, "Float64", LuaN.New(ls, float64n)) - ls.SetField(m, "Bool", LuaN.New(ls, booln)) - - ls.SetField(m, "UOffsetT", uint32ud) - ls.SetField(m, "VOffsetT", uint16ud) - ls.SetField(m, "SOffsetT", int32ud) + m := ls.NewTable() + m.RawSetString("Uint8", LuaN.New(ls, uint8n)) + m.RawSetString("Uint16", uint16ud) + m.RawSetString("Uint32", uint32ud) + m.RawSetString("Uint64", LuaN.New(ls, uint64n)) + m.RawSetString("Int8", LuaN.New(ls, int8n)) + m.RawSetString("Int16", LuaN.New(ls, int16n)) + m.RawSetString("Int32", int32ud) + m.RawSetString("Int64", LuaN.New(ls, int64n)) + m.RawSetString("Float32", LuaN.New(ls, float32n)) + m.RawSetString("Float64", LuaN.New(ls, float64n)) + m.RawSetString("Bool", LuaN.New(ls, booln)) + + m.RawSetString("UOffsetT", uint32ud) + m.RawSetString("VOffsetT", uint16ud) + m.RawSetString("SOffsetT", int32ud) ls.Push(m) return 1 } -func NIndex(ls *lua.LState) int { - n, key := LuaN.StartIndex(ls) - if key == "Unpack" { - var buf []byte - switch v := ls.Get(1).(type) { - case lua.LString: - buf = []byte(v) - case *lua.LUserData: - ba, ok := v.Value.(BinaryArray) - if ok { - buf = ba.data - } else { - ls.RaiseError("Invalid buf userdata passed to unpack") - return 0 - } - default: - ls.RaiseError("Invalid buf passed to unpack") +func NUnpack(ls *lua.LState) int { + n := LuaN.StartMeta(ls) + pos := max(CheckOffset(ls, 2), 1) + var buf []byte + switch v := ls.Get(1).(type) { + case lua.LString: + buf = []byte(v[pos-1:]) + case *lua.LUserData: + if ba, ok := v.Value.([]byte); ok { + buf = ba[pos-1:] + } else { + ls.RaiseError("Invalid buf userdata passed to unpack") return 0 } - pos := max(CheckOffset(ls, 2), 1) - ls.Push(n.Unpack(ls, buf[pos-1:])) - return 1 - } else { - ls.RaiseError("Unsupported field on N: " + key) + default: + ls.RaiseError("Invalid buf passed to unpack") return 0 } + ls.Push(n.Unpack(ls, buf)) + return 1 } diff --git a/flow/pua/flatbuffers_view.go b/flow/pua/flatbuffers/flatbuffers_view.go similarity index 76% rename from flow/pua/flatbuffers_view.go rename to flow/pua/flatbuffers/flatbuffers_view.go index 985fa84f0d..ec796efd49 100644 --- a/flow/pua/flatbuffers_view.go +++ b/flow/pua/flatbuffers/flatbuffers_view.go @@ -1,24 +1,20 @@ -package pua +package pua_flatbuffers import ( "github.com/yuin/gopher-lua" + + "github.com/PeerDB-io/peer-flow/pua" ) type View struct { - ba BinaryArray + ba []byte pos int // 0-based offset vtable int // 0-based offset vtableEnd uint16 hasv bool } -/* -func (view *View) Get(ls *lua.LState, n N, offset int) lua.LValue { - return n.Unpack(ls, view.ba.data[offset-1:]) -} -*/ - -var LuaView = LuaUserDataType[*View]{Name: "flatbuffers_view"} +var LuaView = pua.LuaUserDataType[*View]{Name: "flatbuffers_view"} func CheckOffset(ls *lua.LState, idx int) int { num := ls.CheckNumber(idx) @@ -41,13 +37,13 @@ func FlatBuffers_View_Loader(ls *lua.LState) int { func ViewNew(ls *lua.LState) int { buf := ls.Get(1) - var ba BinaryArray + var ba []byte switch val := buf.(type) { case lua.LString: - ba = BinaryArray{data: []byte(val)} + ba = []byte(val) case *lua.LUserData: var ok bool - ba, ok = val.Value.(BinaryArray) + ba, ok = val.Value.([]byte) if !ok { ls.RaiseError("invalid buf userdata passed to view.New") return 0 @@ -96,12 +92,12 @@ func ViewIndex(ls *lua.LState) int { func (view *View) Offset(vtoff uint16) uint16 { if !view.hasv { - view.vtable = view.pos - int(int32(int32n.UnpackU64(view.ba.data[view.pos:]))) - view.vtableEnd = uint16(uint16n.UnpackU64(view.ba.data[view.vtable:])) + view.vtable = view.pos - int(int32(int32n.UnpackU64(view.ba[view.pos:]))) + view.vtableEnd = uint16(uint16n.UnpackU64(view.ba[view.vtable:])) view.hasv = true } if vtoff < view.vtableEnd { - return uint16(uint16n.UnpackU64(view.ba.data[view.vtable+int(vtoff):])) + return uint16(uint16n.UnpackU64(view.ba[view.vtable+int(vtoff):])) } else { return 0 } @@ -109,12 +105,12 @@ func (view *View) Offset(vtoff uint16) uint16 { func (view *View) Vector(off int) int { off += view.pos - return off + int(uint32n.UnpackU64(view.ba.data[off:])) + 4 + return off + int(uint32n.UnpackU64(view.ba[off:])) + 4 } func (view *View) VectorLen(off int) uint32 { - off += int(uint32n.UnpackU64(view.ba.data[view.pos+off:])) - return uint32(uint32n.UnpackU64(view.ba.data[off:])) + off += int(uint32n.UnpackU64(view.ba[view.pos+off:])) + return uint32(uint32n.UnpackU64(view.ba[off:])) } func ViewOffset(ls *lua.LState) int { @@ -127,17 +123,17 @@ func ViewOffset(ls *lua.LState) int { func ViewIndirect(ls *lua.LState) int { view := LuaView.StartMeta(ls) off := CheckOffset(ls, 2) - ls.Push(lua.LNumber(off + int(uint32n.UnpackU64(view.ba.data[off:])))) + ls.Push(lua.LNumber(off + int(uint32n.UnpackU64(view.ba[off:])))) return 1 } func ViewString(ls *lua.LState) int { view := LuaView.StartMeta(ls) off := CheckOffset(ls, 2) - off += int(uint32n.UnpackU64(view.ba.data[off:])) + off += int(uint32n.UnpackU64(view.ba[off:])) start := off + 4 - length := int(uint32n.UnpackU64(view.ba.data[off:])) - ls.Push(lua.LString(view.ba.data[start : start+length])) + length := int(uint32n.UnpackU64(view.ba[off:])) + ls.Push(lua.LString(view.ba[start : start+length])) return 1 } @@ -175,7 +171,7 @@ func ViewVectorAsString(ls *lua.LState) int { stop = int(view.VectorLen(int(o))) } a := view.Vector(int(o)) + start - ls.Push(lua.LString(view.ba.data[a : a+stop-start])) + ls.Push(lua.LString(view.ba[a : a+stop-start])) return 1 } @@ -184,7 +180,7 @@ func ViewUnion(ls *lua.LState) int { t2ud, t2 := LuaView.Check(ls, 2) off := CheckOffset(ls, 3) off += view.pos - t2.pos = off + int(uint32n.UnpackU64(view.ba.data[off:])) + t2.pos = off + int(uint32n.UnpackU64(view.ba[off:])) t2.ba = view.ba t2ud.Value = t2 return 0 @@ -194,7 +190,7 @@ func ViewGet(ls *lua.LState) int { view := LuaView.StartMeta(ls) _, n := LuaN.Check(ls, 2) off := CheckOffset(ls, 3) - ls.Push(n.Unpack(ls, view.ba.data[off-1:])) + ls.Push(n.Unpack(ls, view.ba[off-1:])) return 1 } @@ -207,7 +203,7 @@ func ViewGetSlot(ls *lua.LState) int { return 1 } _, validatorFlags := LuaN.Check(ls, 4) - ls.Push(validatorFlags.Unpack(ls, view.ba.data[view.pos+int(off):])) + ls.Push(validatorFlags.Unpack(ls, view.ba[view.pos+int(off):])) return 1 }