diff --git a/zebra/zserv.c b/zebra/zserv.c index 6a64176d98b3..72597341b430 100644 --- a/zebra/zserv.c +++ b/zebra/zserv.c @@ -306,6 +306,14 @@ static void zserv_write(struct event *thread) * this task reschedules itself. * * Any failure in any of these actions is handled by terminating the client. + * + * The client's input buffer ibuf_fifo can have a maximum items as configured + * in the packets_to_process. This way we are not filling up the FIFO more + * than the maximum when the zebra main is busy. If the fifo has space, we + * reschedule ourselves to read more. + * + * The main thread processes the items in ibuf_fifo and always signals the + * client IO thread. */ static void zserv_read(struct event *thread) { @@ -313,15 +321,25 @@ static void zserv_read(struct event *thread) int sock; size_t already; struct stream_fifo *cache; - uint32_t p2p_orig; - - uint32_t p2p; + uint32_t p2p; /* Temp p2p used to process */ + uint32_t p2p_orig; /* Configured p2p (Default-1000) */ + int p2p_avail; /* How much space is available for p2p */ struct zmsghdr hdr; + size_t client_ibuf_fifo_cnt = stream_fifo_count_safe(client->ibuf_fifo); p2p_orig = atomic_load_explicit(&zrouter.packets_to_process, memory_order_relaxed); + p2p_avail = p2p_orig - client_ibuf_fifo_cnt; + + /* + * Do nothing if ibuf_fifo count has reached its max limit. Otherwise + * proceed and reschedule ourselves if there is space in the ibuf_fifo. + */ + if (p2p_avail <= 0) + return; + + p2p = p2p_avail; cache = stream_fifo_new(); - p2p = p2p_orig; sock = EVENT_FD(thread); while (p2p) { @@ -421,7 +439,7 @@ static void zserv_read(struct event *thread) p2p--; } - if (p2p < p2p_orig) { + if (p2p < (uint32_t)p2p_avail) { uint64_t time_now = monotime(NULL); /* update session statistics */ @@ -435,19 +453,23 @@ static void zserv_read(struct event *thread) while (cache->head) stream_fifo_push(client->ibuf_fifo, stream_fifo_pop(cache)); + /* Need to update count as main thread could have processed few */ + client_ibuf_fifo_cnt = + stream_fifo_count_safe(client->ibuf_fifo); } /* Schedule job to process those packets */ zserv_event(client, ZSERV_PROCESS_MESSAGES); - } if (IS_ZEBRA_DEBUG_PACKET) - zlog_debug("Read %d packets from client: %s", p2p_orig - p2p, - zebra_route_string(client->proto)); + zlog_debug("Read %d packets from client: %s. Current ibuf fifo count: %zu. Conf P2p %d", + p2p_avail - p2p, zebra_route_string(client->proto), + client_ibuf_fifo_cnt, p2p_orig); - /* Reschedule ourselves */ - zserv_client_event(client, ZSERV_CLIENT_READ); + /* Reschedule ourselves since we have space in ibuf_fifo */ + if (client_ibuf_fifo_cnt != p2p_orig) + zserv_client_event(client, ZSERV_CLIENT_READ); stream_fifo_free(cache); @@ -483,14 +505,20 @@ static void zserv_client_event(struct zserv *client, * as the task argument. * * Each message is popped off the client's input queue and the action associated - * with the message is executed. This proceeds until there are no more messages, - * an error occurs, or the processing limit is reached. + * with the message is executed. This proceeds until an error occurs, or the + * processing limit is reached. * * The client's I/O thread can push at most zrouter.packets_to_process messages * onto the input buffer before notifying us there are packets to read. As long * as we always process zrouter.packets_to_process messages here, then we can * rely on the read thread to handle queuing this task enough times to process * everything on the input queue. + * + * If the client ibuf always schedules a wakeup to the client IO to read more + * items from the socked buffer. This way we ensure + * - Client IO thread always tries to read the socket buffer and add more + * items to the ibuf_fifo (until max limit) + * - the hidden config change (zebra zapi-packets <>) is taken into account. */ static void zserv_process_messages(struct event *thread) { @@ -524,6 +552,9 @@ static void zserv_process_messages(struct event *thread) /* Reschedule ourselves if necessary */ if (need_resched) zserv_event(client, ZSERV_PROCESS_MESSAGES); + + /* Signal a wake up to client IO thread */ + zserv_client_event(client, ZSERV_CLIENT_READ); } int zserv_send_message(struct zserv *client, struct stream *msg)