diff --git a/flow/pua/peerdb.go b/flow/pua/peerdb.go index ccbd06aed6..7a6dab9652 100644 --- a/flow/pua/peerdb.go +++ b/flow/pua/peerdb.go @@ -214,6 +214,16 @@ func LuaRecordIndex(ls *lua.LState) int { ls.Push(lua.LString(record.GetDestinationTableName())) case "source": ls.Push(lua.LString(record.GetSourceTableName())) + case "unchanged_columns": + if ur, ok := record.(*model.UpdateRecord); ok { + tbl := ls.CreateTable(0, len(ur.UnchangedToastColumns)) + for col := range ur.UnchangedToastColumns { + tbl.RawSetString(col, lua.LTrue) + } + ls.Push(tbl) + } else { + ls.Push(lua.LNil) + } default: return 0 } @@ -221,13 +231,22 @@ func LuaRecordIndex(ls *lua.LState) int { } func LuaRecordJson(ls *lua.LState) int { - ud := ls.Get(1) - tbl := ls.CreateTable(0, 6) + ud := ls.CheckUserData(1) + tbl := ls.CreateTable(0, 7) for _, key := range []string{ "kind", "old", "new", "checkpoint", "commit_time", "source", } { tbl.RawSetString(key, ls.GetField(ud, key)) } + if ur, ok := ud.Value.(*model.UpdateRecord); ok { + if len(ur.UnchangedToastColumns) > 0 { + unchanged := ls.CreateTable(len(ur.UnchangedToastColumns), 0) + for col := range ur.UnchangedToastColumns { + unchanged.Append(lua.LString(col)) + } + tbl.RawSetString("unchanged_columns", unchanged) + } + } ls.Push(tbl) return 1 }