From 3293f8ff999ffb08cb7ff1cde42ee084f73dbed0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 11 Apr 2024 20:23:00 +0000 Subject: [PATCH] Lua: add unchanged_columns field to Record (#1608) This allows disambiguating a null value as unchanged for toast columns Ignore UnchangedToastColumns on DeleteRecord since queues don't care about soft delete --- flow/pua/peerdb.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index ccbd06aed6..7a6dab9652 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -214,6 +214,16 @@ func LuaRecordIndex(ls *lua.LState) int { ls.Push(lua.LString(record.GetDestinationTableName())) case "source": ls.Push(lua.LString(record.GetSourceTableName())) + case "unchanged_columns": + if ur, ok := record.(*model.UpdateRecord); ok { + tbl := ls.CreateTable(0, len(ur.UnchangedToastColumns)) + for col := range ur.UnchangedToastColumns { + tbl.RawSetString(col, lua.LTrue) + } + ls.Push(tbl) + } else { + ls.Push(lua.LNil) + } default: return 0 } @@ -221,13 +231,22 @@ func LuaRecordIndex(ls *lua.LState) int { } func LuaRecordJson(ls *lua.LState) int { - ud := ls.Get(1) - tbl := ls.CreateTable(0, 6) + ud := ls.CheckUserData(1) + tbl := ls.CreateTable(0, 7) for _, key := range []string{ "kind", "old", "new", "checkpoint", "commit_time", "source", } { tbl.RawSetString(key, ls.GetField(ud, key)) } + if ur, ok := ud.Value.(*model.UpdateRecord); ok { + if len(ur.UnchangedToastColumns) > 0 { + unchanged := ls.CreateTable(len(ur.UnchangedToastColumns), 0) + for col := range ur.UnchangedToastColumns { + unchanged.Append(lua.LString(col)) + } + tbl.RawSetString("unchanged_columns", unchanged) + } + } ls.Push(tbl) return 1 }