diff --git a/include/re_jbuf.h b/include/re_jbuf.h index f9698b485..46904eb26 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -15,7 +15,7 @@ struct jbuf_stat { uint32_t n_late; /**< Number of frames arriving too late */ uint32_t n_lost; /**< Number of lost frames */ uint32_t n_overflow; /**< Number of overflows */ - uint32_t n_underflow; /**< Number of underflows */ + uint32_t n_waiting; /**< Number of read waiting */ uint32_t n_flush; /**< Number of times jitter buffer flushed */ }; @@ -23,12 +23,12 @@ struct jbuf_stat { /** Jitter buffer type */ enum jbuf_type { JBUF_OFF, - JBUF_FIXED, - JBUF_ADAPTIVE + JBUF_FIXED }; int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max); +int jbuf_resize(struct jbuf *jb, uint32_t packets); int jbuf_set_type(struct jbuf *jb, enum jbuf_type jbtype); int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem); int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem); @@ -36,5 +36,6 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem); void jbuf_flush(struct jbuf *jb); int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); -uint32_t jbuf_frames(const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); +uint32_t jbuf_frames(const struct jbuf *jb); +uint32_t jbuf_complete_frames(const struct jbuf *jb); diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index d2a32dc56..21d5c90a8 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -1,9 +1,6 @@ /** * @file jbuf.c Jitter Buffer implementation * - * This is an adaptive jitter buffer implementation. See doc/jbuf for further - * details! - * * Copyright (C) 2010 Creytiv.com */ #include @@ -11,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -40,13 +38,12 @@ #endif enum { - JBUF_RDIFF_EMA_COEFF = 1024, - JBUF_RDIFF_UP_SPEED = 512, JBUF_PUT_TIMEOUT = 400, + JBUF_WAIT_TIMEOUT = 1000, }; -/** Defines a packet frame */ +/** Defines a packet */ struct packet { struct le le; /**< Linked list element */ struct rtp_header hdr; /**< RTP Header */ @@ -57,25 +54,32 @@ struct packet { /** * Defines a jitter buffer * - * The jitter buffer is for incoming RTP packets, which are sorted by - * sequence number. + * The jitter buffer is for incoming RTP packets + * + * Features: + * - re-order out-of-order packets + * - ensure frame completeness + * - specify min/max number of frames */ struct jbuf { struct list pooll; /**< List of free packets in pool */ struct list packetl; /**< List of buffered packets */ uint32_t n; /**< [# packets] Current # of packets in buffer */ uint32_t nf; /**< [# frames] Current # of frames in buffer */ + uint32_t ncf; /**< [# frames] # of complete frames in buffer */ uint32_t min; /**< [# frames] Minimum # of frames to buffer */ uint32_t max; /**< [# frames] Maximum # of frames to buffer */ - uint32_t wish; /**< [# frames] Wish size for adaptive mode */ + uint32_t packets; /**< [# packets] Size of the packet pool */ uint16_t seq_put; /**< Sequence number for last jbuf_put() */ - uint16_t seq_get; /**< Sequence number of last played frame */ + uint16_t seq_get; /**< Sequence number of last played packet */ + struct le *end; /**< End of complete sequence */ uint32_t ssrc; /**< Previous ssrc */ uint64_t tr; /**< Time of previous jbuf_put() */ int pt; /**< Payload type */ bool running; /**< Jitter buffer is running */ - int32_t rdiff; /**< Average out of order reverse diff */ - struct tmr tmr; /**< Rdiff down timer */ + struct tmr tmr; /**< Timer for EAGAIN */ + bool wait; /**< Wait flag for jbuf_get() */ + bool again; /**< Again flag for jbuf_get() */ mtx_t *lock; /**< Makes jitter buffer thread safe */ enum jbuf_type jbtype; /**< Jitter buffer type */ @@ -89,6 +93,39 @@ struct jbuf { }; +/** + * Update frame counter and sequence end list element pointer before oldest + * packet is removed + * + * @param jb Jitter buffer + */ +static void jbuf_update_nf(struct jbuf *jb) +{ + struct le *le = jb->packetl.head; + + if (!le) + return; + + struct le *n = le->next; + + if (n) { + struct packet *p = le->data; + struct packet *pn = n->data; + + if (p->hdr.ts != pn->hdr.ts) { + --jb->nf; + if (jb->ncf) + --jb->ncf; + } + } + + if (jb->end == le) { + jb->end = NULL; + jb->nf = jb->ncf = 0; + } +} + + /** Is x less than y? */ static inline bool seq_less(uint16_t x, uint16_t y) { @@ -100,21 +137,19 @@ static inline bool seq_less(uint16_t x, uint16_t y) static void plot_jbuf(struct jbuf *jb, uint64_t tr) { uint32_t treal; - uint32_t rdiff = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF); if (!jb->tr00) jb->tr00 = tr; treal = (uint32_t) (tr - jb->tr00); re_snprintf(jb->buf, sizeof(jb->buf), - "%s, 0x%p, %u, %u, %u, %u, %u", + "%s, 0x%p, %u, %u, %u, %u", __func__, /* row 1 - grep */ jb, /* row 2 - grep optional */ treal, /* row 3 - plot x-axis */ - rdiff, /* row 4 - plot */ - jb->wish, /* row 5 - plot */ - jb->n, /* row 6 - plot */ - jb->nf); /* row 7 - plot */ + jb->n, /* row 4 - plot */ + jb->nf, /* row 5 - plot */ + jb->ncf); /* row 6 - plot */ re_trace_event("jbuf", "plot", 'P', NULL, 0, RE_TRACE_ARG_STRING_COPY, "line", jb->buf); } @@ -148,9 +183,9 @@ static void plot_jbuf_event(struct jbuf *jb, char ph) /** - * Get a frame from the pool + * Get a packet from the pool */ -static void packet_alloc(struct jbuf *jb, struct packet **f) +static void packet_alloc(struct jbuf *jb, struct packet **pp) { struct le *le; @@ -160,37 +195,38 @@ static void packet_alloc(struct jbuf *jb, struct packet **f) ++jb->n; } else { - struct packet *f0; + struct packet *p0; - /* Steal an old frame */ + /* Steal an old packet */ le = jb->packetl.head; - f0 = le->data; + p0 = le->data; + jbuf_update_nf(jb); #if JBUF_STAT STAT_INC(n_overflow); - DEBUG_WARNING("drop 1 old frame seq=%u (total dropped %u)\n", - f0->hdr.seq, jb->stat.n_overflow); + DEBUG_WARNING("drop 1 old packet seq=%u (total dropped %u)\n", + p0->hdr.seq, jb->stat.n_overflow); #else - DEBUG_WARNING("drop 1 old frame seq=%u\n", f0->hdr.seq); + DEBUG_WARNING("drop 1 old packet seq=%u\n", p0->hdr.seq); #endif plot_jbuf_event(jb, 'O'); - f0->mem = mem_deref(f0->mem); + p0->mem = mem_deref(p0->mem); list_unlink(le); } - *f = le->data; + *pp = le->data; } /** * Release a packet, put it back in the pool */ -static void packet_deref(struct jbuf *jb, struct packet *f) +static void packet_deref(struct jbuf *jb, struct packet *p) { - f->mem = mem_deref(f->mem); - list_unlink(&f->le); - list_append(&jb->pooll, &f->le, f); + p->mem = mem_deref(p->mem); + list_unlink(&p->le); + list_append(&jb->pooll, &p->le, p); --jb->n; } @@ -213,14 +249,13 @@ static void jbuf_destructor(void *data) * * @param jbp Pointer to returned jitter buffer * @param min Minimum delay in [frames] - * @param max Maximum delay in [packets] + * @param max Maximum delay in [frames] * * @return 0 if success, otherwise errorcode */ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) { struct jbuf *jb; - uint32_t i; int err = 0; if (!jbp || ( min > max)) @@ -242,10 +277,10 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) jb->jbtype = JBUF_FIXED; jb->min = min; jb->max = max; - jb->wish = min; + jb->packets = 0; tmr_init(&jb->tmr); - DEBUG_INFO("alloc: delay=%u-%u frames/packets\n", min, max); + DEBUG_INFO("alloc: delay=%u-%u [frames]\n", min, max); jb->pt = -1; err = mutex_alloc(&jb->lock); @@ -255,16 +290,7 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) mem_destructor(jb, jbuf_destructor); /* Allocate all packets now */ - for (i=0; imax; i++) { - struct packet *f = mem_zalloc(sizeof(*f), NULL); - if (!f) { - err = ENOMEM; - break; - } - - list_append(&jb->pooll, &f->le, f); - DEBUG_INFO("alloc: adding to pool list %u\n", i); - } + err = jbuf_resize(jb, max + 1); out: if (err) @@ -276,6 +302,33 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) } +/** + * Resize the packet pool + * + * @param jb The jitter buffer + * @param packets Size of the packet pool in [packets]. Default: jb->max + * + * @return 0 if success, otherwise errorcode + */ +int jbuf_resize(struct jbuf *jb, uint32_t packets) +{ + if (packets <= jb->packets) + return EINVAL; + + for (uint32_t i=jb->packets; ipooll, &p->le, p); + DEBUG_INFO("alloc: adding to pool list %u\n", i); + } + + jb->packets = packets; + return 0; +} + + /** * Set jitter buffer type. * @@ -295,67 +348,90 @@ int jbuf_set_type(struct jbuf *jb, enum jbuf_type jbtype) } -static void wish_down(void *arg) +/** + * Moves end of complete sequence and counts complete frames. If `end` is NULL, + * a search starts from the head which points to the oldest packet + * + * @param jb Jitter buffer + * @param cur Current inserted list element + */ +static void jbuf_move_end(struct jbuf *jb, struct le *cur) { - struct jbuf *jb = arg; + struct le *end; + + if (!jb->end) { + jb->ncf = 0; + cur = jb->packetl.head; + if (!cur) + return; + + struct packet *pm = cur->data; + if (!jb->seq_get || jb->seq_get + 1 == pm->hdr.seq) { + jb->end = cur; + cur = cur->next; + } - if (jb->wish > jb->min) { - DEBUG_INFO("wish size changed %u --> %u\n", jb->wish, - jb->wish - 1); - --jb->wish; + if (!cur) + return; } -} - -static void calc_rdiff(struct jbuf *jb, uint16_t seq) -{ - int32_t rdiff; - int32_t adiff; - int32_t s; /**< EMA coefficient */ - float ratio = 1.0; /**< Frame packet ratio */ - uint32_t wish; - uint32_t max = jb->max; - bool down = false; - - if (jb->jbtype != JBUF_ADAPTIVE) + end = cur->prev; + if (!end) return; - if (!jb->seq_get) + /* update only if endsing packet was inserted right now */ + if (jb->end != end) return; - if (jb->nf) { - ratio = (float)jb->n / (float)jb->nf; - max = (uint32_t)(max / ratio); + for (; end->next; end = end->next) { + struct packet *pm = end->data; + struct packet *pn = end->next->data; + + if (pm->hdr.seq + 1 != pn->hdr.seq) + break; + + if (pm->hdr.ts != pn->hdr.ts) + ++jb->ncf; } - rdiff = (int16_t)(jb->seq_put + 1 - seq); - adiff = abs(rdiff * JBUF_RDIFF_EMA_COEFF); - s = adiff > jb->rdiff ? JBUF_RDIFF_UP_SPEED : - jb->wish > 2 ? 1 : - jb->wish > 1 ? 2 : 3; - jb->rdiff += (adiff - jb->rdiff) * s / JBUF_RDIFF_EMA_COEFF; + jb->end = end; +} - wish = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF / ratio); - if (wish < jb->min) - wish = jb->min; - if (max && wish >= max) - wish = max - 1; +static bool jbuf_frame_ready(struct jbuf *jb) +{ + if (!jb->packetl.head) + return false; - if (wish > jb->wish) { - DEBUG_INFO("wish size changed %u --> %u\n", jb->wish, wish); - jb->wish = wish; - } - else if (wish < jb->wish) { - uint32_t dt = wish + 1 == jb->wish ? 6000 : 1000; - if (!tmr_isrunning(&jb->tmr) || tmr_get_expire(&jb->tmr) > dt) - tmr_start(&jb->tmr, dt, wish_down, jb); + if (jb->nf < jb->min) + return false; - down = true; - } + if (jb->nf > jb->max) + return true; + + if (!jb->end) + return false; + + return jb->ncf || !jb->min; +} + + +static void reset_wait(void *arg) +{ + struct jbuf *jb = arg; + + jb->wait = false; + jb->again = true; +} + + +static void eagain_later(struct jbuf *jb) +{ + jb->wait = true; + if (tmr_isrunning(&jb->tmr)) + return; - if (!down && tmr_isrunning(&jb->tmr)) - tmr_cancel(&jb->tmr); + tmr_start(&jb->tmr, JBUF_WAIT_TIMEOUT, reset_wait, jb); } @@ -370,12 +446,10 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) */ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) { - struct packet *f; - struct packet *fc; + struct packet *p; struct le *le, *tail; uint16_t seq; uint64_t tr, dt; - bool equal; int err = 0; if (!jb || !hdr) @@ -404,10 +478,6 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) jb->ssrc = hdr->ssrc; if (jb->running) { - - if (jb->jbtype == JBUF_ADAPTIVE) - calc_rdiff(jb, seq); - /* Packet arrived too late to be put into buffer */ if (jb->seq_get && seq_less(seq, jb->seq_get + 1)) { STAT_INC(n_late); @@ -423,7 +493,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) STAT_INC(n_put); - packet_alloc(jb, &f); + packet_alloc(jb, &p); tail = jb->packetl.tail; @@ -431,7 +501,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) Frame is later than tail -> append to tail */ if (!tail || seq_less(((struct packet *)tail->data)->hdr.seq, seq)) { - list_append(&jb->packetl, &f->le, f); + list_append(&jb->packetl, &p->le, p); goto success; } @@ -443,7 +513,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) DEBUG_PRINTF("put: out-of-sequence" " - inserting after seq=%u (seq=%u)\n", seq_le, seq); - list_insert_after(&jb->packetl, le, &f->le, f); + list_insert_after(&jb->packetl, le, &p->le, p); break; } else if (seq == seq_le) { /* less likely */ @@ -451,8 +521,8 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) DEBUG_INFO("duplicate: seq=%u\n", seq); STAT_INC(n_dups); plot_jbuf_event(jb, 'D'); - list_insert_after(&jb->packetl, le, &f->le, f); - packet_deref(jb, f); + list_insert_after(&jb->packetl, le, &p->le, p); + packet_deref(jb, p); err = EALREADY; goto out; } @@ -464,7 +534,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) if (!le) { DEBUG_PRINTF("put: out-of-sequence" " - put in head (seq=%u)\n", seq); - list_prepend(&jb->packetl, &f->le, f); + list_prepend(&jb->packetl, &p->le, p); } STAT_INC(n_oos); @@ -476,22 +546,15 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) jb->seq_put = seq; /* Success */ - f->hdr = *hdr; - f->mem = mem_ref(mem); - - equal = false; - if (f->le.prev) { - fc = f->le.prev->data; - equal = (fc->hdr.ts == f->hdr.ts); - } - - if (!equal && f->le.next) { - fc = f->le.next->data; - equal = (fc->hdr.ts == f->hdr.ts); + p->hdr = *hdr; + p->mem = mem_ref(mem); + if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) { + ++jb->nf; + jb->wait = false; } - if (!equal) - ++jb->nf; + /* check frame completeness */ + jbuf_move_end(jb, &p->le); out: #ifdef RE_JBUF_TRACE @@ -509,12 +572,12 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) * @param hdr Returned RTP Header * @param mem Pointer to memory object storage - referenced on success * - * @return 0 if success, EAGAIN if it should be called again in order to avoid - * a jitter buffer overflow, otherwise errorcode + * @return 0 if success, EAGAIN if it should be called again, otherwise + * errorcode */ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) { - struct packet *f; + struct packet *p; int err = 0; if (!jb || !hdr || !mem) @@ -523,61 +586,50 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) mtx_lock(jb->lock); STAT_INC(n_get); - if (jb->nf <= jb->wish || !jb->packetl.head) { - DEBUG_INFO("not enough buffer packets - wait.. " - "(n=%u wish=%u)\n", jb->n, jb->wish); - STAT_INC(n_underflow); + if (!jbuf_frame_ready(jb) || jb->wait) { + DEBUG_INFO("no frame ready - wait.. " + "(nf=%u min=%u)\n", jb->nf, jb->min); + STAT_INC(n_waiting); plot_jbuf_event(jb, 'U'); err = ENOENT; goto out; } - /* When we get one packet P[i], check that the next packet P[i+1] - is present and have a seq no. of seq[i] + 1. - If not, we should consider that packet lost. */ - - f = jb->packetl.head->data; + p = jb->packetl.head->data; #if JBUF_STAT /* Check sequence of previously played packet */ if (jb->seq_get) { - const int16_t seq_diff = f->hdr.seq - jb->seq_get; - if (seq_less(f->hdr.seq, jb->seq_get)) { - DEBUG_WARNING("get: seq=%u too late\n", f->hdr.seq); + const int16_t seq_diff = p->hdr.seq - jb->seq_get; + if (seq_less(p->hdr.seq, jb->seq_get)) { + DEBUG_WARNING("get: seq=%u too late\n", p->hdr.seq); } else if (seq_diff > 1) { STAT_ADD(n_lost, 1); plot_jbuf_event(jb, 'T'); DEBUG_INFO("get: n_lost: diff=%d,seq=%u,seq_get=%u\n", - seq_diff, f->hdr.seq, jb->seq_get); + seq_diff, p->hdr.seq, jb->seq_get); } } #endif /* Update sequence number for 'get' */ - jb->seq_get = f->hdr.seq; - - *hdr = f->hdr; - *mem = mem_ref(f->mem); - - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; + jb->seq_get = p->hdr.seq; - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } + *hdr = p->hdr; + *mem = mem_ref(p->mem); - packet_deref(jb, f); + jbuf_update_nf(jb); + packet_deref(jb, p); - if (jb->nf > jb->wish) { - DEBUG_INFO("reducing jitter buffer " - "(nf=%u min=%u wish=%u max=%u)\n", - jb->nf, jb->min, jb->wish, jb->max); - err = EAGAIN; + if (jbuf_frame_ready(jb)) { + p = jb->packetl.head->data; + if (p->hdr.ts == hdr->ts || jb->again) { + err = EAGAIN; + jb->again = false; + } + else + eagain_later(jb); } out: @@ -585,6 +637,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) return err; } + /** * Get one packet from the jitter buffer, even if it becomes depleted * @@ -596,7 +649,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) */ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) { - struct packet *f; + struct packet *p; int err = 0; if (!jb || !hdr || !mem) @@ -609,30 +662,16 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) goto out; } - /* When we get one packet P[i], check that the next packet P[i+1] - is present and have a seq no. of seq[i] + 1. - If not, we should consider that packet lost. */ - - f = jb->packetl.head->data; + p = jb->packetl.head->data; /* Update sequence number for 'get' */ - jb->seq_get = f->hdr.seq; + jb->seq_get = p->hdr.seq; - *hdr = f->hdr; - *mem = mem_ref(f->mem); - - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; - - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } + *hdr = p->hdr; + *mem = mem_ref(p->mem); - packet_deref(jb, f); + jbuf_update_nf(jb); + packet_deref(jb, p); out: mtx_unlock(jb->lock); @@ -640,7 +679,7 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) } /** - * Flush all frames in the jitter buffer + * Flush all packets in the jitter buffer * * @param jb Jitter buffer */ @@ -656,12 +695,12 @@ void jbuf_flush(struct jbuf *jb) mtx_lock(jb->lock); if (jb->packetl.head) { - DEBUG_INFO("flush: %u frames\n", jb->n); + DEBUG_INFO("flush: %u packets\n", jb->n); } - /* put all buffered frames back in free list */ + /* put all buffered packets back in free list */ for (le = jb->packetl.head; le; le = jb->packetl.head) { - DEBUG_INFO(" flush frame: seq=%u\n", + DEBUG_INFO(" flush packet: seq=%u\n", ((struct packet *)(le->data))->hdr.seq); packet_deref(jb, le->data); @@ -669,6 +708,8 @@ void jbuf_flush(struct jbuf *jb) jb->n = 0; jb->nf = 0; + jb->ncf = 0; + jb->end = NULL; jb->running = false; jb->seq_get = 0; @@ -711,14 +752,34 @@ uint32_t jbuf_packets(const struct jbuf *jb) */ uint32_t jbuf_frames(const struct jbuf *jb) { - if (!jb) - return 0; + if (!jb) + return 0; - mtx_lock(jb->lock); - uint32_t n = jb->nf; - mtx_unlock(jb->lock); + mtx_lock(jb->lock); + uint32_t n = jb->nf; + mtx_unlock(jb->lock); - return n; + return n; +} + + +/** + * Get number of complete frames + * + * @param jb Jitter buffer + * + * @return number of frames + */ +uint32_t jbuf_complete_frames(const struct jbuf *jb) +{ + if (!jb) + return 0; + + mtx_lock(jb->lock); + uint32_t n = jb->ncf; + mtx_unlock(jb->lock); + + return n; } @@ -767,8 +828,8 @@ int jbuf_debug(struct re_printf *pf, const struct jbuf *jb) mtx_lock(jb->lock); err |= mbuf_printf(mb, " running=%d", jb->running); - err |= mbuf_printf(mb, " min=%u cur=%u/%u max=%u [frames/packets]\n", - jb->min, jb->nf, jb->n, jb->max); + err |= mbuf_printf(mb, " min=%u cur=%u complete=%u max=%u [frames]\n", + jb->min, jb->nf, jb->ncf, jb->max); err |= mbuf_printf(mb, " seq_put=%u\n", jb->seq_put); #if JBUF_STAT @@ -778,7 +839,7 @@ int jbuf_debug(struct re_printf *pf, const struct jbuf *jb) err |= mbuf_printf(mb, " dup=%u", jb->stat.n_dups); err |= mbuf_printf(mb, " late=%u", jb->stat.n_late); err |= mbuf_printf(mb, " or=%u", jb->stat.n_overflow); - err |= mbuf_printf(mb, " ur=%u", jb->stat.n_underflow); + err |= mbuf_printf(mb, " wait=%u", jb->stat.n_waiting); err |= mbuf_printf(mb, " flush=%u", jb->stat.n_flush); err |= mbuf_printf(mb, " put/get_ratio=%u%%", jb->stat.n_get ? 100*jb->stat.n_put/jb->stat.n_get : 0); diff --git a/test/jbuf.c b/test/jbuf.c index e7d3dad12..7ca2eb0c0 100644 --- a/test/jbuf.c +++ b/test/jbuf.c @@ -37,11 +37,7 @@ int test_jbuf(void) /* Empty list */ DEBUG_INFO("test frame: Empty list\n"); - if (ENOENT != jbuf_get(jb, &hdr2, &mem)) { - err = EINVAL; - goto out; - } - + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); /* One frame */ DEBUG_INFO("test frame: One frame\n"); @@ -50,12 +46,12 @@ int test_jbuf(void) hdr.ts = 1; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - if ((EALREADY != jbuf_put(jb, &hdr, frv[0]))) {err = EINVAL; goto out;} + TEST_EQUALS(EALREADY, jbuf_put(jb, &hdr, frv[0])); err = jbuf_get(jb, &hdr2, &mem); TEST_ERR(err); - if (160 != hdr2.seq) {err = EINVAL; goto out;} - if (mem != frv[0]) {err = EINVAL; goto out;} + TEST_EQUALS(160, hdr2.seq); + TEST_EQUALS(frv[0], mem); mem = mem_deref(mem); if (ENOENT != jbuf_get(jb, &hdr2, &mem)) {err = EINVAL; goto out;} @@ -72,8 +68,7 @@ int test_jbuf(void) err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); if (320 != hdr2.seq) {err = EINVAL; goto out;} if (mem != frv[0]) {err = EINVAL; goto out;} mem = mem_deref(mem); @@ -101,14 +96,12 @@ int test_jbuf(void) err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); if (640 != hdr2.seq) {err = EINVAL; goto out;} if (mem != frv[0]) {err = EINVAL; goto out;} mem = mem_deref(mem); - err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(EAGAIN, jbuf_get(jb, &hdr2, &mem)); if (800 != hdr2.seq) {err = EINVAL; goto out;} if (mem != frv[1]) {err = EINVAL; goto out;} mem = mem_deref(mem); @@ -132,7 +125,7 @@ int test_jbuf(void) } -int test_jbuf_adaptive(void) +int test_jbuf_frames(void) { struct rtp_header hdr, hdr2; struct jbuf *jb = NULL; @@ -148,8 +141,6 @@ int test_jbuf_adaptive(void) err = jbuf_alloc(&jb, 1, 10); TEST_ERR(err); - err = jbuf_set_type(jb, JBUF_ADAPTIVE); - TEST_ERR(err); for (i=0; i min reached, read first frame\n"); + /* detected complete frame */ + DEBUG_INFO("got complete frame, read first frame\n"); err = jbuf_get(jb, &hdr2, &mem); TEST_ERR(err); TEST_EQUALS(160, hdr2.seq); TEST_EQUALS(mem, frv[0]); mem = mem_deref(mem); - DEBUG_INFO("n <= min, leads to ENOENT\n"); + DEBUG_INFO("no other complete frame, leads to ENOENT\n"); TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); /* Four frames */ DEBUG_INFO("test frame: Four frames\n"); jbuf_flush(jb); - hdr.seq = 1; - hdr.ts = 100; + hdr.seq = hdr.ts = 1; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - hdr.seq = 2; - hdr.ts = 200; + hdr.seq = hdr.ts = 2; err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - hdr.seq = 3; - hdr.ts = 300; + hdr.seq = hdr.ts = 3; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - hdr.seq = 4; - hdr.ts = 400; + hdr.seq = hdr.ts = 4; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_ERR(err); TEST_EQUALS(1, hdr2.seq); TEST_EQUALS(mem, frv[0]); mem = mem_deref(mem); + /* slowly reduce buffer */ + TEST_EQUALS(ENOENT, jbuf_get(jb, &hdr2, &mem)); + + hdr.seq = hdr.ts = 5; + err = jbuf_put(jb, &hdr, frv[3]); + TEST_ERR(err); err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(EAGAIN, err); + TEST_ERR(err); TEST_EQUALS(2, hdr2.seq); TEST_EQUALS(mem, frv[1]); mem = mem_deref(mem); + hdr.seq = hdr.ts = 6; + err = jbuf_put(jb, &hdr, frv[3]); + TEST_ERR(err); err = jbuf_get(jb, &hdr2, &mem); - TEST_EQUALS(0, err); + TEST_ERR(err); TEST_EQUALS(3, hdr2.seq); TEST_EQUALS(mem, frv[2]); mem = mem_deref(mem); @@ -246,7 +242,7 @@ int test_jbuf_adaptive(void) } -int test_jbuf_adaptive_video(void) +int test_jbuf_video_frames(void) { struct rtp_header hdr, hdr2; struct jbuf *jb = NULL; @@ -260,9 +256,10 @@ int test_jbuf_adaptive_video(void) memset(&hdr2, 0, sizeof(hdr2)); hdr.ssrc = 1; - err = jbuf_alloc(&jb, 1, 10); + err = jbuf_alloc(&jb, 1, 3); TEST_ERR(err); - err = jbuf_set_type(jb, JBUF_ADAPTIVE); + + err = jbuf_resize(jb, 10); TEST_ERR(err); for (i=0; i