Skip to content

Commit

Permalink
Merge pull request #14724 from donaldsharp/workqueue_cleanup
Browse files Browse the repository at this point in the history
Workqueue cleanup
  • Loading branch information
ton31337 authored Nov 4, 2023
2 parents d4c596d + 2a65f05 commit 4bdba57
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 69 deletions.
26 changes: 13 additions & 13 deletions lib/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,32 @@ extern "C" {
/* Create a new buffer. Memory will be allocated in chunks of the given
size. If the argument is 0, the library will supply a reasonable
default size suitable for buffering socket I/O. */
extern struct buffer *buffer_new(size_t);
extern struct buffer *buffer_new(size_t size);

/* Free all data in the buffer. */
extern void buffer_reset(struct buffer *);
extern void buffer_reset(struct buffer *b);

/* This function first calls buffer_reset to release all buffered data.
Then it frees the struct buffer itself. */
extern void buffer_free(struct buffer *);
extern void buffer_free(struct buffer *b);

/* Add the given data to the end of the buffer. */
extern void buffer_put(struct buffer *, const void *, size_t);
extern void buffer_put(struct buffer *b, const void *p, size_t size);
/* Add a single character to the end of the buffer. */
extern void buffer_putc(struct buffer *, uint8_t);
extern void buffer_putc(struct buffer *b, uint8_t c);
/* Add a NUL-terminated string to the end of the buffer. */
extern void buffer_putstr(struct buffer *, const char *);
extern void buffer_putstr(struct buffer *b, const char *str);
/* Add given data, inline-expanding \n to \r\n */
extern void buffer_put_crlf(struct buffer *b, const void *p, size_t size);

/* Combine all accumulated (and unflushed) data inside the buffer into a
single NUL-terminated string allocated using XMALLOC(MTYPE_TMP). Note
that this function does not alter the state of the buffer, so the data
is still inside waiting to be flushed. */
char *buffer_getstr(struct buffer *);
char *buffer_getstr(struct buffer *b);

/* Returns 1 if there is no pending data in the buffer. Otherwise returns 0. */
int buffer_empty(struct buffer *);
int buffer_empty(struct buffer *b);

typedef enum {
/* An I/O error occurred. The buffer should be destroyed and the
Expand All @@ -59,20 +59,20 @@ typedef enum {

/* Try to write this data to the file descriptor. Any data that cannot
be written immediately is added to the buffer queue. */
extern buffer_status_t buffer_write(struct buffer *, int fd, const void *,
size_t);
extern buffer_status_t buffer_write(struct buffer *b, int fd, const void *p,
size_t size);

/* This function attempts to flush some (but perhaps not all) of
the queued data to the given file descriptor. */
extern buffer_status_t buffer_flush_available(struct buffer *, int fd);
extern buffer_status_t buffer_flush_available(struct buffer *b, int fd);

/* The following 2 functions (buffer_flush_all and buffer_flush_window)
are for use in lib/vty.c only. They should not be used elsewhere. */

/* Call buffer_flush_available repeatedly until either all data has been
flushed, or an I/O error has been encountered, or the operation would
block. */
extern buffer_status_t buffer_flush_all(struct buffer *, int fd);
extern buffer_status_t buffer_flush_all(struct buffer *b, int fd);

/* Attempt to write enough data to the given fd to fill a window of the
given width and height (and remove the data written from the buffer).
Expand All @@ -85,7 +85,7 @@ extern buffer_status_t buffer_flush_all(struct buffer *, int fd);
to return -1 (because the logic for handling the erase and more features
is too complicated to retry the write later).
*/
extern buffer_status_t buffer_flush_window(struct buffer *, int fd, int width,
extern buffer_status_t buffer_flush_window(struct buffer *b, int fd, int width,
int height, int erase, int no_more);

#ifdef __cplusplus
Expand Down
40 changes: 21 additions & 19 deletions lib/if.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,9 @@ extern int if_cmp_name_func(const char *p1, const char *p2);
* This is useful for vrf route-leaking. So more than anything
* else think before you use VRF_UNKNOWN
*/
extern void if_update_to_new_vrf(struct interface *, vrf_id_t vrf_id);
extern void if_update_to_new_vrf(struct interface *ifp, vrf_id_t vrf_id);

extern struct interface *if_lookup_by_index(ifindex_t, vrf_id_t vrf_id);
extern struct interface *if_lookup_by_index(ifindex_t ifindex, vrf_id_t vrf_id);
extern struct interface *if_vrf_lookup_by_index_next(ifindex_t ifindex,
vrf_id_t vrf_id);
extern struct interface *if_lookup_address_local(const void *matchaddr,
Expand Down Expand Up @@ -564,7 +564,7 @@ extern int if_set_index(struct interface *ifp, ifindex_t ifindex);
/* Delete the interface, but do not free the structure, and leave it in the
interface list. It is often advisable to leave the pseudo interface
structure because there may be configuration information attached. */
extern void if_delete_retain(struct interface *);
extern void if_delete_retain(struct interface *ifp);

/* Delete and free the interface structure: calls if_delete_retain and then
deletes it from the interface list and frees the structure. */
Expand All @@ -582,13 +582,13 @@ extern int if_is_pointopoint(const struct interface *ifp);
extern int if_is_multicast(const struct interface *ifp);
extern void if_terminate(struct vrf *vrf);
extern void if_dump_all(void);
extern const char *if_flag_dump(unsigned long);
extern const char *if_link_type_str(enum zebra_link_type);
extern const char *if_flag_dump(unsigned long flags);
extern const char *if_link_type_str(enum zebra_link_type zlt);

/* Please use ifindex2ifname instead of if_indextoname where possible;
ifindex2ifname uses internal interface info, whereas if_indextoname must
make a system call. */
extern const char *ifindex2ifname(ifindex_t, vrf_id_t vrf_id);
extern const char *ifindex2ifname(ifindex_t ifindex, vrf_id_t vrf_id);

/* Please use ifname2ifindex instead of if_nametoindex where possible;
ifname2ifindex uses internal interface info, whereas if_nametoindex must
Expand All @@ -598,29 +598,31 @@ extern ifindex_t ifname2ifindex(const char *ifname, vrf_id_t vrf_id);
/* Connected address functions. */
extern struct connected *connected_new(void);
extern void connected_free(struct connected **connected);
extern struct connected *
connected_add_by_prefix(struct interface *, struct prefix *, struct prefix *);
extern struct connected *connected_delete_by_prefix(struct interface *,
struct prefix *);
extern struct connected *connected_lookup_prefix(struct interface *,
const struct prefix *);
extern struct connected *connected_lookup_prefix_exact(struct interface *,
const struct prefix *);
extern unsigned int connected_count_by_family(struct interface *, int family);
extern struct connected *connected_add_by_prefix(struct interface *ifp,
struct prefix *p,
struct prefix *dest);
extern struct connected *connected_delete_by_prefix(struct interface *ifp,
struct prefix *p);
extern struct connected *connected_lookup_prefix(struct interface *ifp,
const struct prefix *p);
extern struct connected *connected_lookup_prefix_exact(struct interface *ifp,
const struct prefix *p);
extern unsigned int connected_count_by_family(struct interface *ifp, int family);
extern struct nbr_connected *nbr_connected_new(void);
extern void nbr_connected_free(struct nbr_connected *);
struct nbr_connected *nbr_connected_check(struct interface *, struct prefix *);
extern void nbr_connected_free(struct nbr_connected *connected);
struct nbr_connected *nbr_connected_check(struct interface *ifp,
struct prefix *p);
struct connected *connected_get_linklocal(struct interface *ifp);

/* link parameters */
bool if_link_params_cmp(struct if_link_params *iflp1,
struct if_link_params *iflp2);
void if_link_params_copy(struct if_link_params *dst,
struct if_link_params *src);
struct if_link_params *if_link_params_get(struct interface *);
struct if_link_params *if_link_params_get(struct interface *ifp);
struct if_link_params *if_link_params_enable(struct interface *ifp);
struct if_link_params *if_link_params_init(struct interface *ifp);
void if_link_params_free(struct interface *);
void if_link_params_free(struct interface *ifp);

/* Northbound. */
struct vty;
Expand Down
30 changes: 18 additions & 12 deletions lib/workqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ static void work_queue_item_free(struct work_queue_item *item)
return;
}

static inline void work_queue_item_dequeue(struct work_queue *wq,
struct work_queue_item *item)
{
assert(wq->item_count > 0);

wq->item_count--;
STAILQ_REMOVE(&wq->items, item, work_queue_item, wq);
}

static void work_queue_item_remove(struct work_queue *wq,
struct work_queue_item *item)
{
Expand Down Expand Up @@ -133,6 +142,13 @@ static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
return 0;
}

static inline void work_queue_item_enqueue(struct work_queue *wq,
struct work_queue_item *item)
{
STAILQ_INSERT_TAIL(&wq->items, item, wq);
wq->item_count++;
}

void work_queue_add(struct work_queue *wq, void *data)
{
struct work_queue_item *item;
Expand Down Expand Up @@ -265,8 +281,7 @@ void work_queue_run(struct event *thread)
do {
ret = wq->spec.workfunc(wq, item->data);
item->ran++;
} while ((ret == WQ_RETRY_NOW)
&& (item->ran < wq->spec.max_retries));
} while (item->ran < wq->spec.max_retries);

switch (ret) {
case WQ_QUEUE_BLOCKED: {
Expand All @@ -276,9 +291,6 @@ void work_queue_run(struct event *thread)
item->ran--;
goto stats;
}
case WQ_RETRY_LATER: {
goto stats;
}
case WQ_REQUEUE: {
item->ran--;
work_queue_item_requeue(wq, item);
Expand All @@ -296,11 +308,6 @@ void work_queue_run(struct event *thread)
titem = item;
break;
}
case WQ_RETRY_NOW:
/* a RETRY_NOW that gets here has exceeded max_tries, same
* as ERROR
*/
fallthrough;
case WQ_SUCCESS:
default: {
work_queue_item_remove(wq, item);
Expand Down Expand Up @@ -352,8 +359,7 @@ void work_queue_run(struct event *thread)

/* Is the queue done yet? If it is, call the completion callback. */
if (!work_queue_empty(wq)) {
if (ret == WQ_RETRY_LATER ||
ret == WQ_QUEUE_BLOCKED)
if (ret == WQ_QUEUE_BLOCKED)
work_queue_schedule(wq, wq->spec.retry);
else
work_queue_schedule(wq, 0);
Expand Down
21 changes: 2 additions & 19 deletions lib/workqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ DECLARE_MTYPE(WORK_QUEUE);
/* action value, for use by item processor and item error handlers */
typedef enum {
WQ_SUCCESS = 0,
WQ_RETRY_NOW, /* retry immediately */
WQ_RETRY_LATER, /* retry later, cease processing work queue */
WQ_REQUEUE, /* requeue item, continue processing work queue */
WQ_REQUEUE, /* requeue item, continue processing work queue */
WQ_QUEUE_BLOCKED, /* Queue cant be processed at this time.
* Similar to WQ_RETRY_LATER, but doesn't penalise
* the particular item.. */
Expand Down Expand Up @@ -117,22 +115,6 @@ work_queue_last_item(struct work_queue *wq)
return STAILQ_LAST(&wq->items, work_queue_item, wq);
}

static inline void work_queue_item_enqueue(struct work_queue *wq,
struct work_queue_item *item)
{
STAILQ_INSERT_TAIL(&wq->items, item, wq);
wq->item_count++;
}

static inline void work_queue_item_dequeue(struct work_queue *wq,
struct work_queue_item *item)
{
assert(wq->item_count > 0);

wq->item_count--;
STAILQ_REMOVE(&wq->items, item, work_queue_item, wq);
}

/* create a new work queue, of given name.
* user must fill in the spec of the returned work queue before adding
* anything to it
Expand Down Expand Up @@ -160,6 +142,7 @@ bool work_queue_is_scheduled(struct work_queue *wq);
/* Helpers, exported for thread.c and command.c */
extern void work_queue_run(struct event *thread);

/* Function to initialize the workqueue cli */
extern void workqueue_cmd_init(void);

#ifdef __cplusplus
Expand Down
6 changes: 0 additions & 6 deletions tests/lib/test_heavy_wq.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ static wq_item_status slow_func(struct work_queue *wq, void *data)
for (j = 0; j < 300; j++)
x += sin(x) * j;

if ((hn->i % ITERS_LATER) == 0)
return WQ_RETRY_LATER;

if ((hn->i % ITERS_ERR) == 0)
return WQ_RETRY_NOW;

if ((hn->i % ITERS_PRINT) == 0)
printf("%s did %d, x = %g\n", hn->str, hn->i, x);

Expand Down
1 change: 1 addition & 0 deletions zebra/zebra_script.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ void lua_pushzebra_dplane_ctx(lua_State *L, const struct zebra_dplane_ctx *ctx)
lua_setfield(L, -2, "mtu");
}
lua_setfield(L, -2, "gre");
break;

case DPLANE_OP_ADDR_INSTALL:
case DPLANE_OP_ADDR_UNINSTALL:
Expand Down

0 comments on commit 4bdba57

Please sign in to comment.