diff --git a/include/re_jbuf.h b/include/re_jbuf.h index f9698b485..71f8703ca 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -23,12 +23,12 @@ struct jbuf_stat { /** Jitter buffer type */ enum jbuf_type { JBUF_OFF, - JBUF_FIXED, - JBUF_ADAPTIVE + JBUF_FIXED }; int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max); +int jbuf_resize(struct jbuf *jb, uint32_t packets); int jbuf_set_type(struct jbuf *jb, enum jbuf_type jbtype); int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem); int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem); @@ -36,5 +36,4 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem); void jbuf_flush(struct jbuf *jb); int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); -uint32_t jbuf_frames(const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 89a69487a..f9f8af6d6 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -1,8 +1,9 @@ /** * @file jbuf.c Jitter Buffer implementation * - * This is an adaptive jitter buffer implementation. See doc/jbuf for further - * details! + * This is an adaptive jitter buffer implementation. Features: + * - re-order out-of-order packets + * - ensure frame completeness * * Copyright (C) 2010 Creytiv.com */ @@ -11,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -45,7 +47,7 @@ enum { }; -/** Defines a packet frame */ +/** Defines a packet */ struct packet { struct le le; /**< Linked list element */ struct rtp_header hdr; /**< RTP Header */ @@ -64,11 +66,13 @@ struct jbuf { struct list packetl; /**< List of buffered packets */ uint32_t n; /**< [# packets] Current # of packets in buffer */ uint32_t nf; /**< [# frames] Current # of frames in buffer */ + uint32_t ncf; /**< [# frames] # of complete frames in buffer */ uint32_t min; /**< [# frames] Minimum # of frames to buffer */ uint32_t max; /**< [# frames] Maximum # of frames to buffer */ - uint32_t wish; /**< [# frames] Wish size for adaptive mode */ + uint32_t packets; /**< [# packets] Size of the packet pool */ uint16_t seq_put; /**< Sequence number for last jbuf_put() */ - uint16_t seq_get; /**< Sequence number of last played frame */ + uint16_t seq_get; /**< Sequence number of last played packet */ + struct le *end; /**< End of complete sequence */ uint32_t ssrc; /**< Previous ssrc */ uint64_t tr; /**< Time of previous jbuf_put() */ int pt; /**< Payload type */ @@ -84,6 +88,38 @@ struct jbuf { }; +/** + * Update frame counter and sequence end list element pointer before oldest + * packet is removed + * + * @param jb Jitter buffer + */ +static void jbuf_update_nf(struct jbuf *jb) +{ + struct le *le = jb->packetl.head; + + if (!le) + return; + + struct le *n = le->next; + + if (n) { + struct packet *p = le->data; + struct packet *pn = n->data; + + if (p->hdr.ts != pn->hdr.ts) { + if (jb->ncf) + --jb->ncf; + + --jb->nf; + } + } + + if (jb->end == le) + jb->end = NULL; +} + + /** Is x less than y? */ static inline bool seq_less(uint16_t x, uint16_t y) { @@ -91,8 +127,32 @@ static inline bool seq_less(uint16_t x, uint16_t y) } +static struct le *packet_drop(struct jbuf *jb) +{ + struct le *le; + struct packet *f0; + + /* Steal an old packet */ + le = jb->packetl.head; + f0 = le->data; + jbuf_update_nf(jb); + +#if JBUF_STAT + DEBUG_WARNING("drop 1 old packet seq=%u (total dropped %u)\n", + f0->hdr.seq, jb->stat.n_overflow); +#else + DEBUG_WARNING("drop 1 old packet seq=%u\n", f0->hdr.seq); +#endif + + f0->mem = mem_deref(f0->mem); + list_unlink(le); + + return le; +} + + /** - * Get a frame from the pool + * Get a packet from the pool */ static void packet_alloc(struct jbuf *jb, struct packet **f) { @@ -104,22 +164,8 @@ static void packet_alloc(struct jbuf *jb, struct packet **f) ++jb->n; } else { - struct packet *f0; - - /* Steal an old frame */ - le = jb->packetl.head; - f0 = le->data; - -#if JBUF_STAT STAT_INC(n_overflow); - DEBUG_WARNING("drop 1 old frame seq=%u (total dropped %u)\n", - f0->hdr.seq, jb->stat.n_overflow); -#else - DEBUG_WARNING("drop 1 old frame seq=%u\n", f0->hdr.seq); -#endif - - f0->mem = mem_deref(f0->mem); - list_unlink(le); + le = packet_drop(jb); } *f = le->data; @@ -156,14 +202,13 @@ static void jbuf_destructor(void *data) * * @param jbp Pointer to returned jitter buffer * @param min Minimum delay in [frames] - * @param max Maximum delay in [packets] + * @param max Maximum delay in [frames] * * @return 0 if success, otherwise errorcode */ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) { struct jbuf *jb; - uint32_t i; int err = 0; if (!jbp || ( min > max)) @@ -185,10 +230,10 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) jb->jbtype = JBUF_FIXED; jb->min = min; jb->max = max; - jb->wish = min; + jb->packets = 0; tmr_init(&jb->tmr); - DEBUG_INFO("alloc: delay=%u-%u frames/packets\n", min, max); + DEBUG_INFO("alloc: delay=%u-%u [frames]\n", min, max); jb->pt = -1; err = mutex_alloc(&jb->lock); @@ -198,16 +243,7 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) mem_destructor(jb, jbuf_destructor); /* Allocate all packets now */ - for (i=0; imax; i++) { - struct packet *f = mem_zalloc(sizeof(*f), NULL); - if (!f) { - err = ENOMEM; - break; - } - - list_append(&jb->pooll, &f->le, f); - DEBUG_INFO("alloc: adding to pool list %u\n", i); - } + err = jbuf_resize(jb, max); out: if (err) @@ -219,6 +255,33 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) } +/** + * Resize the packet pool + * + * @param jb The jitter buffer + * @param packets Size of the packet pool in [packets]. Default: jb->max + * + * @return 0 if success, otherwise errorcode + */ +int jbuf_resize(struct jbuf *jb, uint32_t packets) +{ + if (packets <= jb->packets) + return EINVAL; + + for (uint32_t i=jb->packets; ipooll, &f->le, f); + DEBUG_INFO("alloc: adding to pool list %u\n", i); + } + + jb->packets = packets; + return 0; +} + + /** * Set jitter buffer type. * @@ -238,67 +301,61 @@ int jbuf_set_type(struct jbuf *jb, enum jbuf_type jbtype) } -static void wish_down(void *arg) +/** + * Moves end of complete sequence and counts complete frames. If `end` is NULL, + * a search starts from the head which points to the oldest packet + * + * @param jb Jitter buffer + * @param cur Current inserted list element + */ +static void jbuf_move_end(struct jbuf *jb, struct le *cur) { - struct jbuf *jb = arg; - - if (jb->wish > jb->min) { - DEBUG_INFO("wish size changed %u --> %u\n", jb->wish, - jb->wish - 1); - --jb->wish; + struct le *end; + + if (!jb->end) { + jb->ncf = 0; + cur = jb->packetl.head; + if (!cur) + return; + + jb->end = cur; + cur = cur->next; + if (!cur) + return; } -} - -static void calc_rdiff(struct jbuf *jb, uint16_t seq) -{ - int32_t rdiff; - int32_t adiff; - int32_t s; /**< EMA coefficient */ - float ratio = 1.0; /**< Frame packet ratio */ - uint32_t wish; - uint32_t max = jb->max; - bool down = false; - - if (jb->jbtype != JBUF_ADAPTIVE) + end = cur->prev; + if (!end) return; - if (!jb->seq_get) + /* update only if endsing packet was inserted right now */ + if (jb->end != end) return; - if (jb->nf) { - ratio = (float)jb->n / (float)jb->nf; - max = (uint32_t)(max / ratio); - } + for (; end->next; end = end->next) { + struct packet *pm = end->data; + struct packet *pn = end->next->data; - rdiff = (int16_t)(jb->seq_put + 1 - seq); - adiff = abs(rdiff * JBUF_RDIFF_EMA_COEFF); - s = adiff > jb->rdiff ? JBUF_RDIFF_UP_SPEED : - jb->wish > 2 ? 1 : - jb->wish > 1 ? 2 : 3; - jb->rdiff += (adiff - jb->rdiff) * s / JBUF_RDIFF_EMA_COEFF; + if (pm->hdr.seq + 1 != pn->hdr.seq) + break; - wish = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF / ratio); - if (wish < jb->min) - wish = jb->min; + if (pm->hdr.ts != pn->hdr.ts) + ++jb->ncf; + } - if (max && wish >= max) - wish = max - 1; + jb->end = end; +} - if (wish > jb->wish) { - DEBUG_INFO("wish size changed %u --> %u\n", jb->wish, wish); - jb->wish = wish; - } - else if (wish < jb->wish) { - uint32_t dt = wish + 1 == jb->wish ? 6000 : 1000; - if (!tmr_isrunning(&jb->tmr) || tmr_get_expire(&jb->tmr) > dt) - tmr_start(&jb->tmr, dt, wish_down, jb); - down = true; - } +static bool jbuf_enough_packets(struct jbuf *jb) +{ + if (!jb->packetl.head) + return false; - if (!down && tmr_isrunning(&jb->tmr)) - tmr_cancel(&jb->tmr); + if (jb->nf < jb->min) + return false; + + return jb->ncf || jb->nf > jb->max; } @@ -313,12 +370,10 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) */ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) { - struct packet *f; - struct packet *fc; + struct packet *p; struct le *le, *tail; uint16_t seq; uint64_t tr, dt; - bool equal; int err = 0; if (!jb || !hdr) @@ -347,10 +402,6 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) jb->ssrc = hdr->ssrc; if (jb->running) { - - if (jb->jbtype == JBUF_ADAPTIVE) - calc_rdiff(jb, seq); - /* Packet arrived too late to be put into buffer */ if (jb->seq_get && seq_less(seq, jb->seq_get + 1)) { STAT_INC(n_late); @@ -365,7 +416,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) STAT_INC(n_put); - packet_alloc(jb, &f); + packet_alloc(jb, &p); tail = jb->packetl.tail; @@ -373,7 +424,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) Frame is later than tail -> append to tail */ if (!tail || seq_less(((struct packet *)tail->data)->hdr.seq, seq)) { - list_append(&jb->packetl, &f->le, f); + list_append(&jb->packetl, &p->le, p); goto success; } @@ -385,15 +436,15 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) DEBUG_PRINTF("put: out-of-sequence" " - inserting after seq=%u (seq=%u)\n", seq_le, seq); - list_insert_after(&jb->packetl, le, &f->le, f); + list_insert_after(&jb->packetl, le, &p->le, p); break; } else if (seq == seq_le) { /* less likely */ /* Detect duplicates */ DEBUG_INFO("duplicate: seq=%u\n", seq); STAT_INC(n_dups); - list_insert_after(&jb->packetl, le, &f->le, f); - packet_deref(jb, f); + list_insert_after(&jb->packetl, le, &p->le, p); + packet_deref(jb, p); err = EALREADY; goto out; } @@ -405,7 +456,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) if (!le) { DEBUG_PRINTF("put: out-of-sequence" " - put in head (seq=%u)\n", seq); - list_prepend(&jb->packetl, &f->le, f); + list_prepend(&jb->packetl, &p->le, p); } STAT_INC(n_oos); @@ -416,22 +467,13 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) jb->seq_put = seq; /* Success */ - f->hdr = *hdr; - f->mem = mem_ref(mem); - - equal = false; - if (f->le.prev) { - fc = f->le.prev->data; - equal = (fc->hdr.ts == f->hdr.ts); - } + p->hdr = *hdr; + p->mem = mem_ref(mem); + if (tail && ((struct packet *)tail->data)->hdr.ts != hdr->ts) + jb->nf++; - if (!equal && f->le.next) { - fc = f->le.next->data; - equal = (fc->hdr.ts == f->hdr.ts); - } - - if (!equal) - ++jb->nf; + /* Missing frame detection */ + jbuf_move_end(jb, &p->le); out: mtx_unlock(jb->lock); @@ -451,7 +493,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) */ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) { - struct packet *f; + struct packet *p; int err = 0; if (!jb || !hdr || !mem) @@ -460,66 +502,50 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) mtx_lock(jb->lock); STAT_INC(n_get); - if (jb->nf <= jb->wish || !jb->packetl.head) { + if (!jbuf_enough_packets(jb)) { DEBUG_INFO("not enough buffer packets - wait.. " - "(n=%u wish=%u)\n", jb->n, jb->wish); + "(n=%u min=%u)\n", jb->n, jb->min); STAT_INC(n_underflow); err = ENOENT; goto out; } - /* When we get one packet P[i], check that the next packet P[i+1] - is present and have a seq no. of seq[i] + 1. - If not, we should consider that packet lost. */ - - f = jb->packetl.head->data; + p = jb->packetl.head->data; #if JBUF_STAT /* Check sequence of previously played packet */ if (jb->seq_get) { - const int16_t seq_diff = f->hdr.seq - jb->seq_get; - if (seq_less(f->hdr.seq, jb->seq_get)) { - DEBUG_WARNING("get: seq=%u too late\n", f->hdr.seq); + const int16_t seq_diff = p->hdr.seq - jb->seq_get; + if (seq_less(p->hdr.seq, jb->seq_get)) { + DEBUG_WARNING("get: seq=%u too late\n", p->hdr.seq); } else if (seq_diff > 1) { + /* should not happen */ STAT_ADD(n_lost, 1); DEBUG_INFO("get: n_lost: diff=%d,seq=%u,seq_get=%u\n", - seq_diff, f->hdr.seq, jb->seq_get); + seq_diff, p->hdr.seq, jb->seq_get); } } #endif /* Update sequence number for 'get' */ - jb->seq_get = f->hdr.seq; - - *hdr = f->hdr; - *mem = mem_ref(f->mem); - - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; + jb->seq_get = p->hdr.seq; - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } + *hdr = p->hdr; + *mem = mem_ref(p->mem); - packet_deref(jb, f); + jbuf_update_nf(jb); + packet_deref(jb, p); - if (jb->nf > jb->wish) { - DEBUG_INFO("reducing jitter buffer " - "(nf=%u min=%u wish=%u max=%u)\n", - jb->nf, jb->min, jb->wish, jb->max); + if (jbuf_enough_packets(jb)) err = EAGAIN; - } out: mtx_unlock(jb->lock); return err; } + /** * Get one packet from the jitter buffer, even if it becomes depleted * @@ -556,17 +582,7 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) *hdr = f->hdr; *mem = mem_ref(f->mem); - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; - - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } - + jbuf_update_nf(jb); packet_deref(jb, f); out: @@ -575,7 +591,7 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) } /** - * Flush all frames in the jitter buffer + * Flush all packets in the jitter buffer * * @param jb Jitter buffer */ @@ -591,12 +607,12 @@ void jbuf_flush(struct jbuf *jb) mtx_lock(jb->lock); if (jb->packetl.head) { - DEBUG_INFO("flush: %u frames\n", jb->n); + DEBUG_INFO("flush: %u packets\n", jb->n); } - /* put all buffered frames back in free list */ + /* put all buffered packets back in free list */ for (le = jb->packetl.head; le; le = jb->packetl.head) { - DEBUG_INFO(" flush frame: seq=%u\n", + DEBUG_INFO(" flush packet: seq=%u\n", ((struct packet *)(le->data))->hdr.seq); packet_deref(jb, le->data); @@ -604,6 +620,8 @@ void jbuf_flush(struct jbuf *jb) jb->n = 0; jb->nf = 0; + jb->ncf = 0; + jb->end = NULL; jb->running = false; jb->seq_get = 0; @@ -636,26 +654,6 @@ uint32_t jbuf_packets(const struct jbuf *jb) } -/** - * Get number of current frames - * - * @param jb Jitter buffer - * - * @return number of frames - */ -uint32_t jbuf_frames(const struct jbuf *jb) -{ - if (!jb) - return 0; - - mtx_lock(jb->lock); - uint32_t n = jb->nf; - mtx_unlock(jb->lock); - - return n; -} - - /** * Get jitter buffer statistics * @@ -701,8 +699,8 @@ int jbuf_debug(struct re_printf *pf, const struct jbuf *jb) mtx_lock(jb->lock); err |= mbuf_printf(mb, " running=%d", jb->running); - err |= mbuf_printf(mb, " min=%u cur=%u/%u max=%u [frames/packets]\n", - jb->min, jb->nf, jb->n, jb->max); + err |= mbuf_printf(mb, " min=%u cur=%u max=%u [packets]\n", + jb->min, jb->n, jb->max); err |= mbuf_printf(mb, " seq_put=%u\n", jb->seq_put); #if JBUF_STAT