Skip to content

Commit

Permalink
Move flatbuffers into its own package, fix stack not being cleared in…
Browse files Browse the repository at this point in the history
… kafka code
  • Loading branch information
serprex committed Mar 19, 2024
1 parent d1c0a89 commit 7f010dc
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 126 deletions.
12 changes: 7 additions & 5 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/pua/flatbuffers"
)

type KafkaConnector struct {
Expand Down Expand Up @@ -137,7 +138,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
var fn *lua.LFunction
var ls *lua.LState
if req.Script != "" {
ls = lua.NewState(lua.Options{SkipOpenLibs: true, IncludeGoStackTrace: true})
ls = lua.NewState(lua.Options{SkipOpenLibs: true})
defer ls.Close()
ls.SetContext(wgCtx)
for _, pair := range []struct {
Expand All @@ -157,7 +158,7 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
return nil, fmt.Errorf("failed to initialize Lua runtime: %w", err)
}
}
ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader)
ls.PreloadModule("flatbuffers", pua_flatbuffers.Loader)
pua.RegisterTypes(ls)
ls.Env.RawSetString("print", ls.NewFunction(func(ls *lua.LState) int {
top := ls.GetTop()
Expand Down Expand Up @@ -191,22 +192,23 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
if err := wgCtx.Err(); err != nil {
return nil, err
}
topic := record.GetDestinationTableName()
ls.Push(fn)
ls.Push(pua.LuaRecord.New(ls, record))
err := ls.PCall(1, 1, nil)
if err != nil {
return nil, fmt.Errorf("script failed: %w", err)
}
value := ls.Get(-1)
ls.SetTop(0)
if value != lua.LNil {
lstr, ok := value.(lua.LString)
if !ok {
return nil, fmt.Errorf("script returned non-nil non-string: %v", value)
return nil, fmt.Errorf("script returned non-nil non-string: %s", value)
}

wg.Add(1)
topic := record.GetDestinationTableName()
c.client.Produce(wgCtx, &kgo.Record{Topic: topic, Value: []byte(lstr)}, produceCb)

numRecords += 1
tableNameRowsMapping[topic] += 1
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package pua
package pua_flatbuffers

import (
"github.com/yuin/gopher-lua"
)

func FlatBuffers_Loader(ls *lua.LState) int {
func Loader(ls *lua.LState) int {
ls.PreloadModule("flatbuffers.binaryarray", FlatBuffers_BinaryArray_Loader)
ls.PreloadModule("flatbuffers.builder", FlatBuffers_Builder_Loader)
ls.PreloadModule("flatbuffers.numTypes", FlatBuffers_N_Loader)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package pua
package pua_flatbuffers

import (
"github.com/yuin/gopher-lua"
)

type BinaryArray struct {
data []byte
}
"github.com/PeerDB-io/peer-flow/pua"
)

var LuaBinaryArray = LuaUserDataType[BinaryArray]{Name: "flatbuffers_binaryarray"}
var LuaBinaryArray = pua.LuaUserDataType[[]byte]{Name: "flatbuffers_binaryarray"}

func FlatBuffers_BinaryArray_Loader(ls *lua.LState) int {
m := ls.NewTable()
Expand All @@ -28,16 +26,12 @@ func FlatBuffers_BinaryArray_Loader(ls *lua.LState) int {

func BinaryArrayNew(ls *lua.LState) int {
lval := ls.Get(1)
var ba BinaryArray
var ba []byte
switch val := lval.(type) {
case lua.LString:
ba = BinaryArray{
data: []byte(val),
}
ba = []byte(val)
case lua.LNumber:
ba = BinaryArray{
data: make([]byte, int(val)),
}
ba = make([]byte, int(val))
default:
ls.RaiseError("Expect a integer size value or string to construct a binary array")
return 0
Expand All @@ -48,17 +42,17 @@ func BinaryArrayNew(ls *lua.LState) int {

func BinaryArrayLen(ls *lua.LState) int {
ba := LuaBinaryArray.StartMeta(ls)
ls.Push(lua.LNumber(len(ba.data)))
ls.Push(lua.LNumber(len(ba)))
return 1
}

func BinaryArrayIndex(ls *lua.LState) int {
ba, key := LuaBinaryArray.StartIndex(ls)
switch key {
case "size":
ls.Push(lua.LNumber(len(ba.data)))
ls.Push(lua.LNumber(len(ba)))
case "str":
ls.Push(lua.LString(ba.data))
ls.Push(lua.LString(ba))
case "data":
ls.RaiseError("BinaryArray data property inaccessible")
return 0
Expand All @@ -77,37 +71,36 @@ func BinaryArraySlice(ls *lua.LState) int {
startPos = 0
}
if luaEndPos, ok := ls.Get(3).(lua.LNumber); ok {
endPos = min(int(luaEndPos), len(ba.data))
endPos = min(int(luaEndPos), len(ba))
} else {
endPos = len(ba.data)
endPos = len(ba)
}
ls.Push(lua.LString(ba.data[startPos:endPos]))
ls.Push(lua.LString(ba[startPos:endPos]))
return 1
}

func BinaryArrayGrow(ls *lua.LState) int {
baud, ba := LuaBinaryArray.Check(ls, 1)
newsize := int(ls.CheckNumber(2))
if newsize > len(ba.data) {
if newsize > len(ba) {
newdata := make([]byte, newsize)
copy(newdata[newsize-len(ba.data):], ba.data)
ba.data = newdata
baud.Value = ba
copy(newdata[newsize-len(ba):], ba)
baud.Value = newdata
}
return 0
}

func (ba *BinaryArray) Pad(n int, start int) {
func Pad(ba []byte, n int, start int) {
for i := range n {
ba.data[start+i] = 0
ba[start+i] = 0
}
}

func BinaryArrayPad(ls *lua.LState) int {
ba := LuaBinaryArray.StartMeta(ls)
n := int(ls.CheckNumber(2))
startPos := int(ls.CheckNumber(3))
ba.Pad(n, startPos)
Pad(ba, n, startPos)
return 0
}

Expand All @@ -116,10 +109,10 @@ func BinaryArraySet(ls *lua.LState) int {
idx := int(ls.CheckNumber(3))
value := ls.Get(2)
if num, ok := value.(lua.LNumber); ok {
ba.data[idx] = byte(num)
ba[idx] = byte(num)
}
if str, ok := value.(lua.LString); ok {
ba.data[idx] = str[0]
ba[idx] = str[0]
}
return 0
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package pua
package pua_flatbuffers

import (
"slices"

"github.com/yuin/gopher-lua"

"github.com/PeerDB-io/peer-flow/pua"
)

const VtableMetadataFields int = 2

type Builder struct {
ba BinaryArray
ba []byte
vtables []int
currentVT []int
head int
Expand All @@ -30,24 +32,24 @@ func (b *Builder) EndVector(ls *lua.LState, vectorSize int) int {
}

func (b *Builder) Offset() int {
return len(b.ba.data) - b.head
return len(b.ba) - b.head
}

func (b *Builder) Pad(pad int) {
if pad > 0 {
b.head -= pad
b.ba.Pad(pad, b.head)
Pad(b.ba, pad, b.head)
}
}

func (b *Builder) Place(ls *lua.LState, x lua.LValue, n N) {
b.head -= int(n.width)
n.Pack(ls, b.ba.data[b.head:], x)
n.Pack(ls, b.ba[b.head:], x)
}

func (b *Builder) PlaceU64(u64 uint64, n N) {
b.head -= int(n.width)
n.PackU64(b.ba.data[b.head:], u64)
n.PackU64(b.ba[b.head:], u64)
}

func (b *Builder) Prep(width uint8, additional int) {
Expand All @@ -60,12 +62,12 @@ func (b *Builder) Prep(width uint8, additional int) {
space := alignsize + int(width) + additional

if b.head < space {
oldlen := len(b.ba.data)
newdata := slices.Grow(b.ba.data, space)
oldlen := len(b.ba)
newdata := slices.Grow(b.ba, space)
newdata = newdata[:cap(newdata)]
copy(newdata[len(newdata)-oldlen:], newdata[:oldlen])
b.head += len(newdata) - oldlen
b.ba.data = newdata
b.ba = newdata
}

b.Pad(alignsize)
Expand Down Expand Up @@ -164,9 +166,9 @@ func (b *Builder) WriteVtable(ls *lua.LState) int {
var existingVtable int
for i := len(b.vtables) - 1; i >= 0; i -= 1 {
vt2Offset := b.vtables[i]
vt2Start := len(b.ba.data) - vt2Offset
vt2Len := uint16n.UnpackU64(b.ba.data[vt2Start:])
vt2 := b.ba.data[vt2Start+VtableMetadataFields*2 : vt2Start+int(vt2Len)]
vt2Start := len(b.ba) - vt2Offset
vt2Len := uint16n.UnpackU64(b.ba[vt2Start:])
vt2 := b.ba[vt2Start+VtableMetadataFields*2 : vt2Start+int(vt2Len)]
if vtableEqual(b.currentVT, objectOffset, vt2) {
existingVtable = vt2Offset
break
Expand All @@ -187,11 +189,11 @@ func (b *Builder) WriteVtable(ls *lua.LState) int {
b.PrependVOffsetT(uint16(len(b.currentVT)+VtableMetadataFields) * 2)

newOffset := b.Offset()
int32n.PackU64(b.ba.data[len(b.ba.data)-objectOffset:], uint64(newOffset-objectOffset))
int32n.PackU64(b.ba[len(b.ba)-objectOffset:], uint64(newOffset-objectOffset))
b.vtables = append(b.vtables, newOffset)
} else {
b.head = len(b.ba.data) - objectOffset
int32n.PackU64(b.ba.data[b.head:], uint64(existingVtable-objectOffset))
b.head = len(b.ba) - objectOffset
int32n.PackU64(b.ba[b.head:], uint64(existingVtable-objectOffset))
}

if len(b.currentVT) != 0 {
Expand All @@ -200,7 +202,7 @@ func (b *Builder) WriteVtable(ls *lua.LState) int {
return objectOffset
}

var LuaBuilder = LuaUserDataType[*Builder]{Name: "flatbuffers_builder"}
var LuaBuilder = pua.LuaUserDataType[*Builder]{Name: "flatbuffers_builder"}

func FlatBuffers_Builder_Loader(ls *lua.LState) int {
m := ls.NewTable()
Expand Down Expand Up @@ -269,7 +271,7 @@ func BuilderNew(ls *lua.LState) int {
initialSize := int(ls.CheckNumber(1))

ls.Push(LuaBuilder.New(ls, &Builder{
ba: BinaryArray{data: make([]byte, initialSize)},
ba: make([]byte, initialSize),
vtables: make([]int, 0, 4),
currentVT: make([]int, 0, 4),
head: initialSize,
Expand All @@ -291,16 +293,16 @@ func BuilderClear(ls *lua.LState) int {
}
b.currentVT = b.currentVT[:0]
b.objectEnd = 0
b.head = len(b.ba.data)
b.head = len(b.ba)
return 0
}

func BuilderOutput(ls *lua.LState) int {
b := LuaBuilder.StartMeta(ls)
if lua.LVIsFalse(ls.Get(2)) {
ls.Push(lua.LString(b.ba.data[b.head:]))
ls.Push(lua.LString(b.ba[b.head:]))
} else {
ls.Push(lua.LString(b.ba.data))
ls.Push(lua.LString(b.ba))
}
return 1
}
Expand Down Expand Up @@ -419,7 +421,7 @@ func createBytesHelper(ls *lua.LState, addnul bool) int {
b.Prep(4, lens)
}
b.head -= lens
copy(b.ba.data[b.head:], s)
copy(b.ba[b.head:], s)

ls.Push(lua.LNumber(b.EndVector(ls, lens)))
return 1
Expand Down
Loading

0 comments on commit 7f010dc

Please sign in to comment.