Skip to content

Commit

Permalink
Wire print to LogFlowInfo
Browse files Browse the repository at this point in the history
Fix Prep
__tostring, not __string
implement __tostring for 64 bit ints
  • Loading branch information
serprex committed Mar 15, 2024
1 parent d5747f9 commit 5743350
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 12 deletions.
7 changes: 7 additions & 0 deletions flow/connectors/external_metadata/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func (p *PostgresMetadataStore) Ping(ctx context.Context) error {
return nil
}

func (p *PostgresMetadataStore) LogFlowInfo(ctx context.Context, flowName string, info string) error {
_, err := p.pool.Exec(ctx,
"INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)",
flowName, info, "info")
return err
}

func (p *PostgresMetadataStore) FetchLastOffset(ctx context.Context, jobName string) (int64, error) {
row := p.pool.QueryRow(ctx,
`SELECT last_offset FROM `+
Expand Down
11 changes: 10 additions & 1 deletion flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log/slog"
"strings"
"sync"

"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -111,7 +112,6 @@ func (c *KafkaConnector) SetupMetadataTables(_ context.Context) error {
}

func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
c.logger.Info("ReplayTableSchemaDeltas for event hub is a no-op")
return nil
}

Expand Down Expand Up @@ -159,6 +159,15 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
}
ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader)
pua.RegisterTypes(ls)
ls.Env.RawSetString("print", ls.NewFunction(func(ls *lua.LState) int {
top := ls.GetTop()
ss := make([]string, top)
for i := range top {
ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String()
}
_ = c.pgMetadata.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t"))
return 0
}))
err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script))
if err != nil {
return nil, fmt.Errorf("error loading script %s: %w", req.Script, err)
Expand Down
25 changes: 16 additions & 9 deletions flow/pua/flatbuffers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,23 @@ func (b *Builder) Prep(width uint8, additional int) {
b.minalign = width
}
k := len(b.ba.data) - b.head + additional
alignsize := (^k + 1) & int(width-1)
desiredSize := alignsize + int(width) + additional
alignsize := -k & int(width-1)

for b.head < desiredSize {
space := alignsize + int(width) + additional

if b.head < space {
oldBufSize := len(b.ba.data)
newBufSize := oldBufSize + 1
for newBufSize < desiredSize {
newBufSize *= 2
if oldBufSize == 0 {
b.ba.data = make([]byte, space)
b.head = space
} else {
newBufSize := oldBufSize
for newBufSize < oldBufSize+space {
newBufSize *= 2
}
b.ba.Grow(newBufSize)
b.head += newBufSize - oldBufSize
}
b.ba.Grow(newBufSize)
b.head += len(b.ba.data) - oldBufSize
}

b.Pad(alignsize)
Expand Down Expand Up @@ -421,7 +427,8 @@ func BuilderCreateString(ls *lua.LState) int {
b.head -= lens
copy(b.ba.data[b.head:], s)

return b.EndVector(ls, lens)
ls.Push(lua.LNumber(b.EndVector(ls, lens)))
return 1
}

func BuilderSlot(ls *lua.LState) int {
Expand Down
19 changes: 17 additions & 2 deletions flow/pua/peerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -45,20 +46,22 @@ func RegisterTypes(ls *lua.LState) {
mt = LuaQValue.NewMetatable(ls)
mt.RawSetString("__index", ls.NewFunction(LuaQValueIndex))
mt.RawSetString("__len", ls.NewFunction(LuaQValueLen))
mt.RawSetString("__string", ls.NewFunction(LuaQValueString))
mt.RawSetString("__tostring", ls.NewFunction(LuaQValueString))

mt = LuaUuid.NewMetatable(ls)
mt.RawSetString("__index", ls.NewFunction(LuaUuidIndex))
mt.RawSetString("__string", ls.NewFunction(LuaUuidString))
mt.RawSetString("__tostring", ls.NewFunction(LuaUuidString))

mt = LuaI64.NewMetatable(ls)
mt.RawSetString("__index", ls.NewFunction(LuaI64Index))
mt.RawSetString("__tostring", ls.NewFunction(LuaI64String))
mt.RawSetString("__eq", ls.NewFunction(Lua64Eq))
mt.RawSetString("__le", ls.NewFunction(Lua64Le))
mt.RawSetString("__lt", ls.NewFunction(Lua64Lt))

mt = LuaU64.NewMetatable(ls)
mt.RawSetString("__index", ls.NewFunction(LuaU64Index))
mt.RawSetString("__tostring", ls.NewFunction(LuaU64String))
mt.RawSetString("__eq", ls.NewFunction(Lua64Eq))
mt.RawSetString("__le", ls.NewFunction(Lua64Le))
mt.RawSetString("__lt", ls.NewFunction(Lua64Lt))
Expand Down Expand Up @@ -492,6 +495,18 @@ func LuaU64Index(ls *lua.LState) int {
return 1
}

func LuaI64String(ls *lua.LState) int {
i64 := LuaI64.StartMeta(ls)
ls.Push(lua.LString(strconv.FormatInt(i64, 10)))
return 1
}

func LuaU64String(ls *lua.LState) int {
u64 := LuaU64.StartMeta(ls)
ls.Push(lua.LString(strconv.FormatUint(u64, 10)))
return 1
}

func LuaTimeIndex(ls *lua.LState) int {
tm, key := LuaTime.StartIndex(ls)
switch key {
Expand Down

0 comments on commit 5743350

Please sign in to comment.