Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy-wasm) foreign function support #626

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ NGX_WASMX_DEPS="\
$ngx_addon_dir/src/common/shm/ngx_wa_shm_queue.h \
$ngx_addon_dir/src/common/metrics/ngx_wa_metrics.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.h \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.h"

Expand All @@ -158,6 +159,7 @@ NGX_WASMX_SRCS="\
$ngx_addon_dir/src/common/metrics/ngx_wa_metrics.c \
$ngx_addon_dir/src/common/metrics/ngx_wa_histogram.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_host.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_maps.c \
$ngx_addon_dir/src/common/proxy_wasm/ngx_proxy_wasm_properties.c \
Expand Down
44 changes: 44 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ngx_proxy_wasm.h>
#include <ngx_proxy_wasm_properties.h>
#include <ngx_proxy_wasm_foreign_callback.h>
#ifdef NGX_WASM_HTTP
#include <ngx_http_proxy_wasm.h>
#endif
Expand Down Expand Up @@ -839,6 +840,9 @@ ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
case NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK:
rc = filter->subsystem->resume(pwexec, step, &action);
break;
case NGX_PROXY_WASM_STEP_TICK:
pwexec->in_tick = 1;
rc = ngx_proxy_wasm_on_tick(pwexec);
Expand Down Expand Up @@ -927,6 +931,45 @@ ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec)
}


ngx_uint_t
ngx_proxy_wasm_foreign_callbacks_total(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_queue_t *q;
ngx_uint_t n = 0;

for (q = ngx_queue_head(&pwexec->fcallbacks);
q != ngx_queue_sentinel(&pwexec->fcallbacks);
q = ngx_queue_next(q), n++) { /* void */ }

dd("n: %ld", n);

return n;
}


void
ngx_proxy_wasm_foreign_callbacks_cancel(ngx_proxy_wasm_exec_t *pwexec)
{
#ifdef NGX_WASM_HTTP
ngx_queue_t *q;
ngx_proxy_wasm_foreign_cb_t *cb;

while (!ngx_queue_empty(&pwexec->fcallbacks)) {
q = ngx_queue_head(&pwexec->fcallbacks);
cb = ngx_queue_data(q, ngx_proxy_wasm_foreign_cb_t, q);

ngx_log_debug1(NGX_LOG_DEBUG_ALL, pwexec->log, 0,
"proxy_wasm foreign function callback cancelled"
" (callback: %p)", cb);

ngx_queue_remove(&cb->q);

ngx_proxy_wasm_foreign_callback_destroy(cb);
}
#endif
}


/* host handlers */


Expand Down Expand Up @@ -1263,6 +1306,7 @@ ngx_proxy_wasm_create_context(ngx_proxy_wasm_filter_t *filter,
pwexec->store = ictx->store;

ngx_queue_init(&pwexec->calls);
ngx_queue_init(&pwexec->fcallbacks);

} else {
if (in->ictx != ictx) {
Expand Down
60 changes: 36 additions & 24 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ typedef enum {
NGX_PROXY_WASM_STEP_DONE,
NGX_PROXY_WASM_STEP_TICK,
NGX_PROXY_WASM_STEP_DISPATCH_RESPONSE,
NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK,
} ngx_proxy_wasm_step_e;


Expand Down Expand Up @@ -124,7 +125,7 @@ typedef enum {
NGX_PROXY_WASM_BUFFER_GRPC_RECEIVE_BUFFER = 5,
NGX_PROXY_WASM_BUFFER_VM_CONFIGURATION = 6,
NGX_PROXY_WASM_BUFFER_PLUGIN_CONFIGURATION = 7,
NGX_PROXY_WASM_BUFFER_CALL_DATA = 8,
NGX_PROXY_WASM_BUFFER_FOREIGN_FUNCTION_ARGUMENTS = 8,
} ngx_proxy_wasm_buffer_type_e;


Expand All @@ -147,50 +148,58 @@ typedef enum {
} ngx_proxy_wasm_metric_type_e;


typedef enum {
NGX_PROXY_WASM_FOREIGN_RESOLVE = 0,
} ngx_proxy_wasm_foreign_function_e;


typedef struct ngx_proxy_wasm_ctx_s ngx_proxy_wasm_ctx_t;
typedef struct ngx_proxy_wasm_filter_s ngx_proxy_wasm_filter_t;
typedef struct ngx_proxy_wasm_exec_s ngx_proxy_wasm_exec_t;
typedef struct ngx_proxy_wasm_instance_s ngx_proxy_wasm_instance_t;
#ifdef NGX_WASM_HTTP
typedef struct ngx_http_proxy_wasm_dispatch_s ngx_http_proxy_wasm_dispatch_t;
#endif
typedef struct ngx_proxy_wasm_foreign_cb_s ngx_proxy_wasm_foreign_cb_t;
typedef ngx_str_t ngx_proxy_wasm_marshalled_map_t;


typedef struct {
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
ngx_queue_t busy;
ngx_queue_t free;
ngx_queue_t sweep;
ngx_pool_t *pool;
} ngx_proxy_wasm_store_t;


typedef struct {
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
ngx_str_t log_prefix;
ngx_log_t *orig_log;
ngx_proxy_wasm_exec_t *pwexec;
} ngx_proxy_wasm_log_ctx_t;


struct ngx_proxy_wasm_exec_s {
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
ngx_uint_t root_id;
ngx_uint_t id;
ngx_uint_t index;
ngx_uint_t tick_period;
ngx_rbtree_node_t node;
ngx_proxy_wasm_err_e ecode;
ngx_pool_t *pool;
ngx_log_t *log;
ngx_proxy_wasm_log_ctx_t log_ctx;
ngx_proxy_wasm_ctx_t *parent;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_instance_t *ictx;
ngx_proxy_wasm_store_t *store;
ngx_event_t *ev;
#ifdef NGX_WASM_HTTP
ngx_http_proxy_wasm_dispatch_t *call; /* swap pointer for host functions */
ngx_http_proxy_wasm_dispatch_t *call; /* swap pointer for host functions */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispatch_call

#endif
ngx_queue_t calls;
ngx_queue_t calls;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispatch_calls

ngx_proxy_wasm_foreign_cb_t *fcallback;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

foreign_call

ngx_queue_t fcallbacks;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

foreign_calls


/* flags */

Expand Down Expand Up @@ -415,6 +424,9 @@ ngx_proxy_wasm_err_e ngx_proxy_wasm_run_step(ngx_proxy_wasm_exec_t *pwexec,
ngx_proxy_wasm_step_e step);
ngx_uint_t ngx_proxy_wasm_dispatch_calls_total(ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_dispatch_calls_cancel(ngx_proxy_wasm_exec_t *pwexec);
ngx_uint_t ngx_proxy_wasm_foreign_callbacks_total(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

foreign_calls_total
...

ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_foreign_callbacks_cancel(ngx_proxy_wasm_exec_t *pwexec);


/* host handlers */
Expand Down
131 changes: 131 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"

#include <ngx_proxy_wasm_foreign_callback.h>


void
ngx_proxy_wasm_foreign_callback_destroy(ngx_proxy_wasm_foreign_cb_t *cb)
{

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

ngx_pfree(cb->pwexec->pool, cb);
}


ngx_proxy_wasm_foreign_cb_t *
ngx_proxy_wasm_foreign_callback_alloc(ngx_proxy_wasm_exec_t *pwexec)
{
ngx_proxy_wasm_foreign_cb_t *cb;

cb = ngx_palloc(pwexec->pool, sizeof(ngx_proxy_wasm_foreign_cb_t));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cb = ngx_palloc(pwexec->pool, sizeof(ngx_proxy_wasm_foreign_cb_t));
cb = ngx_pcalloc(pwexec->pool, sizeof(ngx_proxy_wasm_foreign_cb_t));
  • inlined

cb->pwexec = pwexec;

return cb;
}


void
ngx_proxy_wasm_foreign_callback(ngx_proxy_wasm_foreign_cb_t *cb)
{
ngx_proxy_wasm_exec_t *pwexec = cb->pwexec;
ngx_proxy_wasm_err_e ecode = NGX_PROXY_WASM_ERR_NONE;
ngx_proxy_wasm_step_e step = pwexec->parent->step;

ngx_queue_remove(&cb->q);
pwexec->fcallback = cb;

#if (NGX_WASM_HTTP)
pwexec->parent->phase = ngx_wasm_phase_lookup(&ngx_http_wasm_subsystem,
NGX_WASM_BACKGROUND_PHASE);
#endif

ecode = ngx_proxy_wasm_run_step(pwexec,
NGX_PROXY_WASM_STEP_FOREIGN_CALLBACK);
if (ecode != NGX_PROXY_WASM_ERR_NONE) {
/* TODO: error handling */
}

pwexec->parent->step = step;
pwexec->fcallback = NULL;

if (ngx_proxy_wasm_foreign_callbacks_total(pwexec)) {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, pwexec->log, 0,
"proxy_wasm more foreign function callbacks pending...");

#if (NGX_WASM_HTTP)
ngx_wasm_yield(&cb->rctx->env);
#endif
ngx_proxy_wasm_ctx_set_next_action(pwexec->parent,
NGX_PROXY_WASM_ACTION_PAUSE);

} else {
ngx_log_debug0(NGX_LOG_DEBUG_WASM, pwexec->log, 0,
"proxy_wasm last foreign function callback handled");

#if (NGX_WASM_HTTP)
ngx_wasm_continue(&cb->rctx->env);
#endif
ngx_proxy_wasm_ctx_set_next_action(pwexec->parent,
NGX_PROXY_WASM_ACTION_CONTINUE);

/* resume current step if unfinished */
ngx_proxy_wasm_resume(pwexec->parent, pwexec->parent->phase, step);
}

ngx_proxy_wasm_foreign_callback_destroy(cb);
}


ngx_int_t
ngx_proxy_wasm_foreign_callback_buffer_create(ngx_proxy_wasm_foreign_cb_t *cb,
size_t size)
{
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_proxy_wasm_exec_t *pwexec = cb->pwexec;

ngx_wa_assert(pwexec);
ngx_wa_assert(size);

cl = ngx_alloc_chain_link(pwexec->pool);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ngx_wasm_chain_get_free_buf

if (cl == NULL) {
return NGX_ERROR;
}

/* TODO: if size exceeds a threshold, split allocation into N buffers */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ngx_wa_assert(len < NGX_MAX_ERRLEN)


b = ngx_create_temp_buf(pwexec->pool, size);
if (b == NULL) {
return NGX_ERROR;
}

cl->buf = b;
cl->next = NULL;

cb->args_out = cl;

return NGX_OK;
}


ngx_int_t
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

void

ngx_proxy_wasm_foreign_callback_buffer_write(ngx_proxy_wasm_foreign_cb_t *cb,
ngx_str_t *data)
{
size_t b_size;
ngx_buf_t *b = cb->args_out->buf;

b_size = b->end - b->start;

ngx_wa_assert(data->len <= b_size);

if (data->len <= b_size) {
b->last = ngx_cpymem(b->last, data->data, data->len);
}

/* TODO: data->len > b_size */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move up to the assert


return NGX_OK;
}
29 changes: 29 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm_foreign_callback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef _NGX_PROXY_WASM_FOREIGN_CALLBACK_H_INCLUDED_
#define _NGX_PROXY_WASM_FOREIGN_CALLBACK_H_INCLUDED_


#include <ngx_wasm.h>
#include <ngx_proxy_wasm.h>
#include <ngx_wasm_subsystem.h>


struct ngx_proxy_wasm_foreign_cb_s {
ngx_queue_t q;
ngx_proxy_wasm_exec_t *pwexec;
#if (NGX_WASM_HTTP)
ngx_http_wasm_req_ctx_t *rctx;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ngx_http_wasm_req_ctx_t *rctx;

#endif
ngx_proxy_wasm_foreign_function_e fcode;
ngx_chain_t *args_out;
};


ngx_proxy_wasm_foreign_cb_t * ngx_proxy_wasm_foreign_callback_alloc(
ngx_proxy_wasm_exec_t *pwexec);
void ngx_proxy_wasm_foreign_callback(ngx_proxy_wasm_foreign_cb_t *cb);
ngx_int_t ngx_proxy_wasm_foreign_callback_buffer_create(
ngx_proxy_wasm_foreign_cb_t *cb, size_t size);
ngx_int_t ngx_proxy_wasm_foreign_callback_buffer_write(
ngx_proxy_wasm_foreign_cb_t *cb, ngx_str_t *data);
void ngx_proxy_wasm_foreign_callback_destroy(ngx_proxy_wasm_foreign_cb_t *cb);
#endif
Loading
Loading