Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agentx thread #16986

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 160 additions & 23 deletions lib/agentx.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,84 @@
#include "libfrr.h"
#include "xref.h"
#include "lib/libagentx.h"
#include "ringbuf.h" /* for ringbuf_remain, ringbuf_peek, ringbuf_.. */
#include "frr_pthread.h" /* for struct frr_pthread */

XREF_SETUP();


static int agentx_stop(struct frr_pthread *fpt, void **result);

DEFINE_HOOK(agentx_enabled, (), ());

//bool agentx_enabled = false;

/* the thread master of the agentx's pthread */
static struct event_loop *agentx_tm;
/* the thread master of the main pthread of that process */
static struct event_loop *main_pthread_tm;
static struct event *timeout_thr = NULL;
static struct list *events = NULL;

static void agentx_events_update(void);
struct frr_pthread *agentx_pth;

/* buffer dedicated to "master -> agentx" threads */
static struct ringbuf *ibuf_ax;
#define RINGBUF_NB_TRAP 200
/* mutex dedicated to trap transfert between threads */
static pthread_mutex_t ax_mtx;
/* mutex dedicated to send/read exclusion */
static pthread_mutex_t ax_io_mtx;

/* counters */
static uint64_t nb_locktry_read_fail;
static uint64_t nb_locktry_timeout_fail;


static void agentx_pthreads_init(void)
{
assert(!agentx_pth);

struct frr_pthread_attr ax = {
.start = frr_pthread_attr_default.start,
.stop = frr_pthread_attr_default.stop,
};

agentx_pth = frr_pthread_new(&ax, "Agentx thread", "agentx_pth");
}
vjardin marked this conversation as resolved.
Show resolved Hide resolved

static void agentx_pthreads_run(void)
{
frr_pthread_run(agentx_pth, NULL);

/* Wait until threads are ready. */
frr_pthread_wait_running(agentx_pth);
}

static void agentx_events_update(struct event *t);

/* this function is called in main bgp thread */
static void agentx_timeout(struct event *t)
{
snmp_timeout();
run_alarms();
/* in case of lock nothing is done and */
/* agentx_timeout will be called later. */
if (pthread_mutex_trylock(&ax_io_mtx) == 0) {
snmp_timeout();
run_alarms();
} else {
if (nb_locktry_timeout_fail % 100 == 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you bundle this fixup with e464755? Also, what should an operator do if I see such an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken into account.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if an operator see a great number of such an error, restarting processes using agentx is good option

zlog_err("%s mutex already locked %llu times", __func__,
nb_locktry_timeout_fail);
nb_locktry_timeout_fail++;
agentx_events_update(NULL);
return;
}
pthread_mutex_unlock(&ax_io_mtx);
netsnmp_check_outstanding_agent_requests();
agentx_events_update();
agentx_events_update(NULL);
}

/* this function is called in main bgp thread */
static void agentx_read(struct event *t)
{
netsnmp_large_fd_set lfds;
Expand Down Expand Up @@ -71,9 +128,25 @@ static void agentx_read(struct event *t)
flog_err(EC_LIB_SYSTEM_CALL, "Failed to set snmp fd non blocking: %s(%d)",
strerror(errno), errno);

netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
netsnmp_large_fd_setfd(t->u.fd, &lfds);
snmp_read2(&lfds);
/* mutex to avoid sending/reading concurrently */
/* in case of lock nothing is done and agentx_read will */
/* be called later. */
/* snmp_read2 function provides snmpget/walk functions */
/* call to *table functions are made from that point */
/* we are in main bgp thread */
if (pthread_mutex_trylock(&ax_io_mtx) == 0) {
netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
netsnmp_large_fd_setfd(t->u.fd, &lfds);
snmp_read2(&lfds);
} else {
if (nb_locktry_read_fail % 100 == 0)
zlog_err("%s mutex already locked %llu times", __func__,
nb_locktry_read_fail);
nb_locktry_read_fail++;
agentx_events_update(NULL);
return;
}
pthread_mutex_unlock(&ax_io_mtx);

/* Reset the flag */
if (!nonblock) {
Expand All @@ -87,11 +160,12 @@ static void agentx_read(struct event *t)
}

netsnmp_check_outstanding_agent_requests();
agentx_events_update();
agentx_events_update(NULL);
netsnmp_large_fd_set_cleanup(&lfds);
}

static void agentx_events_update(void)
/* this function is called in main bgp thread */
static void agentx_events_update(struct event *t)
{
int maxfd = 0;
int block = 1;
Expand All @@ -106,10 +180,8 @@ static void agentx_events_update(void)
netsnmp_large_fd_set_init(&lfds, FD_SETSIZE);
snmp_select_info2(&maxfd, &lfds, &timeout, &block);

if (!block) {
event_add_timer_tv(agentx_tm, agentx_timeout, NULL, &timeout,
&timeout_thr);
}
if (!block)
event_add_timer_tv(main_pthread_tm, agentx_timeout, NULL, &timeout, &timeout_thr);

ln = listhead(events);
thr = ln ? listgetdata(ln) : NULL;
Expand Down Expand Up @@ -137,7 +209,7 @@ static void agentx_events_update(void)

thr = XCALLOC(MTYPE_TMP, sizeof(struct event *));
newln = listnode_add_before(events, ln, thr);
event_add_read(agentx_tm, agentx_read, newln, fd, thr);
event_add_read(main_pthread_tm, agentx_read, newln, fd, thr);
}
}

Expand Down Expand Up @@ -196,9 +268,12 @@ static int agentx_log_callback(int major, int minor, void *serverarg,
static int agentx_cli_on(void)
{
if (!agentx_enabled) {
agentx_pthreads_run();
agentx_tm = agentx_pth->master;
init_snmp(FRR_SMUX_NAME);
events = list_new();
agentx_events_update();
agentx_events_update(NULL);
ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *));
agentx_enabled = true;
hook_call(agentx_enabled);
}
Expand All @@ -216,7 +291,7 @@ static int agentx_cli_off(void)
static int smux_disable(void)
{
agentx_enabled = false;

agentx_stop(agentx_pth, NULL);
return 0;
}

Expand All @@ -227,11 +302,15 @@ bool smux_enabled(void)

void smux_init(struct event_loop *tm)
{
agentx_tm = tm;
main_pthread_tm = tm;
agentx_pthreads_init();

hook_register(agentx_cli_enabled, agentx_cli_on);
hook_register(agentx_cli_disabled, agentx_cli_off);

pthread_mutex_init(&ax_mtx, NULL);
pthread_mutex_init(&ax_io_mtx, NULL);

netsnmp_enable_subagent();
snmp_disable_log();
snmp_enable_calllog();
Expand All @@ -245,9 +324,12 @@ void smux_init(struct event_loop *tm)
void smux_agentx_enable(void)
{
if (!agentx_enabled) {
agentx_pthreads_run();
agentx_tm = agentx_pth->master;
init_snmp(FRR_SMUX_NAME);
events = list_new();
agentx_events_update();
agentx_events_update(NULL);
ibuf_ax = ringbuf_new(RINGBUF_NB_TRAP * sizeof(void *));
agentx_enabled = true;
}
}
Expand Down Expand Up @@ -275,6 +357,37 @@ void smux_trap(struct variable *vp, size_t vp_len, const oid *ename,
trapobjlen, sptrap);
}

/* function called in the agentx thread */
/* get data to send and from transfert structures */
/* performs oid and data sending via send_v2trap */
static void smux_trap_multi_index_thd(struct event *thread)
{
netsnmp_variable_list **notification_vars = NULL;

frr_with_mutex (&ax_mtx) {
if (ringbuf_remain(ibuf_ax) == 0) {
zlog_err("%s no data to read in ring buffers", __func__);
return;
}
ringbuf_get(ibuf_ax, &notification_vars, sizeof(notification_vars));
}

/* mutex to avoid sending/reading concurrently */
frr_with_mutex (&ax_io_mtx) {
send_v2trap(*notification_vars);
}
snmp_free_varbind(*notification_vars);
SNMP_FREE(notification_vars);
/* continue in main thread */
event_add_event(main_pthread_tm, agentx_events_update, NULL, 0, NULL);
}


/* function called and running in bgp main thread */
/* build oid and data to send via *table functions */
/* call to *tables functions made in bgp main thread */
/* put data in transfert structures (ring buffer) */
/* call the agentx thread for sending data */
int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
size_t enamelen, const oid *name, size_t namelen,
struct index_oid *iname, size_t index_len,
Expand All @@ -286,8 +399,9 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
oid notification_oid[MAX_OID_LEN];
size_t notification_oid_len;
unsigned int i;
netsnmp_variable_list *notification_vars;

netsnmp_variable_list *notification_vars = NULL;
notification_vars = SNMP_MALLOC_TYPEDEF(netsnmp_variable_list);
if (!agentx_enabled)
return 0;

Expand Down Expand Up @@ -365,16 +479,24 @@ int smux_trap_multi_index(struct variable *vp, size_t vp_len, const oid *ename,
}
}

/* transmission to the agentx thread*/
frr_with_mutex (&ax_mtx) {
if (ringbuf_space(ibuf_ax) < sizeof(notification_vars)) {
zlog_err("%s not enougth space in ibuf_ax needed : %lu free : %lu",
__func__, sizeof(notification_vars), ringbuf_space(ibuf_ax));
return 0;
}
ringbuf_put(ibuf_ax, &notification_vars, sizeof(netsnmp_variable_list **));
}

event_add_event(agentx_tm, smux_trap_multi_index_thd, NULL, 0, NULL);

send_v2trap(notification_vars);
snmp_free_varbind(notification_vars);
agentx_events_update();
return 1;
}

void smux_events_update(void)
{
agentx_events_update();
agentx_events_update(NULL);
}

static void smux_events_delete_thread(void *arg)
Expand All @@ -389,4 +511,19 @@ void smux_terminate(void)
list_delete(&events);
}
}

static int agentx_stop(struct frr_pthread *fpt, void **result)
{
assert(fpt->running);

ringbuf_del(ibuf_ax);
pthread_mutex_unlock(&ax_mtx);
pthread_mutex_unlock(&ax_io_mtx);
pthread_mutex_destroy(&ax_mtx);
pthread_mutex_destroy(&ax_io_mtx);

return 0;
}


#endif /* SNMP_AGENTX */
Loading