Skip to content

Commit

Permalink
lib: Add threading and synchronization mechanisms for traps to AgentX
Browse files Browse the repository at this point in the history
- Introduce a dedicated AgentX thread using `frr_pthread`.

- Add mutex locks (`ax_mtx`, `ax_io_mtx`) to manage thread
synchronization for trap transfer and I/O operations.

- Implemented ring buffers (`ibuf_ax`) for handling
"master -> AgentX" communication, improving data handling
between threads.

- Update the SNMP read operations to use mutex locks to
ensure thread-safe execution.

- Integrated a new dedicated thread to send SNMP traps,
ensuring separation of responsibilities between the main
and AgentX threads.

- Enhanced trap handling to support multi-index traps, with
excess traps being discarded if the buffer is full,
preventing overflow.

- Enhanced trap handling to support multi-index traps.
When more than "RINGBUF_NB_TRAP" traps are pending for
transmission, subsequent traps are discarded to prevent
overflow.

This update significantly improves concurrency,
synchronization, and trap management within the AgentX
module, with added protection against socket's buffer overflow
from excessive traps. The socket's buffer overflow is leading
to process deadlock.

Signed-off-by: Francois Dumontet <[email protected]>
  • Loading branch information
fdumontet6WIND committed Oct 2, 2024
1 parent 81f7790 commit fddcf30
Showing 1 changed file with 104 additions and 13 deletions.
117 changes: 104 additions & 13 deletions lib/agentx.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,53 @@
#include "libfrr.h"
#include "xref.h"
#include "lib/libagentx.h"
#include "ringbuf.h" /* for ringbuf_remain, ringbuf_peek, ringbuf_.. */
#include "frr_pthread.h" /* for struct frr_pthread */

XREF_SETUP();


extern int agentx_stop(struct frr_pthread *fpt, void **result);

DEFINE_HOOK(agentx_enabled, (), ());

//bool agentx_enabled = false;

static struct event_loop *agentx_tm;
static struct event_loop *main_tm;
static struct event *timeout_thr = NULL;
static struct list *events = NULL;

struct frr_pthread *agentx_pth;

/* buffer dedicated to "master -> agentx" threads */
static struct ringbuf *ibuf_ax;
#define RINGBUF_NB_TRAP 200
/* mutex dedicated to trap transfert between threads */
pthread_mutex_t ax_mtx;
/* mutex dedicated to send/read exclusion */
pthread_mutex_t ax_io_mtx;

static void agentx_pthreads_init(void)
{
assert(!agentx_pth);

struct frr_pthread_attr ax = {
.start = frr_pthread_attr_default.start,
.stop = frr_pthread_attr_default.stop,
};

agentx_pth = frr_pthread_new(&ax, "BGP Agentx thread", "bgpd_ax");
}

static void agentx_pthreads_run(void)
{
frr_pthread_run(agentx_pth, NULL);

/* Wait until threads are ready. */
frr_pthread_wait_running(agentx_pth);
}

static void agentx_events_update(struct event *t);

static void agentx_timeout(struct event *t)
Expand Down Expand Up @@ -70,10 +106,11 @@ static void agentx_read(struct event *t)
if (new_flags == -1)
flog_err(EC_LIB_SYSTEM_CALL, "Failed to set snmp fd non blocking: %s(%d)",
strerror(errno), errno);

netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
netsnmp_large_fd_setfd(t->u.fd, &lfds);
snmp_read2(&lfds);
frr_with_mutex (&ax_io_mtx) {
netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
netsnmp_large_fd_setfd(t->u.fd, &lfds);
snmp_read2(&lfds);
}

/* Reset the flag */
if (!nonblock) {
Expand Down Expand Up @@ -107,8 +144,7 @@ static void agentx_events_update(struct event *t)
snmp_select_info2(&maxfd, &lfds, &timeout, &block);

if (!block) {
event_add_timer_tv(agentx_tm, agentx_timeout, NULL, &timeout,
&timeout_thr);
event_add_timer_tv(main_tm, agentx_timeout, NULL, &timeout, &timeout_thr);
}

ln = listhead(events);
Expand Down Expand Up @@ -137,7 +173,7 @@ static void agentx_events_update(struct event *t)

thr = XCALLOC(MTYPE_TMP, sizeof(struct event *));
newln = listnode_add_before(events, ln, thr);
event_add_read(agentx_tm, agentx_read, newln, fd, thr);
event_add_read(main_tm, agentx_read, newln, fd, thr);
}
}

Expand Down Expand Up @@ -196,9 +232,12 @@ static int agentx_log_callback(int major, int minor, void *serverarg,
static int agentx_cli_on(void)
{
if (!agentx_enabled) {
agentx_pthreads_run();
agentx_tm = agentx_pth->master;
init_snmp(FRR_SMUX_NAME);
events = list_new();
agentx_events_update(NULL);
ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *));
agentx_enabled = true;
hook_call(agentx_enabled);
}
Expand All @@ -215,8 +254,9 @@ static int agentx_cli_off(void)

static int smux_disable(void)
{
agentx_enabled = false;

agentx_enabled = false;
agentx_stop (agentx_pth, NULL);
return 0;
}

Expand All @@ -227,11 +267,15 @@ bool smux_enabled(void)

void smux_init(struct event_loop *tm)
{
agentx_tm = tm;
main_tm = tm;
agentx_pthreads_init();

hook_register(agentx_cli_enabled, agentx_cli_on);
hook_register(agentx_cli_disabled, agentx_cli_off);

pthread_mutex_init(&ax_mtx, NULL);
pthread_mutex_init(&ax_io_mtx, NULL);

netsnmp_enable_subagent();
snmp_disable_log();
snmp_enable_calllog();
Expand All @@ -245,9 +289,12 @@ void smux_init(struct event_loop *tm)
void smux_agentx_enable(void)
{
if (!agentx_enabled) {
agentx_pthreads_run();
agentx_tm = agentx_pth->master;
init_snmp(FRR_SMUX_NAME);
events = list_new();
agentx_events_update(NULL);
ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *));
agentx_enabled = true;
}
}
Expand Down Expand Up @@ -275,6 +322,28 @@ void smux_trap(struct variable *vp, size_t vp_len, const oid *ename,
trapobjlen, sptrap);
}

static void smux_trap_multi_index_thd(struct event *thread)
{
netsnmp_variable_list **notification_vars = NULL;

frr_with_mutex (&ax_mtx) {
if (ringbuf_remain(ibuf_ax) == 0) {
zlog_err("%s no data to read in ring buffers", __func__);
return;
}
ringbuf_get(ibuf_ax, &notification_vars, sizeof(notification_vars));
zlog_err("%s get %p %p", __func__, notification_vars, *notification_vars);
}

frr_with_mutex (&ax_io_mtx) {
send_v2trap(*notification_vars);
}
snmp_free_varbind(*notification_vars);
SNMP_FREE(notification_vars);
/* continue in main thread */
event_add_event(main_tm, agentx_events_update, NULL, 0, NULL);
}

int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
size_t enamelen, const oid *name, size_t namelen,
struct index_oid *iname, size_t index_len,
Expand All @@ -286,8 +355,9 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
oid notification_oid[MAX_OID_LEN];
size_t notification_oid_len;
unsigned int i;
netsnmp_variable_list *notification_vars;

netsnmp_variable_list *notification_vars = NULL;
notification_vars = SNMP_MALLOC_TYPEDEF(netsnmp_variable_list);
if (!agentx_enabled)
return 0;

Expand Down Expand Up @@ -365,10 +435,18 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
}
}

/* transmission to the agentx thread*/
frr_with_mutex (&ax_mtx) {
if (ringbuf_space(ibuf_ax) < sizeof(notification_vars)) {
zlog_err("%s not enougth space in ibuf_ax needed : %lu free : %lu",
__func__, sizeof(notification_vars), ringbuf_space(ibuf_ax));
return 0;
}
ringbuf_put(ibuf_ax, &notification_vars, sizeof(netsnmp_variable_list **));
}

event_add_event(agentx_tm, smux_trap_multi_index_thd, NULL, 0, NULL);

send_v2trap(notification_vars);
snmp_free_varbind(notification_vars);
agentx_events_update(NULL);
return 1;
}

Expand All @@ -389,4 +467,17 @@ void smux_terminate(void)
list_delete(&events);
}
}

int agentx_stop(struct frr_pthread *fpt, void **result)
{
assert(fpt->running);
pthread_mutex_destroy(&ax_mtx);
pthread_mutex_destroy(&ax_io_mtx);
atomic_store_explicit(&fpt->running, false, memory_order_relaxed);

pthread_join(fpt->thread, result);
return 0;
}


#endif /* SNMP_AGENTX */

0 comments on commit fddcf30

Please sign in to comment.