From 57433508b1cb1f25e267de51b8c9cc10056482e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 15 Mar 2024 17:17:56 +0000 Subject: [PATCH] Wire print to LogFlowInfo Fix Prep __tostring, not __string implement __tostring for 64 bit ints --- flow/connectors/external_metadata/store.go | 7 ++++++ flow/connectors/kafka/kafka.go | 11 +++++++++- flow/pua/flatbuffers_builder.go | 25 ++++++++++++++-------- flow/pua/peerdb.go | 19 ++++++++++++++-- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/flow/connectors/external_metadata/store.go b/flow/connectors/external_metadata/store.go index 5fffec0f69..bbe986b497 100644 --- a/flow/connectors/external_metadata/store.go +++ b/flow/connectors/external_metadata/store.go @@ -55,6 +55,13 @@ func (p *PostgresMetadataStore) Ping(ctx context.Context) error { return nil } +func (p *PostgresMetadataStore) LogFlowInfo(ctx context.Context, flowName string, info string) error { + _, err := p.pool.Exec(ctx, + "INSERT INTO peerdb_stats.flow_errors(flow_name,error_message,error_type) VALUES($1,$2,$3)", + flowName, info, "info") + return err +} + func (p *PostgresMetadataStore) FetchLastOffset(ctx context.Context, jobName string) (int64, error) { row := p.pool.QueryRow(ctx, `SELECT last_offset FROM `+ diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index 6db283a11f..920d14c872 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log/slog" + "strings" "sync" "github.com/twmb/franz-go/pkg/kgo" @@ -111,7 +112,6 @@ func (c *KafkaConnector) SetupMetadataTables(_ context.Context) error { } func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error { - c.logger.Info("ReplayTableSchemaDeltas for event hub is a no-op") return nil } @@ -159,6 +159,15 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords } ls.PreloadModule("flatbuffers", pua.FlatBuffers_Loader) pua.RegisterTypes(ls) + ls.Env.RawSetString("print", ls.NewFunction(func(ls *lua.LState) int { + top := ls.GetTop() + ss := make([]string, top) + for i := range top { + ss[i] = ls.ToStringMeta(ls.Get(i + 1)).String() + } + _ = c.pgMetadata.LogFlowInfo(ctx, req.FlowJobName, strings.Join(ss, "\t")) + return 0 + })) err := ls.GPCall(pua.LoadPeerdbScript, lua.LString(req.Script)) if err != nil { return nil, fmt.Errorf("error loading script %s: %w", req.Script, err) diff --git a/flow/pua/flatbuffers_builder.go b/flow/pua/flatbuffers_builder.go index 91afbccc84..828fbbdcfd 100644 --- a/flow/pua/flatbuffers_builder.go +++ b/flow/pua/flatbuffers_builder.go @@ -53,17 +53,23 @@ func (b *Builder) Prep(width uint8, additional int) { b.minalign = width } k := len(b.ba.data) - b.head + additional - alignsize := (^k + 1) & int(width-1) - desiredSize := alignsize + int(width) + additional + alignsize := -k & int(width-1) - for b.head < desiredSize { + space := alignsize + int(width) + additional + + if b.head < space { oldBufSize := len(b.ba.data) - newBufSize := oldBufSize + 1 - for newBufSize < desiredSize { - newBufSize *= 2 + if oldBufSize == 0 { + b.ba.data = make([]byte, space) + b.head = space + } else { + newBufSize := oldBufSize + for newBufSize < oldBufSize+space { + newBufSize *= 2 + } + b.ba.Grow(newBufSize) + b.head += newBufSize - oldBufSize } - b.ba.Grow(newBufSize) - b.head += len(b.ba.data) - oldBufSize } b.Pad(alignsize) @@ -421,7 +427,8 @@ func BuilderCreateString(ls *lua.LState) int { b.head -= lens copy(b.ba.data[b.head:], s) - return b.EndVector(ls, lens) + ls.Push(lua.LNumber(b.EndVector(ls, lens))) + return 1 } func BuilderSlot(ls *lua.LState) int { diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index 9f0c852516..8c1cefaf73 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "reflect" + "strconv" "strings" "time" @@ -45,20 +46,22 @@ func RegisterTypes(ls *lua.LState) { mt = LuaQValue.NewMetatable(ls) mt.RawSetString("__index", ls.NewFunction(LuaQValueIndex)) mt.RawSetString("__len", ls.NewFunction(LuaQValueLen)) - mt.RawSetString("__string", ls.NewFunction(LuaQValueString)) + mt.RawSetString("__tostring", ls.NewFunction(LuaQValueString)) mt = LuaUuid.NewMetatable(ls) mt.RawSetString("__index", ls.NewFunction(LuaUuidIndex)) - mt.RawSetString("__string", ls.NewFunction(LuaUuidString)) + mt.RawSetString("__tostring", ls.NewFunction(LuaUuidString)) mt = LuaI64.NewMetatable(ls) mt.RawSetString("__index", ls.NewFunction(LuaI64Index)) + mt.RawSetString("__tostring", ls.NewFunction(LuaI64String)) mt.RawSetString("__eq", ls.NewFunction(Lua64Eq)) mt.RawSetString("__le", ls.NewFunction(Lua64Le)) mt.RawSetString("__lt", ls.NewFunction(Lua64Lt)) mt = LuaU64.NewMetatable(ls) mt.RawSetString("__index", ls.NewFunction(LuaU64Index)) + mt.RawSetString("__tostring", ls.NewFunction(LuaU64String)) mt.RawSetString("__eq", ls.NewFunction(Lua64Eq)) mt.RawSetString("__le", ls.NewFunction(Lua64Le)) mt.RawSetString("__lt", ls.NewFunction(Lua64Lt)) @@ -492,6 +495,18 @@ func LuaU64Index(ls *lua.LState) int { return 1 } +func LuaI64String(ls *lua.LState) int { + i64 := LuaI64.StartMeta(ls) + ls.Push(lua.LString(strconv.FormatInt(i64, 10))) + return 1 +} + +func LuaU64String(ls *lua.LState) int { + u64 := LuaU64.StartMeta(ls) + ls.Push(lua.LString(strconv.FormatUint(u64, 10))) + return 1 +} + func LuaTimeIndex(ls *lua.LState) int { tm, key := LuaTime.StartIndex(ls) switch key {