Skip to content

Commit

Permalink
lib: Add threading and synchronization mechanisms for traps to AgentX
Browse files Browse the repository at this point in the history
- Introduce a dedicated AgentX thread using `frr_pthread`.

- Add mutex locks (`ax_mtx`, `ax_io_mtx`) to manage thread
synchronization for trap transfer and I/O operations.

- Implement a ring buffer (`ibuf_ax`) for handling
"master -> AgentX" communication, improving data handling
between threads.

- Update the SNMP read operations to use mutex locks to
ensure thread-safe execution.

- Integrated a new dedicated thread to send SNMP traps,
ensuring separation of responsibilities between the main
and AgentX threads.

- Enhanced trap handling to support multi-index traps, with
excess traps being discarded if the buffer is full,
preventing overflow.

- Enhanced trap handling to support multi-index traps.
When more than "RINGBUF_NB_TRAP" traps are pending for
transmission, subsequent traps are discarded to prevent
overflow.

This update significantly improves concurrency,
synchronization, and trap management within the AgentX
module, and adds protection against overflow of the socket
buffer caused by excessive traps. Such an overflow can lead
to a process deadlock.

Signed-off-by: Francois Dumontet <[email protected]>
  • Loading branch information
fdumontet6WIND committed Oct 16, 2024
1 parent 81f7790 commit 6124dae
Showing 1 changed file with 153 additions and 16 deletions.
169 changes: 153 additions & 16 deletions lib/agentx.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,84 @@
#include "libfrr.h"
#include "xref.h"
#include "lib/libagentx.h"
#include "ringbuf.h" /* for ringbuf_remain, ringbuf_peek, ringbuf_.. */
#include "frr_pthread.h" /* for struct frr_pthread */

XREF_SETUP();


/* Forward declaration: releases the AgentX ring buffer and mutexes. */
static int agentx_stop(struct frr_pthread *fpt, void **result);

DEFINE_HOOK(agentx_enabled, (), ());

//bool agentx_enabled = false;

/*
 * Event loop of the dedicated AgentX pthread; set from agentx_pth->master
 * when AgentX is enabled. Trap sending is scheduled on this loop.
 */
static struct event_loop *agentx_tm;
/*
 * Event loop of the process's main pthread, as handed to smux_init().
 * SNMP read and timeout events are scheduled on this loop.
 */
static struct event_loop *main_pthread_tm;
/* Pending SNMP timeout timer, (re)armed by agentx_events_update(). */
static struct event *timeout_thr = NULL;
/* List of read events registered for the SNMP file descriptors. */
static struct list *events = NULL;

/* The dedicated AgentX pthread, created by agentx_pthreads_init(). */
struct frr_pthread *agentx_pth;

/*
 * Ring buffer carrying pending traps from the main pthread to the AgentX
 * pthread. It is sized to hold RINGBUF_NB_TRAP pointers.
 */
static struct ringbuf *ibuf_ax;
#define RINGBUF_NB_TRAP 200
/* Mutex protecting ibuf_ax during trap transfer between threads. */
static pthread_mutex_t ax_mtx;
/* Mutex making SNMP send and read/timeout processing mutually exclusive. */
static pthread_mutex_t ax_io_mtx;

/*
 * Counters of failed pthread_mutex_trylock() attempts on ax_io_mtx, in
 * agentx_read() and agentx_timeout() respectively.
 */
static uint64_t nb_locktry_read_fail;
static uint64_t nb_locktry_timeout_fail;


static void agentx_pthreads_init(void)
{
assert(!agentx_pth);

struct frr_pthread_attr ax = {
.start = frr_pthread_attr_default.start,
.stop = frr_pthread_attr_default.stop,
};

agentx_pth = frr_pthread_new(&ax, "Agentx thread", "agentx_pth");
}

/*
 * Start the AgentX pthread created by agentx_pthreads_init() and block
 * until it reports that it is running.
 */
static void agentx_pthreads_run(void)
{
	/* Launch the thread with no custom pthread attributes. */
	frr_pthread_run(agentx_pth, NULL);

	/* Do not return before the thread is actually up. */
	frr_pthread_wait_running(agentx_pth);
}

static void agentx_events_update(struct event *t);

/*
 * SNMP timeout handler, scheduled on the main pthread's event loop by
 * agentx_events_update().
 *
 * Runs the Net-SNMP timeout and alarm processing under ax_io_mtx so that it
 * never overlaps with a trap being sent. The lock is only tried, never
 * waited for: if it is already held, nothing is processed, the failure is
 * counted (and logged once every 100 failures) and the events are re-armed
 * so that this handler is called again later.
 *
 * t: the timer event that fired (unused).
 */
static void agentx_timeout(struct event *t)
{
	if (pthread_mutex_trylock(&ax_io_mtx) != 0) {
		if (nb_locktry_timeout_fail % 100 == 0)
			zlog_err("%s mutex already locked %llu times", __func__,
				 (unsigned long long)nb_locktry_timeout_fail);
		nb_locktry_timeout_fail++;
		agentx_events_update(NULL);
		return;
	}

	/* Only touch the SNMP session state while holding ax_io_mtx. */
	snmp_timeout();
	run_alarms();
	pthread_mutex_unlock(&ax_io_mtx);

	netsnmp_check_outstanding_agent_requests();
	agentx_events_update(NULL);
}

/* this function is called in main bgp thread */
static void agentx_read(struct event *t)
{
netsnmp_large_fd_set lfds;
Expand Down Expand Up @@ -71,9 +128,25 @@ static void agentx_read(struct event *t)
flog_err(EC_LIB_SYSTEM_CALL, "Failed to set snmp fd non blocking: %s(%d)",
strerror(errno), errno);

netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
netsnmp_large_fd_setfd(t->u.fd, &lfds);
snmp_read2(&lfds);
/* mutex to avoid sending/reading concurrently */
/* in case of lock nothing is done and agentx_read will */
/* be called later. */
/* snmp_read2 function provides snmpget/walk functions */
/* call to *table functions are made from that point */
/* we are in main bgp thread */
if (pthread_mutex_trylock(&ax_io_mtx) == 0) {
netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
netsnmp_large_fd_setfd(t->u.fd, &lfds);
snmp_read2(&lfds);
} else {
if (nb_locktry_read_fail % 100 == 0)
zlog_err("%s mutex already locked %llu times", __func__,
nb_locktry_read_fail);
nb_locktry_read_fail++;
agentx_events_update(NULL);
return;
}
pthread_mutex_unlock(&ax_io_mtx);

/* Reset the flag */
if (!nonblock) {
Expand All @@ -91,6 +164,7 @@ static void agentx_read(struct event *t)
netsnmp_large_fd_set_cleanup(&lfds);
}

/* this function is called in main bgp thread */
static void agentx_events_update(struct event *t)
{
int maxfd = 0;
Expand All @@ -106,10 +180,8 @@ static void agentx_events_update(struct event *t)
netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
snmp_select_info2(&maxfd, &lfds, &timeout, &block);

if (!block) {
event_add_timer_tv(agentx_tm, agentx_timeout, NULL, &timeout,
&timeout_thr);
}
if (!block)
event_add_timer_tv(main_pthread_tm, agentx_timeout, NULL, &timeout, &timeout_thr);

ln = listhead(events);
thr = ln ? listgetdata(ln) : NULL;
Expand Down Expand Up @@ -137,7 +209,7 @@ static void agentx_events_update(struct event *t)

thr = XCALLOC(MTYPE_TMP, sizeof(struct event *));
newln = listnode_add_before(events, ln, thr);
event_add_read(agentx_tm, agentx_read, newln, fd, thr);
event_add_read(main_pthread_tm, agentx_read, newln, fd, thr);
}
}

Expand Down Expand Up @@ -196,9 +268,12 @@ static int agentx_log_callback(int major, int minor, void *serverarg,
static int agentx_cli_on(void)
{
if (!agentx_enabled) {
agentx_pthreads_run();
agentx_tm = agentx_pth->master;
init_snmp(FRR_SMUX_NAME);
events = list_new();
agentx_events_update(NULL);
ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *));
agentx_enabled = true;
hook_call(agentx_enabled);
}
Expand All @@ -216,7 +291,7 @@ static int agentx_cli_off(void)
/*
 * Mark AgentX as disabled and release the resources owned by the AgentX
 * thread through agentx_stop(). Always returns 0.
 */
static int smux_disable(void)
{
	agentx_enabled = false;
	agentx_stop(agentx_pth, NULL);

	return 0;
}

Expand All @@ -227,11 +302,15 @@ bool smux_enabled(void)

void smux_init(struct event_loop *tm)
{
agentx_tm = tm;
main_pthread_tm = tm;
agentx_pthreads_init();

hook_register(agentx_cli_enabled, agentx_cli_on);
hook_register(agentx_cli_disabled, agentx_cli_off);

pthread_mutex_init(&ax_mtx, NULL);
pthread_mutex_init(&ax_io_mtx, NULL);

netsnmp_enable_subagent();
snmp_disable_log();
snmp_enable_calllog();
Expand All @@ -245,9 +324,12 @@ void smux_init(struct event_loop *tm)
/*
 * Enable AgentX support if it is not enabled yet.
 *
 * Starts the AgentX pthread and records its event loop, initialises the
 * SNMP library, registers the SNMP events on the main loop and allocates
 * the trap ring buffer. Does nothing when AgentX is already enabled.
 */
void smux_agentx_enable(void)
{
	if (agentx_enabled)
		return;

	/* Bring up the dedicated thread and remember its event loop. */
	agentx_pthreads_run();
	agentx_tm = agentx_pth->master;

	init_snmp(FRR_SMUX_NAME);
	events = list_new();
	agentx_events_update(NULL);

	/* Room for RINGBUF_NB_TRAP pending trap pointers. */
	ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *));
	agentx_enabled = true;
}
Expand Down Expand Up @@ -275,6 +357,37 @@ void smux_trap(struct variable *vp, size_t vp_len, const oid *ename,
trapobjlen, sptrap);
}

/* function called in the agentx thread */
/* get data to send and from transfert structures */
/* performs oid and data sending via send_v2trap */
static void smux_trap_multi_index_thd(struct event *thread)
{
netsnmp_variable_list **notification_vars = NULL;

frr_with_mutex (&ax_mtx) {
if (ringbuf_remain(ibuf_ax) == 0) {
zlog_err("%s no data to read in ring buffers", __func__);
return;
}
ringbuf_get(ibuf_ax, &notification_vars, sizeof(notification_vars));
}

/* mutex to avoid sending/reading concurrently */
frr_with_mutex (&ax_io_mtx) {
send_v2trap(*notification_vars);
}
snmp_free_varbind(*notification_vars);
SNMP_FREE(notification_vars);
/* continue in main thread */
event_add_event(main_pthread_tm, agentx_events_update, NULL, 0, NULL);
}


/* function called and running in bgp main thread */
/* build oid and data to send via *table functions */
/* call to *tables functions made in bgp main thread */
/* put data in transfert structures (ring buffer) */
/* call the agentx thread for sending data */
int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
size_t enamelen, const oid *name, size_t namelen,
struct index_oid *iname, size_t index_len,
Expand All @@ -286,8 +399,9 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
oid notification_oid[MAX_OID_LEN];
size_t notification_oid_len;
unsigned int i;
netsnmp_variable_list *notification_vars;

netsnmp_variable_list *notification_vars = NULL;
notification_vars = SNMP_MALLOC_TYPEDEF(netsnmp_variable_list);
if (!agentx_enabled)
return 0;

Expand Down Expand Up @@ -365,10 +479,18 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
}
}

/* transmission to the agentx thread*/
frr_with_mutex (&ax_mtx) {
if (ringbuf_space(ibuf_ax) < sizeof(notification_vars)) {
zlog_err("%s not enougth space in ibuf_ax needed : %lu free : %lu",
__func__, sizeof(notification_vars), ringbuf_space(ibuf_ax));
return 0;
}
ringbuf_put(ibuf_ax, &notification_vars, sizeof(netsnmp_variable_list **));
}

event_add_event(agentx_tm, smux_trap_multi_index_thd, NULL, 0, NULL);

send_v2trap(notification_vars);
snmp_free_varbind(notification_vars);
agentx_events_update(NULL);
return 1;
}

Expand All @@ -389,4 +511,19 @@ void smux_terminate(void)
list_delete(&events);
}
}

static int agentx_stop(struct frr_pthread *fpt, void **result)
{
assert(fpt->running);

ringbuf_del(ibuf_ax);
pthread_mutex_unlock(&ax_mtx);
pthread_mutex_unlock(&ax_io_mtx);
pthread_mutex_destroy(&ax_mtx);
pthread_mutex_destroy(&ax_io_mtx);

return 0;
}


#endif /* SNMP_AGENTX */

0 comments on commit 6124dae

Please sign in to comment.