From 4be689fdf637544dc076e9354edcc615212b4ba9 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Tue, 19 Sep 2023 08:58:09 +0200 Subject: [PATCH 01/15] test: jbuf packets with equal timestamps --- test/jbuf.c | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/test/jbuf.c b/test/jbuf.c index e7d3dad12..d6373efec 100644 --- a/test/jbuf.c +++ b/test/jbuf.c @@ -281,37 +281,53 @@ int test_jbuf_adaptive_video(void) hdr.ts = 100; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(1, jbuf_packets(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); hdr.seq = 4; hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); hdr.seq = 3; /* unordered late packet */ hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(4, jbuf_packets(jb)); hdr.seq = 5; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[4]); TEST_ERR(err); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(5, jbuf_packets(jb)); + err = jbuf_get(jb, &hdr2, &mem); /* first packet with unique frame */ + mem = mem_deref(mem); + TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(1, hdr2.seq); + TEST_EQUALS(100, hdr2.ts); + + err = jbuf_get(jb, &hdr2, &mem); /* second packet with unique frame */ + mem = mem_deref(mem); + TEST_EQUALS(EAGAIN, err); /* n > wish */ + TEST_EQUALS(2, hdr2.seq); + TEST_EQUALS(100, hdr2.ts); + + err = jbuf_get(jb, &hdr2, &mem); + mem = mem_deref(mem); + TEST_EQUALS(EAGAIN, err); /* n > wish */ + err = jbuf_get(jb, &hdr2, &mem); + mem = mem_deref(mem); + TEST_ERR(err); /* n == wish */ + TEST_EQUALS(4, hdr2.seq); + TEST_EQUALS(200, hdr2.ts); + /* --- Test late packet, unique frame --- */ jbuf_flush(jb); @@ -319,28 +335,24 @@ int test_jbuf_adaptive_video(void) hdr.ts = 100; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(1, jbuf_packets(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); hdr.seq = 4; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); hdr.seq = 3; /* unordered late packet */ hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(4, jbuf_packets(jb)); /* --- Test lost get --- */ @@ -350,14 +362,12 @@ int test_jbuf_adaptive_video(void) hdr.ts = 100; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(1, jbuf_packets(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); /* LOST hdr.seq = 3; */ @@ -366,26 +376,22 @@ int test_jbuf_adaptive_video(void) hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); hdr.seq = 5; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(4, jbuf_packets(jb)); err = jbuf_get(jb, &hdr2, &mem); TEST_EQUALS(EAGAIN, err); mem = mem_deref(mem); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); err = jbuf_get(jb, &hdr2, &mem); TEST_EQUALS(EAGAIN, err); mem = mem_deref(mem); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); err = 0; From 3b1d6a14107c2de86cca4342a8b6fb9b1c49518b Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Mon, 18 Sep 2023 16:11:01 +0200 Subject: [PATCH 02/15] jbuf: replace adaptive mode by frame completeness check A frame is a sequence of RTP packets with equal timestamp. Now `jbuf_get()` does not pass packets before the oldest frame is complete. --- include/re_jbuf.h | 5 +- src/jbuf/jbuf.c | 328 ++++++++++++++++++++++------------------------ 2 files changed, 160 insertions(+), 173 deletions(-) diff --git a/include/re_jbuf.h b/include/re_jbuf.h index f9698b485..71f8703ca 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -23,12 +23,12 @@ struct jbuf_stat { /** Jitter buffer type */ enum jbuf_type { JBUF_OFF, - JBUF_FIXED, - JBUF_ADAPTIVE + JBUF_FIXED }; int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max); +int jbuf_resize(struct jbuf *jb, uint32_t packets); int jbuf_set_type(struct jbuf *jb, enum jbuf_type jbtype); int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem); int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem); @@ -36,5 +36,4 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem); void jbuf_flush(struct jbuf *jb); int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); -uint32_t jbuf_frames(const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index d2a32dc56..40d2dac9a 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -1,9 +1,6 @@ /** * @file jbuf.c Jitter Buffer implementation * - * This is an adaptive jitter buffer implementation. See doc/jbuf for further - * details! - * * Copyright (C) 2010 Creytiv.com */ #include @@ -11,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +44,7 @@ enum { }; -/** Defines a packet frame */ +/** Defines a packet */ struct packet { struct le le; /**< Linked list element */ struct rtp_header hdr; /**< RTP Header */ @@ -57,19 +55,25 @@ struct packet { /** * Defines a jitter buffer * - * The jitter buffer is for incoming RTP packets, which are sorted by - * sequence number. + * The jitter buffer is for incoming RTP packets + * + * Features: + * - re-order out-of-order packets + * - ensure frame completeness + * - specify min/max number of frames */ struct jbuf { struct list pooll; /**< List of free packets in pool */ struct list packetl; /**< List of buffered packets */ uint32_t n; /**< [# packets] Current # of packets in buffer */ uint32_t nf; /**< [# frames] Current # of frames in buffer */ + uint32_t ncf; /**< [# frames] # of complete frames in buffer */ uint32_t min; /**< [# frames] Minimum # of frames to buffer */ uint32_t max; /**< [# frames] Maximum # of frames to buffer */ - uint32_t wish; /**< [# frames] Wish size for adaptive mode */ + uint32_t packets; /**< [# packets] Size of the packet pool */ uint16_t seq_put; /**< Sequence number for last jbuf_put() */ - uint16_t seq_get; /**< Sequence number of last played frame */ + uint16_t seq_get; /**< Sequence number of last played packet */ + struct le *end; /**< End of complete sequence */ uint32_t ssrc; /**< Previous ssrc */ uint64_t tr; /**< Time of previous jbuf_put() */ int pt; /**< Payload type */ @@ -89,6 +93,38 @@ struct jbuf { }; +/** + * Update frame counter and sequence end list element pointer before oldest + * packet is removed + * + * @param jb Jitter buffer + */ +static void jbuf_update_nf(struct jbuf *jb) +{ + struct le *le = jb->packetl.head; + + if (!le) + return; + + struct le *n = le->next; + + if (n) { + struct packet *p = le->data; + struct packet *pn = n->data; + + if (p->hdr.ts != pn->hdr.ts) { + if (jb->ncf) + --jb->ncf; + + --jb->nf; + } + } + + if (jb->end == le) + jb->end = NULL; +} + + /** Is x less than y? */ static inline bool seq_less(uint16_t x, uint16_t y) { @@ -148,7 +184,7 @@ static void plot_jbuf_event(struct jbuf *jb, char ph) /** - * Get a frame from the pool + * Get a packet from the pool */ static void packet_alloc(struct jbuf *jb, struct packet **f) { @@ -162,9 +198,10 @@ static void packet_alloc(struct jbuf *jb, struct packet **f) else { struct packet *f0; - /* Steal an old frame */ + /* Steal an old packet */ le = jb->packetl.head; f0 = le->data; + jbuf_update_nf(jb); #if JBUF_STAT STAT_INC(n_overflow); @@ -213,14 +250,13 @@ static void jbuf_destructor(void *data) * * @param jbp Pointer to returned jitter buffer * @param min Minimum delay in [frames] - * @param max Maximum delay in [packets] + * @param max Maximum delay in [frames] * * @return 0 if success, otherwise errorcode */ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) { struct jbuf *jb; - uint32_t i; int err = 0; if (!jbp || ( min > max)) @@ -242,10 +278,10 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) jb->jbtype = JBUF_FIXED; jb->min = min; jb->max = max; - jb->wish = min; + jb->packets = 0; tmr_init(&jb->tmr); - DEBUG_INFO("alloc: delay=%u-%u frames/packets\n", min, max); + DEBUG_INFO("alloc: delay=%u-%u [frames]\n", min, max); jb->pt = -1; err = mutex_alloc(&jb->lock); @@ -255,16 +291,7 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) mem_destructor(jb, jbuf_destructor); /* Allocate all packets now */ - for (i=0; imax; i++) { - struct packet *f = mem_zalloc(sizeof(*f), NULL); - if (!f) { - err = ENOMEM; - break; - } - - list_append(&jb->pooll, &f->le, f); - DEBUG_INFO("alloc: adding to pool list %u\n", i); - } + err = jbuf_resize(jb, max); out: if (err) @@ -276,6 +303,33 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) } +/** + * Resize the packet pool + * + * @param jb The jitter buffer + * @param packets Size of the packet pool in [packets]. Default: jb->max + * + * @return 0 if success, otherwise errorcode + */ +int jbuf_resize(struct jbuf *jb, uint32_t packets) +{ + if (packets <= jb->packets) + return EINVAL; + + for (uint32_t i=jb->packets; ipooll, &f->le, f); + DEBUG_INFO("alloc: adding to pool list %u\n", i); + } + + jb->packets = packets; + return 0; +} + + /** * Set jitter buffer type. * @@ -295,67 +349,61 @@ int jbuf_set_type(struct jbuf *jb, enum jbuf_type jbtype) } -static void wish_down(void *arg) +/** + * Moves end of complete sequence and counts complete frames. If `end` is NULL, + * a search starts from the head which points to the oldest packet + * + * @param jb Jitter buffer + * @param cur Current inserted list element + */ +static void jbuf_move_end(struct jbuf *jb, struct le *cur) { - struct jbuf *jb = arg; - - if (jb->wish > jb->min) { - DEBUG_INFO("wish size changed %u --> %u\n", jb->wish, - jb->wish - 1); - --jb->wish; + struct le *end; + + if (!jb->end) { + jb->ncf = 0; + cur = jb->packetl.head; + if (!cur) + return; + + jb->end = cur; + cur = cur->next; + if (!cur) + return; } -} - -static void calc_rdiff(struct jbuf *jb, uint16_t seq) -{ - int32_t rdiff; - int32_t adiff; - int32_t s; /**< EMA coefficient */ - float ratio = 1.0; /**< Frame packet ratio */ - uint32_t wish; - uint32_t max = jb->max; - bool down = false; - - if (jb->jbtype != JBUF_ADAPTIVE) + end = cur->prev; + if (!end) return; - if (!jb->seq_get) + /* update only if endsing packet was inserted right now */ + if (jb->end != end) return; - if (jb->nf) { - ratio = (float)jb->n / (float)jb->nf; - max = (uint32_t)(max / ratio); - } + for (; end->next; end = end->next) { + struct packet *pm = end->data; + struct packet *pn = end->next->data; - rdiff = (int16_t)(jb->seq_put + 1 - seq); - adiff = abs(rdiff * JBUF_RDIFF_EMA_COEFF); - s = adiff > jb->rdiff ? JBUF_RDIFF_UP_SPEED : - jb->wish > 2 ? 1 : - jb->wish > 1 ? 2 : 3; - jb->rdiff += (adiff - jb->rdiff) * s / JBUF_RDIFF_EMA_COEFF; + if (pm->hdr.seq + 1 != pn->hdr.seq) + break; - wish = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF / ratio); - if (wish < jb->min) - wish = jb->min; + if (pm->hdr.ts != pn->hdr.ts) + ++jb->ncf; + } - if (max && wish >= max) - wish = max - 1; + jb->end = end; +} - if (wish > jb->wish) { - DEBUG_INFO("wish size changed %u --> %u\n", jb->wish, wish); - jb->wish = wish; - } - else if (wish < jb->wish) { - uint32_t dt = wish + 1 == jb->wish ? 6000 : 1000; - if (!tmr_isrunning(&jb->tmr) || tmr_get_expire(&jb->tmr) > dt) - tmr_start(&jb->tmr, dt, wish_down, jb); - down = true; - } +static bool jbuf_frame_ready(struct jbuf *jb) +{ + if (!jb->packetl.head) + return false; + + if (jb->nf < jb->min) + return false; - if (!down && tmr_isrunning(&jb->tmr)) - tmr_cancel(&jb->tmr); + return jb->ncf || jb->nf > jb->max; } @@ -370,12 +418,10 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) */ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) { - struct packet *f; - struct packet *fc; + struct packet *p; struct le *le, *tail; uint16_t seq; uint64_t tr, dt; - bool equal; int err = 0; if (!jb || !hdr) @@ -404,10 +450,6 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) jb->ssrc = hdr->ssrc; if (jb->running) { - - if (jb->jbtype == JBUF_ADAPTIVE) - calc_rdiff(jb, seq); - /* Packet arrived too late to be put into buffer */ if (jb->seq_get && seq_less(seq, jb->seq_get + 1)) { STAT_INC(n_late); @@ -423,7 +465,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) STAT_INC(n_put); - packet_alloc(jb, &f); + packet_alloc(jb, &p); tail = jb->packetl.tail; @@ -431,7 +473,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) Frame is later than tail -> append to tail */ if (!tail || seq_less(((struct packet *)tail->data)->hdr.seq, seq)) { - list_append(&jb->packetl, &f->le, f); + list_append(&jb->packetl, &p->le, p); goto success; } @@ -443,7 +485,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) DEBUG_PRINTF("put: out-of-sequence" " - inserting after seq=%u (seq=%u)\n", seq_le, seq); - list_insert_after(&jb->packetl, le, &f->le, f); + list_insert_after(&jb->packetl, le, &p->le, p); break; } else if (seq == seq_le) { /* less likely */ @@ -451,8 +493,8 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) DEBUG_INFO("duplicate: seq=%u\n", seq); STAT_INC(n_dups); plot_jbuf_event(jb, 'D'); - list_insert_after(&jb->packetl, le, &f->le, f); - packet_deref(jb, f); + list_insert_after(&jb->packetl, le, &p->le, p); + packet_deref(jb, p); err = EALREADY; goto out; } @@ -464,7 +506,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) if (!le) { DEBUG_PRINTF("put: out-of-sequence" " - put in head (seq=%u)\n", seq); - list_prepend(&jb->packetl, &f->le, f); + list_prepend(&jb->packetl, &p->le, p); } STAT_INC(n_oos); @@ -476,22 +518,13 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) jb->seq_put = seq; /* Success */ - f->hdr = *hdr; - f->mem = mem_ref(mem); + p->hdr = *hdr; + p->mem = mem_ref(mem); + if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) + jb->nf++; - equal = false; - if (f->le.prev) { - fc = f->le.prev->data; - equal = (fc->hdr.ts == f->hdr.ts); - } - - if (!equal && f->le.next) { - fc = f->le.next->data; - equal = (fc->hdr.ts == f->hdr.ts); - } - - if (!equal) - ++jb->nf; + /* Missing frame detection */ + jbuf_move_end(jb, &p->le); out: #ifdef RE_JBUF_TRACE @@ -514,7 +547,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) */ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) { - struct packet *f; + struct packet *p; int err = 0; if (!jb || !hdr || !mem) @@ -523,68 +556,51 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) mtx_lock(jb->lock); STAT_INC(n_get); - if (jb->nf <= jb->wish || !jb->packetl.head) { - DEBUG_INFO("not enough buffer packets - wait.. " - "(n=%u wish=%u)\n", jb->n, jb->wish); + if (!jbuf_frame_ready(jb)) { + DEBUG_INFO("no frame ready - wait.. " + "(n=%u min=%u)\n", jb->n, jb->min); STAT_INC(n_underflow); plot_jbuf_event(jb, 'U'); err = ENOENT; goto out; } - /* When we get one packet P[i], check that the next packet P[i+1] - is present and have a seq no. of seq[i] + 1. - If not, we should consider that packet lost. */ - - f = jb->packetl.head->data; + p = jb->packetl.head->data; #if JBUF_STAT /* Check sequence of previously played packet */ if (jb->seq_get) { - const int16_t seq_diff = f->hdr.seq - jb->seq_get; - if (seq_less(f->hdr.seq, jb->seq_get)) { - DEBUG_WARNING("get: seq=%u too late\n", f->hdr.seq); + const int16_t seq_diff = p->hdr.seq - jb->seq_get; + if (seq_less(p->hdr.seq, jb->seq_get)) { + DEBUG_WARNING("get: seq=%u too late\n", p->hdr.seq); } else if (seq_diff > 1) { STAT_ADD(n_lost, 1); plot_jbuf_event(jb, 'T'); DEBUG_INFO("get: n_lost: diff=%d,seq=%u,seq_get=%u\n", - seq_diff, f->hdr.seq, jb->seq_get); + seq_diff, p->hdr.seq, jb->seq_get); } } #endif /* Update sequence number for 'get' */ - jb->seq_get = f->hdr.seq; + jb->seq_get = p->hdr.seq; - *hdr = f->hdr; - *mem = mem_ref(f->mem); - - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; + *hdr = p->hdr; + *mem = mem_ref(p->mem); - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } + jbuf_update_nf(jb); + packet_deref(jb, p); - packet_deref(jb, f); - - if (jb->nf > jb->wish) { - DEBUG_INFO("reducing jitter buffer " - "(nf=%u min=%u wish=%u max=%u)\n", - jb->nf, jb->min, jb->wish, jb->max); + if (jbuf_frame_ready(jb)) err = EAGAIN; - } out: mtx_unlock(jb->lock); return err; } + /** * Get one packet from the jitter buffer, even if it becomes depleted * @@ -621,17 +637,7 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) *hdr = f->hdr; *mem = mem_ref(f->mem); - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; - - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } - + jbuf_update_nf(jb); packet_deref(jb, f); out: @@ -640,7 +646,7 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) } /** - * Flush all frames in the jitter buffer + * Flush all packets in the jitter buffer * * @param jb Jitter buffer */ @@ -656,12 +662,12 @@ void jbuf_flush(struct jbuf *jb) mtx_lock(jb->lock); if (jb->packetl.head) { - DEBUG_INFO("flush: %u frames\n", jb->n); + DEBUG_INFO("flush: %u packets\n", jb->n); } - /* put all buffered frames back in free list */ + /* put all buffered packets back in free list */ for (le = jb->packetl.head; le; le = jb->packetl.head) { - DEBUG_INFO(" flush frame: seq=%u\n", + DEBUG_INFO(" flush packet: seq=%u\n", ((struct packet *)(le->data))->hdr.seq); packet_deref(jb, le->data); @@ -669,6 +675,8 @@ void jbuf_flush(struct jbuf *jb) jb->n = 0; jb->nf = 0; + jb->ncf = 0; + jb->end = NULL; jb->running = false; jb->seq_get = 0; @@ -702,26 +710,6 @@ uint32_t jbuf_packets(const struct jbuf *jb) } -/** - * Get number of current frames - * - * @param jb Jitter buffer - * - * @return number of frames - */ -uint32_t jbuf_frames(const struct jbuf *jb) -{ - if (!jb) - return 0; - - mtx_lock(jb->lock); - uint32_t n = jb->nf; - mtx_unlock(jb->lock); - - return n; -} - - /** * Get jitter buffer statistics * @@ -767,8 +755,8 @@ int jbuf_debug(struct re_printf *pf, const struct jbuf *jb) mtx_lock(jb->lock); err |= mbuf_printf(mb, " running=%d", jb->running); - err |= mbuf_printf(mb, " min=%u cur=%u/%u max=%u [frames/packets]\n", - jb->min, jb->nf, jb->n, jb->max); + err |= mbuf_printf(mb, " min=%u cur=%u complete=%u max=%u [frames]\n", + jb->min, jb->nf, jb->ncf, jb->max); err |= mbuf_printf(mb, " seq_put=%u\n", jb->seq_put); #if JBUF_STAT From ffe4d12f1650f73ec0724bc960f69dcad07e7775 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Wed, 27 Sep 2023 15:18:11 +0200 Subject: [PATCH 03/15] jbuf: correct wording and var names --- src/jbuf/jbuf.c | 50 +++++++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 40d2dac9a..009be7611 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -186,7 +186,7 @@ static void plot_jbuf_event(struct jbuf *jb, char ph) /** * Get a packet from the pool */ -static void packet_alloc(struct jbuf *jb, struct packet **f) +static void packet_alloc(struct jbuf *jb, struct packet **pp) { struct le *le; @@ -196,38 +196,38 @@ static void packet_alloc(struct jbuf *jb, struct packet **f) ++jb->n; } else { - struct packet *f0; + struct packet *p0; /* Steal an old packet */ le = jb->packetl.head; - f0 = le->data; + p0 = le->data; jbuf_update_nf(jb); #if JBUF_STAT STAT_INC(n_overflow); - DEBUG_WARNING("drop 1 old frame seq=%u (total dropped %u)\n", - f0->hdr.seq, jb->stat.n_overflow); + DEBUG_WARNING("drop 1 old packet seq=%u (total dropped %u)\n", + p0->hdr.seq, jb->stat.n_overflow); #else - DEBUG_WARNING("drop 1 old frame seq=%u\n", f0->hdr.seq); + DEBUG_WARNING("drop 1 old packet seq=%u\n", p0->hdr.seq); #endif plot_jbuf_event(jb, 'O'); - f0->mem = mem_deref(f0->mem); + p0->mem = mem_deref(p0->mem); list_unlink(le); } - *f = le->data; + *pp = le->data; } /** * Release a packet, put it back in the pool */ -static void packet_deref(struct jbuf *jb, struct packet *f) +static void packet_deref(struct jbuf *jb, struct packet *p) { - f->mem = mem_deref(f->mem); - list_unlink(&f->le); - list_append(&jb->pooll, &f->le, f); + p->mem = mem_deref(p->mem); + list_unlink(&p->le); + list_append(&jb->pooll, &p->le, p); --jb->n; } @@ -317,11 +317,11 @@ int jbuf_resize(struct jbuf *jb, uint32_t packets) return EINVAL; for (uint32_t i=jb->packets; ipooll, &f->le, f); + list_append(&jb->pooll, &p->le, p); DEBUG_INFO("alloc: adding to pool list %u\n", i); } @@ -523,7 +523,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) jb->nf++; - /* Missing frame detection */ + /* check frame completeness */ jbuf_move_end(jb, &p->le); out: @@ -558,7 +558,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) if (!jbuf_frame_ready(jb)) { DEBUG_INFO("no frame ready - wait.. " - "(n=%u min=%u)\n", jb->n, jb->min); + "(nf=%u min=%u)\n", jb->nf, jb->min); STAT_INC(n_underflow); plot_jbuf_event(jb, 'U'); err = ENOENT; @@ -612,7 +612,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) */ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) { - struct packet *f; + struct packet *p; int err = 0; if (!jb || !hdr || !mem) @@ -625,20 +625,16 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) goto out; } - /* When we get one packet P[i], check that the next packet P[i+1] - is present and have a seq no. of seq[i] + 1. - If not, we should consider that packet lost. */ - - f = jb->packetl.head->data; + p = jb->packetl.head->data; /* Update sequence number for 'get' */ - jb->seq_get = f->hdr.seq; + jb->seq_get = p->hdr.seq; - *hdr = f->hdr; - *mem = mem_ref(f->mem); + *hdr = p->hdr; + *mem = mem_ref(p->mem); jbuf_update_nf(jb); - packet_deref(jb, f); + packet_deref(jb, p); out: mtx_unlock(jb->lock); From 98d4bdbe4491a0e661cacea9ee93cb2a93fe6044 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Wed, 27 Sep 2023 15:32:49 +0200 Subject: [PATCH 04/15] jbuf: update trace data --- src/jbuf/jbuf.c | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 009be7611..2064fbff0 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -78,7 +78,6 @@ struct jbuf { uint64_t tr; /**< Time of previous jbuf_put() */ int pt; /**< Payload type */ bool running; /**< Jitter buffer is running */ - int32_t rdiff; /**< Average out of order reverse diff */ struct tmr tmr; /**< Rdiff down timer */ mtx_t *lock; /**< Makes jitter buffer thread safe */ @@ -136,21 +135,19 @@ static inline bool seq_less(uint16_t x, uint16_t y) static void plot_jbuf(struct jbuf *jb, uint64_t tr) { uint32_t treal; - uint32_t rdiff = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF); if (!jb->tr00) jb->tr00 = tr; treal = (uint32_t) (tr - jb->tr00); re_snprintf(jb->buf, sizeof(jb->buf), - "%s, 0x%p, %u, %u, %u, %u, %u", + "%s, 0x%p, %u, %u, %u, %u", __func__, /* row 1 - grep */ jb, /* row 2 - grep optional */ treal, /* row 3 - plot x-axis */ - rdiff, /* row 4 - plot */ - jb->wish, /* row 5 - plot */ - jb->n, /* row 6 - plot */ - jb->nf); /* row 7 - plot */ + jb->n, /* row 4 - plot */ + jb->nf, /* row 5 - plot */ + jb->ncf); /* row 6 - plot */ re_trace_event("jbuf", "plot", 'P', NULL, 0, RE_TRACE_ARG_STRING_COPY, "line", jb->buf); } From 2b08fb90522509da7bb3bd807fd9987cb8b9f975 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Wed, 27 Sep 2023 15:43:13 +0200 Subject: [PATCH 05/15] test: jbuf - remove obsolete adaptive and wish --- test/jbuf.c | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/test/jbuf.c b/test/jbuf.c index d6373efec..41f76275c 100644 --- a/test/jbuf.c +++ b/test/jbuf.c @@ -148,8 +148,6 @@ int test_jbuf_adaptive(void) err = jbuf_alloc(&jb, 1, 10); TEST_ERR(err); - err = jbuf_set_type(jb, JBUF_ADAPTIVE); - TEST_ERR(err); for (i=0; i min reached, read first frame\n"); err = jbuf_get(jb, &hdr2, &mem); TEST_ERR(err); @@ -262,8 +260,6 @@ int test_jbuf_adaptive_video(void) err = jbuf_alloc(&jb, 1, 10); TEST_ERR(err); - err = jbuf_set_type(jb, JBUF_ADAPTIVE); - TEST_ERR(err); for (i=0; i wish */ + TEST_EQUALS(EAGAIN, err); /* n > min */ TEST_EQUALS(2, hdr2.seq); TEST_EQUALS(100, hdr2.ts); err = jbuf_get(jb, &hdr2, &mem); mem = mem_deref(mem); - TEST_EQUALS(EAGAIN, err); /* n > wish */ + TEST_EQUALS(EAGAIN, err); /* n > min */ err = jbuf_get(jb, &hdr2, &mem); mem = mem_deref(mem); - TEST_ERR(err); /* n == wish */ + TEST_ERR(err); /* n == min */ TEST_EQUALS(4, hdr2.seq); TEST_EQUALS(200, hdr2.ts); From a15575e008f23b6921b8ce7e2b5a6cdf23f948ba Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Wed, 27 Sep 2023 16:45:29 +0200 Subject: [PATCH 06/15] jbuf: rename n_underflow to n_waiting --- include/re_jbuf.h | 2 +- src/jbuf/jbuf.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/re_jbuf.h b/include/re_jbuf.h index 71f8703ca..902924c18 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -15,7 +15,7 @@ struct jbuf_stat { uint32_t n_late; /**< Number of frames arriving too late */ uint32_t n_lost; /**< Number of lost frames */ uint32_t n_overflow; /**< Number of overflows */ - uint32_t n_underflow; /**< Number of underflows */ + uint32_t n_waiting; /**< Number of read waiting */ uint32_t n_flush; /**< Number of times jitter buffer flushed */ }; diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 2064fbff0..a338980ff 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -556,7 +556,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) if (!jbuf_frame_ready(jb)) { DEBUG_INFO("no frame ready - wait.. " "(nf=%u min=%u)\n", jb->nf, jb->min); - STAT_INC(n_underflow); + STAT_INC(n_waiting); plot_jbuf_event(jb, 'U'); err = ENOENT; goto out; @@ -759,7 +759,7 @@ int jbuf_debug(struct re_printf *pf, const struct jbuf *jb) err |= mbuf_printf(mb, " dup=%u", jb->stat.n_dups); err |= mbuf_printf(mb, " late=%u", jb->stat.n_late); err |= mbuf_printf(mb, " or=%u", jb->stat.n_overflow); - err |= mbuf_printf(mb, " ur=%u", jb->stat.n_underflow); + err |= mbuf_printf(mb, " wait=%u", jb->stat.n_waiting); err |= mbuf_printf(mb, " flush=%u", jb->stat.n_flush); err |= mbuf_printf(mb, " put/get_ratio=%u%%", jb->stat.n_get ? 100*jb->stat.n_put/jb->stat.n_get : 0); From ebdcde7f14ab1c3bf90964bc72a94bdf71219f74 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Thu, 28 Sep 2023 13:25:47 +0200 Subject: [PATCH 07/15] jbuf: slowly reduce queued frames --- src/jbuf/jbuf.c | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index a338980ff..5844d6bc8 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -78,7 +78,8 @@ struct jbuf { uint64_t tr; /**< Time of previous jbuf_put() */ int pt; /**< Payload type */ bool running; /**< Jitter buffer is running */ - struct tmr tmr; /**< Rdiff down timer */ + struct tmr tmr; /**< Timer for EAGAIN */ + bool wait; /**< Wait flag for jbuf_get() */ mtx_t *lock; /**< Makes jitter buffer thread safe */ enum jbuf_type jbtype; /**< Jitter buffer type */ @@ -404,6 +405,24 @@ static bool jbuf_frame_ready(struct jbuf *jb) } +static void reset_wait(void *arg) +{ + struct jbuf *jb = arg; + + jb->wait = false; +} + + +static void eagain_later(struct jbuf *jb) +{ + jb->wait = true; + if (tmr_isrunning(&jb->tmr)) + return; + + tmr_start(&jb->tmr, 250, reset_wait, jb); +} + + /** * Put one packet into the jitter buffer * @@ -517,8 +536,10 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) /* Success */ p->hdr = *hdr; p->mem = mem_ref(mem); - if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) + if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) { jb->nf++; + jb->wait = false; + } /* check frame completeness */ jbuf_move_end(jb, &p->le); @@ -539,8 +560,8 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) * @param hdr Returned RTP Header * @param mem Pointer to memory object storage - referenced on success * - * @return 0 if success, EAGAIN if it should be called again in order to avoid - * a jitter buffer overflow, otherwise errorcode + * @return 0 if success, EAGAIN if it should be called again, otherwise + * errorcode */ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) { @@ -553,7 +574,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) mtx_lock(jb->lock); STAT_INC(n_get); - if (!jbuf_frame_ready(jb)) { + if (!jbuf_frame_ready(jb) || jb->wait) { DEBUG_INFO("no frame ready - wait.. " "(nf=%u min=%u)\n", jb->nf, jb->min); STAT_INC(n_waiting); @@ -589,8 +610,13 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) jbuf_update_nf(jb); packet_deref(jb, p); - if (jbuf_frame_ready(jb)) - err = EAGAIN; + if (jbuf_frame_ready(jb)) { + p = jb->packetl.head->data; + if (p->hdr.ts == hdr->ts) + err = EAGAIN; + else + eagain_later(jb); + } out: mtx_unlock(jb->lock); From 836b32fe57b1e4b0807be8dac30163aecb45fbbf Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Thu, 28 Sep 2023 13:51:02 +0200 Subject: [PATCH 08/15] jbuf: add again flag for audio --- src/jbuf/jbuf.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 5844d6bc8..9e4550ef5 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -80,6 +80,7 @@ struct jbuf { bool running; /**< Jitter buffer is running */ struct tmr tmr; /**< Timer for EAGAIN */ bool wait; /**< Wait flag for jbuf_get() */ + bool again; /**< Again flag for jbuf_get() */ mtx_t *lock; /**< Makes jitter buffer thread safe */ enum jbuf_type jbtype; /**< Jitter buffer type */ @@ -409,7 +410,8 @@ static void reset_wait(void *arg) { struct jbuf *jb = arg; - jb->wait = false; + jb->wait = false; + jb->again = true; } @@ -612,8 +614,10 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) if (jbuf_frame_ready(jb)) { p = jb->packetl.head->data; - if (p->hdr.ts == hdr->ts) + if (p->hdr.ts == hdr->ts || jb->again) { err = EAGAIN; + jb->again = false; + } else eagain_later(jb); } From 9965630393e09da7937ca1d69626c7684e3be382 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Thu, 28 Sep 2023 17:32:00 +0200 Subject: [PATCH 09/15] jbuf: delay buffer shrink after last out-of-order packet --- src/jbuf/jbuf.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 9e4550ef5..3325c7a81 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -38,9 +38,8 @@ #endif enum { - JBUF_RDIFF_EMA_COEFF = 1024, - JBUF_RDIFF_UP_SPEED = 512, JBUF_PUT_TIMEOUT = 400, + JBUF_WAIT_TIMEOUT = 1000, }; @@ -421,7 +420,7 @@ static void eagain_later(struct jbuf *jb) if (tmr_isrunning(&jb->tmr)) return; - tmr_start(&jb->tmr, 250, reset_wait, jb); + tmr_start(&jb->tmr, JBUF_WAIT_TIMEOUT, reset_wait, jb); } @@ -504,6 +503,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) " - inserting after seq=%u (seq=%u)\n", seq_le, seq); list_insert_after(&jb->packetl, le, &p->le, p); + tmr_start(&jb->tmr, JBUF_WAIT_TIMEOUT, reset_wait, jb); break; } else if (seq == seq_le) { /* less likely */ @@ -539,7 +539,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) p->hdr = *hdr; p->mem = mem_ref(mem); if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) { - jb->nf++; + ++jb->nf; jb->wait = false; } From 9a6c8c55074835e0bc660083beb13ea8f379b516 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 29 Sep 2023 09:03:19 +0200 Subject: [PATCH 10/15] jbuf: do not restart timer if oos --- src/jbuf/jbuf.c | 1 - 1 file changed, 1 deletion(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 3325c7a81..5d705c01b 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -503,7 +503,6 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) " - inserting after seq=%u (seq=%u)\n", seq_le, seq); list_insert_after(&jb->packetl, le, &p->le, p); - tmr_start(&jb->tmr, JBUF_WAIT_TIMEOUT, reset_wait, jb); break; } else if (seq == seq_le) { /* less likely */ From b6b1b0b90e9ee3ed97238c64cfe46c98f2e47dfe Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 29 Sep 2023 10:00:53 +0200 Subject: [PATCH 11/15] jbuf: add jbuf_frames() again --- include/re_jbuf.h | 1 + src/jbuf/jbuf.c | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/include/re_jbuf.h b/include/re_jbuf.h index 902924c18..77ad1961e 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -37,3 +37,4 @@ void jbuf_flush(struct jbuf *jb); int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); +uint32_t jbuf_frames(const struct jbuf *jb); diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 5d705c01b..6666d356d 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -732,6 +732,26 @@ uint32_t jbuf_packets(const struct jbuf *jb) } +/** + * Get number of current frames + * + * @param jb Jitter buffer + * + * @return number of frames + */ +uint32_t jbuf_frames(const struct jbuf *jb) +{ + if (!jb) + return 0; + + mtx_lock(jb->lock); + uint32_t n = jb->nf; + mtx_unlock(jb->lock); + + return n; +} + + /** * Get jitter buffer statistics * From 3d865a1b40f92ff805efff13054b410f05742e49 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 29 Sep 2023 13:59:55 +0200 Subject: [PATCH 12/15] jbuf: no frame completeness if min is zero --- src/jbuf/jbuf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 6666d356d..752642a8d 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -401,7 +401,7 @@ static bool jbuf_frame_ready(struct jbuf *jb) if (jb->nf < jb->min) return false; - return jb->ncf || jb->nf > jb->max; + return jb->ncf || !jb->min || jb->nf > jb->max; } From d123c911c9542852d43d8cd149bd1fe4d90de2f0 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 29 Sep 2023 14:00:30 +0200 Subject: [PATCH 13/15] jbuf: add getter for ncf --- include/re_jbuf.h | 1 + src/jbuf/jbuf.c | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/include/re_jbuf.h b/include/re_jbuf.h index 77ad1961e..46904eb26 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -38,3 +38,4 @@ int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); uint32_t jbuf_frames(const struct jbuf *jb); +uint32_t jbuf_complete_frames(const struct jbuf *jb); diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 752642a8d..587171531 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -752,6 +752,26 @@ uint32_t jbuf_frames(const struct jbuf *jb) } +/** + * Get number of complete frames + * + * @param jb Jitter buffer + * + * @return number of frames + */ +uint32_t jbuf_complete_frames(const struct jbuf *jb) +{ + if (!jb) + return 0; + + mtx_lock(jb->lock); + uint32_t n = jb->ncf; + mtx_unlock(jb->lock); + + return n; +} + + /** * Get jitter buffer statistics * From ef2943350aa10ceef57a1fadd9cde578521e5620 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Fri, 29 Sep 2023 14:00:55 +0200 Subject: [PATCH 14/15] test: update jbuf tests --- test/jbuf.c | 173 ++++++++++++++++++++++++++++++++++++---------------- test/test.c | 4 +- test/test.h | 4 +- 3 files changed, 125 insertions(+), 56 deletions(-) diff --git a/test/jbuf.c b/test/jbuf.c index 41f76275c..7ca2eb0c0 100644 --- a/test/jbuf.c +++ b/test/jbuf.c @@ -37,11 +37,7 @@ int test_jbuf(void) /* Empty list */ DEBUG_INFO("test frame: Empty list\n"); - if (ENOENT != jbuf_get(jb, &hdr2, &mem)) { - err = EINVAL; - goto out; - } - + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); /* One frame */ DEBUG_INFO("test frame: One frame\n"); @@ -50,12 +46,12 @@ int test_jbuf(void) hdr.ts = 1; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - if ((EALREADY != jbuf_put(jb, &hdr, frv[0]))) {err = EINVAL; goto out;} + TEST_EQUALS(EALREADY, jbuf_put(jb, &hdr, frv[0])); err = jbuf_get(jb, &hdr2, &mem); TEST_ERR(err); - if (160 != hdr2.seq) {err = EINVAL; goto out;} - if (mem != frv[0]) {err = EINVAL; goto out;} + TEST_EQUALS(160, hdr2.seq); + TEST_EQUALS(frv[0], mem); mem = mem_deref(mem); if (ENOENT != jbuf_get(jb, &hdr2, &mem)) {err = EINVAL; goto out;} @@ -72,8 +68,7 @@ int test_jbuf(void) err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); if (320 != hdr2.seq) {err = EINVAL; goto out;} if (mem != frv[0]) {err = EINVAL; goto out;} mem = mem_deref(mem); @@ -101,14 +96,12 @@ int test_jbuf(void) err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); if (640 != hdr2.seq) {err = EINVAL; goto out;} if (mem != frv[0]) {err = EINVAL; goto out;} mem = mem_deref(mem); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); if (800 != hdr2.seq) {err = EINVAL; goto out;} if (mem != frv[1]) {err = EINVAL; goto out;} mem = mem_deref(mem); @@ -132,7 +125,7 @@ int test_jbuf(void) } -int test_jbuf_adaptive(void) +int test_jbuf_frames(void) { struct rtp_header hdr, hdr2; struct jbuf *jb = NULL; @@ -169,7 +162,7 @@ int test_jbuf_adaptive(void) TEST_ERR(err); TEST_EQUALS(EALREADY, jbuf_put(jb, &hdr, frv[0])); - /* min size is not reached yet */ + /* not able to decide that frame is complete */ TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); hdr.seq = 161; @@ -177,54 +170,59 @@ int test_jbuf_adaptive(void) err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - /* min size reached */ - DEBUG_INFO("n > min reached, read first frame\n"); + /* detected complete frame */ + DEBUG_INFO("got complete frame, read first frame\n"); err = jbuf_get(jb, &hdr2, &mem); TEST_ERR(err); TEST_EQUALS(160, hdr2.seq); TEST_EQUALS(mem, frv[0]); mem = mem_deref(mem); - DEBUG_INFO("n <= min, leads to ENOENT\n"); + DEBUG_INFO("no other complete frame, leads to ENOENT\n"); TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); /* Four frames */ DEBUG_INFO("test frame: Four frames\n"); jbuf_flush(jb); - hdr.seq = 1; - hdr.ts = 100; + hdr.seq = hdr.ts = 1; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - hdr.seq = 2; - hdr.ts = 200; + hdr.seq = hdr.ts = 2; err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - hdr.seq = 3; - hdr.ts = 300; + hdr.seq = hdr.ts = 3; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - hdr.seq = 4; - hdr.ts = 400; + hdr.seq = hdr.ts = 4; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_ERR(err); TEST_EQUALS(1, hdr2.seq); TEST_EQUALS(mem, frv[0]); mem = mem_deref(mem); + /* slowly reduce buffer */ + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); + + hdr.seq = hdr.ts = 5; + err = jbuf_put(jb, &hdr, frv[3]); + TEST_ERR(err); err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_ERR(err); TEST_EQUALS(2, hdr2.seq); TEST_EQUALS(mem, frv[1]); mem = mem_deref(mem); + hdr.seq = hdr.ts = 6; + err = jbuf_put(jb, &hdr, frv[3]); + TEST_ERR(err); err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(0, err); + TEST_ERR(err); TEST_EQUALS(3, hdr2.seq); TEST_EQUALS(mem, frv[2]); mem = mem_deref(mem); @@ -244,7 +242,7 @@ int test_jbuf_adaptive(void) } -int test_jbuf_adaptive_video(void) +int test_jbuf_video_frames(void) { struct rtp_header hdr, hdr2; struct jbuf *jb = NULL; @@ -258,7 +256,10 @@ int test_jbuf_adaptive_video(void) memset(&hdr2, 0, sizeof(hdr2)); hdr.ssrc = 1; - err = jbuf_alloc(&jb, 1, 10); + err = jbuf_alloc(&jb, 1, 3); + TEST_ERR(err); + + err = jbuf_resize(jb, 10); TEST_ERR(err); for (i=0; i min */ TEST_EQUALS(2, hdr2.seq); TEST_EQUALS(100, hdr2.ts); - err = jbuf_get(jb, &hdr2, &mem); + /* waiting */ + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); + + hdr.seq = 6; + hdr.ts = 400; + err = jbuf_put(jb, &hdr, frv[4]); + TEST_ERR(err); + TEST_EQUALS(4, jbuf_packets(jb)); + TEST_EQUALS(2, jbuf_frames(jb)); + + /* first packet of frame 2 */ + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); mem = mem_deref(mem); - TEST_EQUALS(EAGAIN, err); /* n > min */ - err = jbuf_get(jb, &hdr2, &mem); + TEST_EQUALS(3, hdr2.seq); + TEST_EQUALS(200, hdr2.ts); + + /* second packet of frame 2 */ + TEST_ERR(jbuf_get(jb, &hdr2, &mem)); mem = mem_deref(mem); - TEST_ERR(err); /* n == min */ TEST_EQUALS(4, hdr2.seq); TEST_EQUALS(200, hdr2.ts); + /* waiting */ + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); + /* --- Test late packet, unique frame --- */ jbuf_flush(jb); @@ -332,24 +355,32 @@ int test_jbuf_adaptive_video(void) err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); TEST_EQUALS(1, jbuf_packets(jb)); + TEST_EQUALS(0, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); TEST_EQUALS(2, jbuf_packets(jb)); + TEST_EQUALS(0, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); hdr.seq = 4; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); TEST_EQUALS(3, jbuf_packets(jb)); + TEST_EQUALS(1, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); hdr.seq = 3; /* unordered late packet */ hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); TEST_EQUALS(4, jbuf_packets(jb)); + TEST_EQUALS(2, jbuf_frames(jb)); + TEST_EQUALS(2, jbuf_complete_frames(jb)); /* --- Test lost get --- */ jbuf_flush(jb); @@ -359,12 +390,16 @@ int test_jbuf_adaptive_video(void) err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); TEST_EQUALS(1, jbuf_packets(jb)); + TEST_EQUALS(0, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); TEST_EQUALS(2, jbuf_packets(jb)); + TEST_EQUALS(0, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); /* LOST hdr.seq = 3; */ @@ -373,22 +408,56 @@ int test_jbuf_adaptive_video(void) err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); TEST_EQUALS(3, jbuf_packets(jb)); + TEST_EQUALS(1, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); hdr.seq = 5; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); TEST_EQUALS(4, jbuf_packets(jb)); + TEST_EQUALS(2, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + /* no complete */ + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); + + /* 3rd frame */ + hdr.seq = 6; + hdr.ts = 400; + err = jbuf_put(jb, &hdr, frv[1]); + TEST_ERR(err); + TEST_EQUALS(5, jbuf_packets(jb)); + TEST_EQUALS(3, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); + + /* reach max frames */ + hdr.seq = 7; + hdr.ts = 500; + err = jbuf_put(jb, &hdr, frv[1]); + TEST_ERR(err); + TEST_EQUALS(6, jbuf_packets(jb)); + TEST_EQUALS(4, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); + + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); mem = mem_deref(mem); - TEST_EQUALS(3, jbuf_packets(jb)); + TEST_EQUALS(1, hdr2.seq); + TEST_EQUALS(100, hdr2.ts); + TEST_EQUALS(5, jbuf_packets(jb)); + TEST_EQUALS(4, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_ERR(jbuf_get(jb, &hdr2, &mem)); mem = mem_deref(mem); - TEST_EQUALS(2, jbuf_packets(jb)); + TEST_EQUALS(2, hdr2.seq); + TEST_EQUALS(100, hdr2.ts); + TEST_EQUALS(4, jbuf_packets(jb)); + TEST_EQUALS(3, jbuf_frames(jb)); + TEST_EQUALS(0, jbuf_complete_frames(jb)); + + /* waiting */ + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); err = 0; diff --git a/test/test.c b/test/test.c index 92afbebc4..d0fe8d355 100644 --- a/test/test.c +++ b/test/test.c @@ -124,8 +124,8 @@ static const struct test tests[] = { TEST(test_ice_cand), TEST(test_ice_loop), TEST(test_jbuf), - TEST(test_jbuf_adaptive), - TEST(test_jbuf_adaptive_video), + TEST(test_jbuf_frames), + TEST(test_jbuf_video_frames), TEST(test_json), TEST(test_json_file), TEST(test_json_unicode), diff --git a/test/test.h b/test/test.h index 5f92e98d7..a31737351 100644 --- a/test/test.h +++ b/test/test.h @@ -228,8 +228,8 @@ int test_httpauth_digest_request(void); int test_ice_loop(void); int test_ice_cand(void); int test_jbuf(void); -int test_jbuf_adaptive(void); -int test_jbuf_adaptive_video(void); +int test_jbuf_frames(void); +int test_jbuf_video_frames(void); int test_json(void); int test_json_bad(void); int test_json_file(void); From 09383fcef2a3cd8af95ac2d5751a35a0337e2374 Mon Sep 17 00:00:00 2001 From: Christian Spielberger Date: Mon, 2 Oct 2023 14:15:23 +0200 Subject: [PATCH 15/15] jbuf: check `seq_get` if `min=0` configured --- src/jbuf/jbuf.c | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 587171531..21d5c90a8 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -113,15 +113,16 @@ static void jbuf_update_nf(struct jbuf *jb) struct packet *pn = n->data; if (p->hdr.ts != pn->hdr.ts) { + --jb->nf; if (jb->ncf) --jb->ncf; - - --jb->nf; } } - if (jb->end == le) + if (jb->end == le) { jb->end = NULL; + jb->nf = jb->ncf = 0; + } } @@ -289,7 +290,7 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) mem_destructor(jb, jbuf_destructor); /* Allocate all packets now */ - err = jbuf_resize(jb, max); + err = jbuf_resize(jb, max + 1); out: if (err) @@ -364,8 +365,12 @@ static void jbuf_move_end(struct jbuf *jb, struct le *cur) if (!cur) return; - jb->end = cur; - cur = cur->next; + struct packet *pm = cur->data; + if (!jb->seq_get || jb->seq_get + 1 == pm->hdr.seq) { + jb->end = cur; + cur = cur->next; + } + if (!cur) return; } @@ -401,7 +406,13 @@ static bool jbuf_frame_ready(struct jbuf *jb) if (jb->nf < jb->min) return false; - return jb->ncf || !jb->min || jb->nf > jb->max; + if (jb->nf > jb->max) + return true; + + if (!jb->end) + return false; + + return jb->ncf || !jb->min; }