diff --git a/src/core/interpreter.cc b/src/core/interpreter.cc
index b5ec02033805..39baf8cf9ed7 100644
--- a/src/core/interpreter.cc
+++ b/src/core/interpreter.cc
@@ -256,8 +256,10 @@ void SetGlobalArrayInternal(lua_State* lua, const char* name, Interpreter::Slice
 /* In case the error set into the Lua stack by PushError() was generated
  * by the non-error-trapping version of redis.pcall(), which is redis.call(),
  * this function will raise the Lua error so that the execution of the
- * script will be halted. */
-int RaiseError(lua_State* lua) {
+ * script will be halted.
+ * This function never returns; it unwinds the Lua call stack until an error handler is found or
+ * the script exits. */
+int RaiseErrorAndAbort(lua_State* lua) {
   lua_pushstring(lua, "err");
   lua_gettable(lua, -2);
   return lua_error(lua);
@@ -467,7 +469,7 @@ int RedisLogCommand(lua_State* lua) {
   int argc = lua_gettop(lua);
   if (argc < 2) {
     PushError(lua, "redis.log() requires two arguments or more.");
-    return RaiseError(lua);
+    return RaiseErrorAndAbort(lua);
   }
 
   return 0;
@@ -891,40 +893,12 @@ void Interpreter::RunGC() {
   lua_gc(lua_, LUA_GCCOLLECT);
 }
 
-// Returns number of results, which is always 1 in this case.
-// Please note that lua resets the stack once the function returns so no need
-// to unwind the stack manually in the function (though lua allows doing this).
-int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplorer* explorer) {
-  /* By using Lua debug hooks it is possible to trigger a recursive call
-   * to luaRedisGenericCommand(), which normally should never happen.
-   * To make this function reentrant is futile and makes it slower, but
-   * we should at least detect such a misuse, and abort. */
-  if (cmd_depth_) {
-    const char* recursion_warning =
-        "luaRedisGenericCommand() recursive call detected. "
-        "Are you doing funny stuff with Lua debug hooks?";
-    PushError(lua_, recursion_warning);
-    return 1;
-  }
-
-  if (!redis_func_) {
-    PushError(lua_, "internal error - redis function not defined");
-    return raise_error ? RaiseError(lua_) : 1;
-  }
-
-  cmd_depth_++;
+std::optional<absl::FixedArray<std::string_view>> Interpreter::PrepareArgs() {
   int argc = lua_gettop(lua_);
-
-#define RETURN_ERROR(err)                      \
-  {                                            \
-    PushError(lua_, err);                      \
-    cmd_depth_--;                              \
-    return raise_error ? RaiseError(lua_) : 1; \
-  }
-
   /* Require at least one argument */
   if (argc == 0) {
-    RETURN_ERROR("Please specify at least one argument for redis.call()");
+    PushError(lua_, "Please specify at least one argument for redis.call()");
+    return std::nullopt;
   }
 
   size_t blob_len = 0;
@@ -947,21 +921,22 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplore
         blob_len += lua_rawlen(lua_, idx) + 1;
         continue;
       default:
-        RETURN_ERROR("Lua redis() command arguments must be strings or integers");
+        PushError(lua_, "Lua redis() command arguments must be strings or integers");
+        return std::nullopt;
     }
   }
 
-  char name_buffer[32];  // backing storage for cmd name
   absl::FixedArray<std::string_view> args(argc);
 
   // Copy command name to name_buffer and set it as first arg.
   unsigned name_len = lua_rawlen(lua_, 1);
-  if (name_len >= sizeof(name_buffer)) {
-    RETURN_ERROR("Lua redis() command name too long");
+  if (name_len >= sizeof(name_buffer_)) {
+    PushError(lua_, "Lua redis() command name too long");
+    return std::nullopt;
   }
 
-  memcpy(name_buffer, lua_tostring(lua_, 1), name_len);
-  args[0] = {name_buffer, name_len};
+  memcpy(name_buffer_, lua_tostring(lua_, 1), name_len);
+  args[0] = {name_buffer_, name_len};
 
   buffer_.resize(blob_len + 4, '\0');  // backing storage for args
   char* cur = buffer_.data();
@@ -993,7 +968,13 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplore
 
   /* Pop all arguments from the stack, we do not need them anymore
    * and this way we guaranty we will have room on the stack for the result. */
   lua_pop(lua_, argc);
+  return args;
+}
 
+// Calls the redis function.
+// Returns false if an error needs to be raised.
+bool Interpreter::CallRedisFunction(bool raise_error, bool async, ObjectExplorer* explorer,
+                                    SliceSpan args) {
   // Calling with custom explorer is not supported with errors or async
   DCHECK(explorer == nullptr || (!raise_error && !async));
@@ -1003,8 +984,8 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplore
     translator.emplace(lua_);
     explorer = &*translator;
   }
-
-  redis_func_(CallArgs{SliceSpan{args}, &buffer_, explorer, async, raise_error, &raise_error});
+  cmd_depth_++;
+  redis_func_(CallArgs{args, &buffer_, explorer, async, raise_error, &raise_error});
   cmd_depth_--;
 
   // Shrink reusable buffer if it's too big.
@@ -1014,18 +995,57 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplore
   }
 
   if (!translator)
-    return 0;
+    return true;
 
   // Raise error for regular 'call' command if needed.
   if (raise_error && translator->HasError()) {
     // error is already on top of stack
-    return RaiseError(lua_);
+    return false;
   }
 
   if (!async)
     DCHECK_EQ(1, lua_gettop(lua_));
 
-  return 1;
+  return true;
+}
+
+// Returns number of results, which is always 1 in this case.
+// Please note that lua resets the stack once the function returns so no need
+// to unwind the stack manually in the function (though lua allows doing this).
+int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplorer* explorer) {
+  /* By using Lua debug hooks it is possible to trigger a recursive call
+   * to luaRedisGenericCommand(), which normally should never happen.
+   * To make this function reentrant is futile and makes it slower, but
+   * we should at least detect such a misuse, and abort. */
+  if (cmd_depth_) {
+    const char* recursion_warning =
+        "luaRedisGenericCommand() recursive call detected. "
+        "Are you doing funny stuff with Lua debug hooks?";
+    PushError(lua_, recursion_warning);
+    return 1;
+  }
+
+  if (!redis_func_) {
+    PushError(lua_, "internal error - redis function not defined");
+    if (raise_error) {
+      return RaiseErrorAndAbort(lua_);
+    }
+    return 1;
+  }
+
+  // IMPORTANT! All allocations within this function must be freed
+  // BEFORE calling RaiseErrorAndAbort in case of a script error. RaiseErrorAndAbort
+  // uses longjmp, which bypasses stack unwinding and skips the destruction of objects.
+  {
+    std::optional<absl::FixedArray<std::string_view>> args = PrepareArgs();
+    if (args.has_value()) {
+      raise_error = !CallRedisFunction(raise_error, async, explorer, SliceSpan{*args});
+    }
+  }
+  if (!raise_error) {
+    return 1;
+  }
+  return RaiseErrorAndAbort(lua_);  // never returns; it unwinds the Lua call stack
 }
 
 int Interpreter::RedisCallCommand(lua_State* lua) {
diff --git a/src/core/interpreter.h b/src/core/interpreter.h
index 27448cbf6dda..aeeac0a4085e 100644
--- a/src/core/interpreter.h
+++ b/src/core/interpreter.h
@@ -4,6 +4,7 @@
 
 #pragma once
 
+#include <optional>
 #include <string>
 #include <string_view>
 
@@ -139,10 +140,14 @@ class Interpreter {
   static int RedisACallCommand(lua_State* lua);
   static int RedisAPCallCommand(lua_State* lua);
 
+  std::optional<absl::FixedArray<std::string_view>> PrepareArgs();
+  bool CallRedisFunction(bool raise_error, bool async, ObjectExplorer* explorer, SliceSpan args);
+
   lua_State* lua_;
   unsigned cmd_depth_ = 0;
   RedisFunc redis_func_;
   std::string buffer_;
+  char name_buffer_[32];  // backing storage for cmd name
 };
 
 // Manages an internal interpreter pool. This allows multiple connections residing on the same
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
index d52b266c3213..1105f1d83408 100644
--- a/tests/dragonfly/memory_test.py
+++ b/tests/dragonfly/memory_test.py
@@ -114,3 +114,49 @@ async def test_rss_oom_ratio(df_factory: DflyInstanceFactory, admin_port):
     # new client create shoud not fail after memory usage decrease
     client = df_server.client()
     await client.execute_command("set x y")
+
+
+@pytest.mark.asyncio
+@dfly_args(
+    {
+        "maxmemory": "512mb",
+        "proactor_threads": 1,
+    }
+)
+async def test_eval_with_oom(df_factory: DflyInstanceFactory):
+    """
+    Test running EVAL commands while Dragonfly returns OOM on write commands, and check RSS memory.
+    This test was written after detecting a memory leak in script runs under an OOM state.
+    """
+    df_server = df_factory.create()
+    df_server.start()
+
+    client = df_server.client()
+    await client.execute_command("DEBUG POPULATE 20000 key 40000 RAND")
+
+    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly
+
+    info = await client.info("memory")
+    logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
+
+    reject_limit = 512 * 1024 * 1024  # 512mb
+    assert info["used_memory"] > reject_limit
+    rss_before_eval = info["used_memory_rss"]
+
+    pipe = client.pipeline(transaction=False)
+    MSET_SCRIPT = """
+    redis.call('MSET', KEYS[1], ARGV[1], KEYS[2], ARGV[2])
+    """
+
+    for _ in range(20):
+        for _ in range(8000):
+            pipe.eval(MSET_SCRIPT, 2, "x1", "y1", "x2", "y2")
+        # the MSET calls are rejected due to OOM
+        with pytest.raises(redis.exceptions.ResponseError):
+            await pipe.execute()
+
+    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly
+
+    info = await client.info("memory")
+    logging.debug(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
+    assert rss_before_eval * 1.01 > info["used_memory_rss"]
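
Context for the IMPORTANT comment in RedisGenericCommand: with Lua built as C (as in Redis/Dragonfly), lua_error() reports the error via longjmp, so C++ destructors of objects still alive in the skipped frames never run. The standalone sketch below is illustrative only and not part of this patch (LeakyRaise, ScopedRaise and payload are made-up names); it shows the leak pattern that the scoped block around PrepareArgs()/CallRedisFunction() avoids, where the owning object is destroyed before RaiseErrorAndAbort() finally longjmps.

// Illustrative sketch, not Dragonfly code. Assumes Lua is compiled as C
// (the default), so lua_error() uses longjmp and skips C++ destructors.
#include <lua.hpp>  // lua.h / lualib.h / lauxlib.h wrapped in extern "C"

#include <string>

// Leaks: 'payload' is still alive when lua_error() longjmps out of this frame,
// so its destructor never runs and the heap allocation is lost.
static int LeakyRaise(lua_State* lua) {
  std::string payload(1 << 20, 'x');
  lua_pushstring(lua, "some error");
  return lua_error(lua);  // never returns
}

// Safe: the allocation lives in its own block and is destroyed before the raise,
// mirroring the block around PrepareArgs()/CallRedisFunction() in this patch.
static int ScopedRaise(lua_State* lua) {
  bool failed = false;
  {
    std::string payload(1 << 20, 'x');
    // ... work that may fail ...
    failed = true;
  }  // payload destroyed here
  if (failed) {
    lua_pushstring(lua, "some error");
    return lua_error(lua);  // never returns, but nothing is left to destroy
  }
  return 0;
}

int main() {
  lua_State* lua = luaL_newstate();

  lua_pushcfunction(lua, LeakyRaise);
  lua_pcall(lua, 0, 0, 0);  // error is caught here, but ~1MB was leaked

  lua_pushcfunction(lua, ScopedRaise);
  lua_pcall(lua, 0, 0, 0);  // error is caught here, nothing leaked

  lua_close(lua);
  return 0;
}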