Skip to content

Commit

Permalink
ruleset: reuse threads
Browse files Browse the repository at this point in the history
Signed-off-by: He Xian <[email protected]>
  • Loading branch information
hexian000 committed Jan 22, 2025
1 parent 3dded77 commit 39a406f
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 84 deletions.
5 changes: 5 additions & 0 deletions src/ruleset.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ static int l_panic(lua_State *L)

static int ruleset_luainit(lua_State *restrict L)
{
/* init registry */
const char *strings[] = {
ERR_MEMORY,
ERR_BAD_REGISTRY,
Expand All @@ -96,6 +97,10 @@ static int ruleset_luainit(lua_State *restrict L)
lua_rawseti(L, -2, i + 1);
}
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_CONSTANT);
lua_newtable(L); /* await context */
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_AWAIT_CONTEXT);
aux_newweaktable(L, "k"); /* idle threads */
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_IDLE_THREAD);
/* load Lua libraries */
luaL_openlibs(L);
/* restrict package searcher */
Expand Down
22 changes: 7 additions & 15 deletions src/ruleset/await.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
} \
} while (0)

#define HAVE_LUA_TOCLOSE (LUA_VERSION_NUM >= 504)

/* [-0, +0, m] */
static void context_pin(lua_State *restrict L, const void *p)
{
Expand Down Expand Up @@ -119,7 +117,7 @@ static void sleep_finish_cb(
static int
await_sleep_k(lua_State *restrict L, const int status, const lua_KContext ctx)
{
CHECK(status == LUA_YIELD);
ASSERT(status == LUA_YIELD);
const int base = (int)ctx;
ASSERT(lua_gettop(L) >= base + 1);
CLOSESLOT(await_sleep_close, base);
Expand Down Expand Up @@ -212,7 +210,7 @@ static void resolve_finish_cb(
static int
await_resolve_k(lua_State *restrict L, const int status, const lua_KContext ctx)
{
CHECK(status == LUA_YIELD);
ASSERT(status == LUA_YIELD);
const int base = (int)ctx;
ASSERT(lua_gettop(L) >= base + 1);
CLOSESLOT(await_resolve_close, base);
Expand Down Expand Up @@ -299,7 +297,7 @@ static void invoke_cb(
static int
await_invoke_k(lua_State *restrict L, const int status, const lua_KContext ctx)
{
CHECK(status == LUA_YIELD);
ASSERT(status == LUA_YIELD);
const int base = (int)ctx;
ASSERT(lua_gettop(L) >= base + 1);
CLOSESLOT(await_invoke_close, base);
Expand All @@ -312,11 +310,13 @@ await_invoke_k(lua_State *restrict L, const int status, const lua_KContext ctx)
const char *errmsg = lua_touserdata(L, base + 2);
const size_t errlen = *(size_t *)lua_touserdata(L, base + 3);
struct stream *stream = lua_touserdata(L, base + 4);
lua_pushboolean(L, errmsg == NULL);
lua_settop(L, base);
if (errmsg != NULL) {
lua_pushboolean(L, 0);
lua_pushlstring(L, errmsg, errlen);
return 2;
}
lua_pushboolean(L, 1);
if (lua_load(L, aux_reader, (void *)stream, "=(rpc)", "t")) {
return lua_error(L);
}
Expand Down Expand Up @@ -429,7 +429,7 @@ static void child_finish_cb(
static int
await_execute_k(lua_State *restrict L, const int status, const lua_KContext ctx)
{
CHECK(status == LUA_YIELD);
ASSERT(status == LUA_YIELD);
const int base = (int)ctx;
ASSERT(lua_gettop(L) >= base + 1);
struct await_execute_userdata *restrict ud = lua_touserdata(L, base);
Expand Down Expand Up @@ -517,14 +517,6 @@ static int await_execute(lua_State *restrict L)

int luaopen_await(lua_State *restrict L)
{
lua_newtable(L); /* async routine */
lua_newtable(L); /* mt */
lua_pushliteral(L, "k");
lua_setfield(L, -2, "__mode");
lua_setmetatable(L, -2);
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_ASYNC_ROUTINE);
lua_newtable(L); /* await context */
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_AWAIT_CONTEXT);
const luaL_Reg awaitlib[] = {
{ "execute", await_execute },
{ "invoke", await_invoke },
Expand Down
130 changes: 92 additions & 38 deletions src/ruleset/base.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ struct ruleset *aux_getruleset(lua_State *restrict L)
return ud;
}

void aux_newweaktable(lua_State *restrict L, const char *mode)
{
lua_newtable(L);
lua_newtable(L);
lua_pushstring(L, mode);
lua_setfield(L, -2, "__mode");
lua_setmetatable(L, -2);
}

void aux_getregtable(lua_State *restrict L, const int idx)
{
if (lua_rawgeti(L, LUA_REGISTRYINDEX, idx) != LUA_TTABLE) {
Expand All @@ -40,6 +49,81 @@ void aux_getregtable(lua_State *restrict L, const int idx)
}
}

static void aux_putthread(lua_State *restrict L)
{
aux_getregtable(L, RIDX_IDLE_THREAD);
lua_pushthread(L);
lua_pushboolean(L, 1);
lua_rawset(L, -3);
lua_pop(L, 1);
}

static int thread_main_k(lua_State *L, int status, lua_KContext ctx);

static int
thread_call_k(lua_State *restrict L, int status, const lua_KContext ctx)
{
const int errfunc = (int)ctx;
/* lua stack: errfunc? finish ? ... */
const int n = lua_gettop(L);
const int nargs = n - (errfunc + 1);
ASSERT(nargs >= 1);
lua_pushboolean(L, (status == LUA_OK || status == LUA_YIELD));
lua_replace(L, errfunc + 2);
status = lua_pcall(L, nargs, 0, errfunc);
if (status != LUA_OK && status != LUA_YIELD) {
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_LASTERROR);
}
aux_putthread(L);
return lua_yieldk(L, 0, ctx, thread_main_k);
}

static int
thread_main_k(lua_State *restrict L, int status, const lua_KContext ctx)
{
ASSERT(status == LUA_YIELD);
const int errfunc = (int)ctx;
/* lua stack: errfunc? finish ? func ... */
const int n = lua_gettop(L);
const int nargs = n - (errfunc + 3);
ASSERT(nargs >= 0);
status = lua_pcallk(L, nargs, LUA_MULTRET, errfunc, ctx, thread_call_k);
return thread_call_k(L, status, ctx);
}

static int thread_main(lua_State *restrict L)
{
lua_settop(L, 0);
int errfunc = 0;
if (G.conf->traceback) {
lua_pushcfunction(L, aux_traceback);
errfunc = lua_absindex(L, -1);
}
return lua_yieldk(L, 0, errfunc, thread_main_k);
}

/* [-0, +1, v] */
lua_State *aux_getthread(lua_State *restrict L)
{
lua_pushnil(L);
aux_getregtable(L, RIDX_IDLE_THREAD);
lua_pushnil(L);
if (lua_next(L, -2)) {
lua_copy(L, -2, -4);
lua_pop(L, 1);
lua_pushnil(L);
lua_rawset(L, -3);
lua_pop(L, 1);
return lua_tothread(L, -1);
}
lua_pop(L, 2);
lua_State *restrict co = lua_newthread(L);
lua_pushcfunction(co, thread_main);
const int status = aux_resume(co, NULL, 0);
ASSERT(status == LUA_YIELD);
return co;
}

const char *aux_reader(lua_State *L, void *ud, size_t *restrict sz)
{
UNUSED(L);
Expand Down Expand Up @@ -165,48 +249,15 @@ int aux_traceback(lua_State *restrict L)
return 1;
}

void aux_resume(lua_State *restrict L, const int tidx, const int narg)
int aux_resume(lua_State *restrict L, lua_State *restrict from, const int narg)
{
lua_State *restrict co = lua_tothread(L, tidx);
ASSERT(co != NULL);
int status, nres;
#if LUA_VERSION_NUM >= 504
status = lua_resume(co, NULL, narg, &nres);
status = lua_resume(L, from, narg, &nres);
#elif LUA_VERSION_NUM == 503
status = lua_resume(co, NULL, narg);
nres = lua_gettop(co);
status = lua_resume(L, from, narg);
#endif
if (status == LUA_YIELD) {
return;
}
/* routine is finished */
aux_getregtable(L, RIDX_ASYNC_ROUTINE);
lua_pushvalue(L, tidx); /* co */
if (lua_rawget(L, -2) == LUA_TNIL) {
/* no finish function */
return;
}
lua_pushvalue(L, tidx);
lua_pushnil(L);
/* lua stack: ... RIDX_ASYNC_ROUTINE finish co nil */
lua_rawset(L, -4);
int errfunc = 0;
if (G.conf->traceback) {
errfunc = lua_absindex(L, -2);
lua_pushcfunction(L, aux_traceback);
lua_replace(L, errfunc);
}
/* lua stack: ... traceback? finish */
if (status != LUA_OK) {
nres = 1;
}
luaL_checkstack(L, 1 + nres, NULL);
lua_pushboolean(L, status == LUA_OK);
lua_xmove(co, L, nres);
/* call finish function */
if (lua_pcall(L, 1 + nres, 0, errfunc) != LUA_OK) {
lua_rawseti(L, LUA_REGISTRYINDEX, RIDX_LASTERROR);
}
return status;
}

static bool ruleset_pcallv(
Expand Down Expand Up @@ -268,5 +319,8 @@ void ruleset_resume(struct ruleset *restrict r, void *ctx, const int narg, ...)
lua_pushlightuserdata(co, va_arg(args, void *));
}
va_end(args);
aux_resume(L, 1, narg);
const int status = aux_resume(co, NULL, narg);
if (status != LUA_OK && status != LUA_YIELD) {
lua_rawseti(co, LUA_REGISTRYINDEX, RIDX_LASTERROR);
}
}
15 changes: 11 additions & 4 deletions src/ruleset/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,29 @@ enum ruleset_ridx {
RIDX_CONSTANT = LUA_RIDX_LAST + 1,
/* last error */
RIDX_LASTERROR,
/* t[coroutine] = finish callback */
RIDX_ASYNC_ROUTINE,
/* t[lightuserdata] = coroutine */
/* t[lightuserdata] = thread */
RIDX_AWAIT_CONTEXT,
/* t[thread] = true */
RIDX_IDLE_THREAD,
};

#define ERR_MEMORY "out of memory"
#define ERR_BAD_REGISTRY "Lua registry is corrupted"
#define ERR_INVALID_INVOKE "invalid invocation target"
#define ERR_NOT_ASYNC_ROUTINE "not in asynchronous routine"

#define HAVE_LUA_TOCLOSE (LUA_VERSION_NUM >= 504)

struct ruleset *aux_getruleset(lua_State *L);

void aux_newweaktable(lua_State *L, const char *mode);

/* [-0, +1, v] */
void aux_getregtable(lua_State *L, int idx);

/* [-0, +1, v] */
lua_State *aux_getthread(lua_State *L);

const char *aux_reader(lua_State *L, void *ud, size_t *sz);

/* [-1, +1, v] */
Expand All @@ -53,7 +60,7 @@ bool aux_todialreq(lua_State *L, int n);

int aux_traceback(lua_State *L);

void aux_resume(lua_State *L, int tidx, int narg);
int aux_resume(lua_State *L, lua_State *from, int narg);

/* main routine */
bool ruleset_pcall(
Expand Down
47 changes: 21 additions & 26 deletions src/ruleset/cfunc.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ int cfunc_request(lua_State *restrict L)
const char *username = lua_touserdata(L, 4);
const char *password = lua_touserdata(L, 5);
const struct ruleset_request_cb *in_cb = lua_touserdata(L, 6);
lua_settop(L, 0);

struct ruleset_state *restrict state =
lua_newuserdata(L, sizeof(struct ruleset_state));
*state = (struct ruleset_state){
Expand All @@ -103,27 +105,21 @@ int cfunc_request(lua_State *restrict L)
lua_setfield(L, -2, "__gc");
}
lua_setmetatable(L, -2);
lua_copy(L, -1, 1);

lua_State *restrict co = lua_newthread(L);
lua_copy(L, -1, 2);
lua_State *restrict co = aux_getthread(L);
lua_pushvalue(L, 1);
lua_pushcclosure(L, request_finish, 1);
lua_xmove(L, co, 1); /* finish */
(void)lua_getglobal(co, "ruleset");
(void)lua_getfield(co, -1, func);
lua_replace(co, -2);
lua_pushstring(co, request);
lua_pushstring(co, username);
lua_pushstring(co, password);

lua_settop(L, 2);
aux_getregtable(L, RIDX_ASYNC_ROUTINE);
lua_pushvalue(L, 2);
lua_pushvalue(L, 1);
lua_pushcclosure(L, request_finish, 1);
lua_rawset(L, -3);

state->request = *in_cb;
*pstate = state;
aux_resume(L, 2, 3);
/* lua stack: state co; co stack: finish ? func request username password */
aux_resume(co, L, 6);
lua_settop(L, 1);
return 1;
}
Expand Down Expand Up @@ -199,6 +195,8 @@ int cfunc_rpcall(lua_State *restrict L)
struct ruleset_state **pstate = lua_touserdata(L, 1);
struct stream *stream = lua_touserdata(L, 2);
const struct ruleset_rpcall_cb *in_cb = lua_touserdata(L, 3);
lua_settop(L, 0);

struct ruleset_state *restrict state =
lua_newuserdata(L, sizeof(struct ruleset_state));
*state = (struct ruleset_state){
Expand All @@ -210,34 +208,31 @@ int cfunc_rpcall(lua_State *restrict L)
lua_setfield(L, -2, "__gc");
}
lua_setmetatable(L, -2);
lua_copy(L, -1, 1);
lua_settop(L, 1);

lua_State *restrict co = lua_newthread(L);
lua_State *restrict co = aux_getthread(L);
lua_pushvalue(L, 1);
lua_pushcclosure(L, rpcall_finish, 1);
lua_pushnil(L);

if (lua_load(L, aux_reader, stream, "=(rpc)", "t")) {
return lua_error(L);
}
lua_newtable(L);
lua_newtable(L);
aux_getregtable(L, LUA_RIDX_GLOBALS);
/* lua stack: state co chunk env mt _G */
/* lua stack: state co finish chunk env mt _G */
lua_setfield(L, -2, "__index");
lua_setmetatable(L, -2);
const char *upvalue = lua_setupvalue(L, -2, 1);
CHECK(upvalue != NULL && strcmp(upvalue, "_ENV") == 0);
/* lua stack: state co finish chunk */

lua_xmove(L, co, 3);
/* lua stack: state co; co stack: finish ? chunk */

aux_getregtable(L, RIDX_ASYNC_ROUTINE);
lua_pushvalue(L, 2);
lua_pushvalue(L, 1);
lua_pushcclosure(L, rpcall_finish, 1);
/* lua stack: state co chunk RIDX_ASYNC_ROUTINE co finish */
lua_rawset(L, -3);
lua_pop(L, 1);
lua_xmove(L, co, 1);
state->rpcall = *in_cb;
*pstate = state;
/* lua stack: state co; co stack: chunk */
aux_resume(L, 2, 0);
aux_resume(co, L, 3);
lua_settop(L, 1);
return 1;
}
Expand Down
Loading

0 comments on commit 39a406f

Please sign in to comment.