diff --git a/include/re_jbuf.h b/include/re_jbuf.h index f9698b485..6152cff56 100644 --- a/include/re_jbuf.h +++ b/include/re_jbuf.h @@ -36,5 +36,4 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem); void jbuf_flush(struct jbuf *jb); int jbuf_stats(const struct jbuf *jb, struct jbuf_stat *jstat); int jbuf_debug(struct re_printf *pf, const struct jbuf *jb); -uint32_t jbuf_frames(const struct jbuf *jb); uint32_t jbuf_packets(const struct jbuf *jb); diff --git a/src/jbuf/jbuf.c b/src/jbuf/jbuf.c index 89a69487a..d7c824fd8 100644 --- a/src/jbuf/jbuf.c +++ b/src/jbuf/jbuf.c @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -45,7 +46,7 @@ enum { }; -/** Defines a packet frame */ +/** Defines a packet */ struct packet { struct le le; /**< Linked list element */ struct rtp_header hdr; /**< RTP Header */ @@ -63,12 +64,11 @@ struct jbuf { struct list pooll; /**< List of free packets in pool */ struct list packetl; /**< List of buffered packets */ uint32_t n; /**< [# packets] Current # of packets in buffer */ - uint32_t nf; /**< [# frames] Current # of frames in buffer */ - uint32_t min; /**< [# frames] Minimum # of frames to buffer */ - uint32_t max; /**< [# frames] Maximum # of frames to buffer */ - uint32_t wish; /**< [# frames] Wish size for adaptive mode */ + uint32_t min; /**< [# packets] Minimum # of packets to buffer */ + uint32_t max; /**< [# packets] Maximum # of packets to buffer */ + uint32_t wish; /**< [# packets] Wish size for adaptive mode */ uint16_t seq_put; /**< Sequence number for last jbuf_put() */ - uint16_t seq_get; /**< Sequence number of last played frame */ + uint16_t seq_get; /**< Sequence number of last played */ uint32_t ssrc; /**< Previous ssrc */ uint64_t tr; /**< Time of previous jbuf_put() */ int pt; /**< Payload type */ @@ -81,6 +81,10 @@ struct jbuf { #if JBUF_STAT struct jbuf_stat stat; /**< Jitter buffer Statistics */ #endif +#if DEBUG_LEVEL >= 6 + uint64_t tr00; /**< Arrival of first packet */ + char buf[136]; /**< Buffer for trace */ +#endif }; @@ -91,8 +95,58 @@ static inline bool seq_less(uint16_t x, uint16_t y) } +#if DEBUG_LEVEL >= 6 +static void plot_jbuf(struct jbuf *jb, uint64_t tr) +{ + uint32_t treal; + uint32_t rdiff = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF); + + if (!jb->tr00) + jb->tr00 = tr; + + treal = (uint32_t) (tr - jb->tr00); + re_snprintf(jb->buf, sizeof(jb->buf), + "%s, 0x%p, %u, %u, %u, %u", + __func__, /* row 1 - grep */ + jb, /* row 2 - grep optional */ + treal, /* row 3 - plot x-axis */ + rdiff, /* row 4 - plot */ + jb->wish, /* row 5 - plot */ + jb->n); /* row 6 - plot */ + re_trace_event("jbuf", "plot", 'P', NULL, 0, RE_TRACE_ARG_STRING_COPY, + "line", jb->buf); +} + + +static void plot_jbuf_event(struct jbuf *jb, char ph) +{ + uint32_t treal; + uint64_t tr; + + tr = tmr_jiffies(); + if (!jb->tr00) + jb->tr00 = tr; + + treal = (uint32_t) (tr - jb->tr00); + re_snprintf(jb->buf, sizeof(jb->buf), "%s, 0x%p, %u, %i", + __func__, /* row 1 - grep */ + jb, /* row 2 - grep optional */ + treal, /* row 3 - plot x-axis */ + 1); /* row 4 - plot */ + re_trace_event("jbuf", "plot", ph, NULL, 0, RE_TRACE_ARG_STRING_COPY, + "line", jb->buf); +} +#else +static void plot_jbuf_event(struct jbuf *jb, char ph) +{ + (void)jb; + (void)ph; +} +#endif + + /** - * Get a frame from the pool + * Get a packet from the pool */ static void packet_alloc(struct jbuf *jb, struct packet **f) { @@ -106,18 +160,19 @@ static void packet_alloc(struct jbuf *jb, struct packet **f) else { struct packet *f0; - /* Steal an old frame */ + /* Steal an old packet */ le = jb->packetl.head; f0 = le->data; #if JBUF_STAT STAT_INC(n_overflow); - DEBUG_WARNING("drop 1 old frame seq=%u (total dropped %u)\n", + DEBUG_WARNING("drop 1 old packet seq=%u (total dropped %u)\n", f0->hdr.seq, jb->stat.n_overflow); #else - DEBUG_WARNING("drop 1 old frame seq=%u\n", f0->hdr.seq); + DEBUG_WARNING("drop 1 old packet seq=%u\n", f0->hdr.seq); #endif + plot_jbuf_event(jb, 'O'); f0->mem = mem_deref(f0->mem); list_unlink(le); } @@ -148,6 +203,9 @@ static void jbuf_destructor(void *data) /* Free all packets in the pool list */ list_flush(&jb->pooll); mem_deref(jb->lock); +#if DEBUG_LEVEL >= 6 + (void)re_trace_close(); +#endif } @@ -155,7 +213,7 @@ static void jbuf_destructor(void *data) * Allocate a new jitter buffer * * @param jbp Pointer to returned jitter buffer - * @param min Minimum delay in [frames] + * @param min Minimum delay in [packets] * @param max Maximum delay in [packets] * * @return 0 if success, otherwise errorcode @@ -188,7 +246,7 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) jb->wish = min; tmr_init(&jb->tmr); - DEBUG_INFO("alloc: delay=%u-%u frames/packets\n", min, max); + DEBUG_INFO("alloc: delay=%u-%u [packets]\n", min, max); jb->pt = -1; err = mutex_alloc(&jb->lock); @@ -209,6 +267,10 @@ int jbuf_alloc(struct jbuf **jbp, uint32_t min, uint32_t max) DEBUG_INFO("alloc: adding to pool list %u\n", i); } +#if DEBUG_LEVEL >= 6 + (void)re_trace_init("jbuf.json"); +#endif + out: if (err) mem_deref(jb); @@ -255,7 +317,6 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) int32_t rdiff; int32_t adiff; int32_t s; /**< EMA coefficient */ - float ratio = 1.0; /**< Frame packet ratio */ uint32_t wish; uint32_t max = jb->max; bool down = false; @@ -266,19 +327,12 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) if (!jb->seq_get) return; - if (jb->nf) { - ratio = (float)jb->n / (float)jb->nf; - max = (uint32_t)(max / ratio); - } - rdiff = (int16_t)(jb->seq_put + 1 - seq); adiff = abs(rdiff * JBUF_RDIFF_EMA_COEFF); - s = adiff > jb->rdiff ? JBUF_RDIFF_UP_SPEED : - jb->wish > 2 ? 1 : - jb->wish > 1 ? 2 : 3; + s = adiff > jb->rdiff ? JBUF_RDIFF_UP_SPEED : 1; jb->rdiff += (adiff - jb->rdiff) * s / JBUF_RDIFF_EMA_COEFF; - wish = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF / ratio); + wish = (uint32_t)(jb->rdiff / (float)JBUF_RDIFF_EMA_COEFF); if (wish < jb->min) wish = jb->min; @@ -290,7 +344,8 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) jb->wish = wish; } else if (wish < jb->wish) { - uint32_t dt = wish + 1 == jb->wish ? 6000 : 1000; + uint32_t dt = wish + 1 == jb->wish ? 6000 : + wish < jb->wish / 2 ? 100 : 1000; if (!tmr_isrunning(&jb->tmr) || tmr_get_expire(&jb->tmr) > dt) tmr_start(&jb->tmr, dt, wish_down, jb); @@ -314,11 +369,9 @@ static void calc_rdiff(struct jbuf *jb, uint16_t seq) int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) { struct packet *f; - struct packet *fc; struct le *le, *tail; uint16_t seq; uint64_t tr, dt; - bool equal; int err = 0; if (!jb || !hdr) @@ -354,6 +407,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) /* Packet arrived too late to be put into buffer */ if (jb->seq_get && seq_less(seq, jb->seq_get + 1)) { STAT_INC(n_late); + plot_jbuf_event(jb, 'L'); DEBUG_INFO("packet too late: seq=%u " "(seq_put=%u seq_get=%u)\n", seq, jb->seq_put, jb->seq_get); @@ -392,6 +446,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) /* Detect duplicates */ DEBUG_INFO("duplicate: seq=%u\n", seq); STAT_INC(n_dups); + plot_jbuf_event(jb, 'D'); list_insert_after(&jb->packetl, le, &f->le, f); packet_deref(jb, f); err = EALREADY; @@ -409,6 +464,7 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) } STAT_INC(n_oos); + plot_jbuf_event(jb, 'S'); success: /* Update last sequence */ @@ -419,21 +475,10 @@ int jbuf_put(struct jbuf *jb, const struct rtp_header *hdr, void *mem) f->hdr = *hdr; f->mem = mem_ref(mem); - equal = false; - if (f->le.prev) { - fc = f->le.prev->data; - equal = (fc->hdr.ts == f->hdr.ts); - } - - if (!equal && f->le.next) { - fc = f->le.next->data; - equal = (fc->hdr.ts == f->hdr.ts); - } - - if (!equal) - ++jb->nf; - out: +#if DEBUG_LEVEL >= 6 + plot_jbuf(jb, tr); +#endif mtx_unlock(jb->lock); return err; } @@ -460,10 +505,11 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) mtx_lock(jb->lock); STAT_INC(n_get); - if (jb->nf <= jb->wish || !jb->packetl.head) { + if (jb->n <= jb->wish || !jb->packetl.head) { DEBUG_INFO("not enough buffer packets - wait.. " "(n=%u wish=%u)\n", jb->n, jb->wish); STAT_INC(n_underflow); + plot_jbuf_event(jb, 'U'); err = ENOENT; goto out; } @@ -483,6 +529,7 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) } else if (seq_diff > 1) { STAT_ADD(n_lost, 1); + plot_jbuf_event(jb, 'T'); DEBUG_INFO("get: n_lost: diff=%d,seq=%u,seq_get=%u\n", seq_diff, f->hdr.seq, jb->seq_get); } @@ -495,24 +542,11 @@ int jbuf_get(struct jbuf *jb, struct rtp_header *hdr, void **mem) *hdr = f->hdr; *mem = mem_ref(f->mem); - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; - - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } - packet_deref(jb, f); - - if (jb->nf > jb->wish) { - DEBUG_INFO("reducing jitter buffer " - "(nf=%u min=%u wish=%u max=%u)\n", - jb->nf, jb->min, jb->wish, jb->max); - err = EAGAIN; + if (jb->packetl.head) { + f = jb->packetl.head->data; + if (jb->n > jb->wish) + err = EAGAIN; } out: @@ -556,17 +590,6 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) *hdr = f->hdr; *mem = mem_ref(f->mem); - /* decrease not equal frames */ - if (f->le.next) { - struct packet *next_f = f->le.next->data; - - if (f->hdr.ts != next_f->hdr.ts) - --jb->nf; - } - else { - --jb->nf; - } - packet_deref(jb, f); out: @@ -575,7 +598,7 @@ int jbuf_drain(struct jbuf *jb, struct rtp_header *hdr, void **mem) } /** - * Flush all frames in the jitter buffer + * Flush all packets in the jitter buffer * * @param jb Jitter buffer */ @@ -591,19 +614,18 @@ void jbuf_flush(struct jbuf *jb) mtx_lock(jb->lock); if (jb->packetl.head) { - DEBUG_INFO("flush: %u frames\n", jb->n); + DEBUG_INFO("flush: %u packets\n", jb->n); } - /* put all buffered frames back in free list */ + /* put all buffered packets back in free list */ for (le = jb->packetl.head; le; le = jb->packetl.head) { - DEBUG_INFO(" flush frame: seq=%u\n", + DEBUG_INFO(" flush packet: seq=%u\n", ((struct packet *)(le->data))->hdr.seq); packet_deref(jb, le->data); } jb->n = 0; - jb->nf = 0; jb->running = false; jb->seq_get = 0; @@ -611,6 +633,7 @@ void jbuf_flush(struct jbuf *jb) n_flush = STAT_INC(n_flush); memset(&jb->stat, 0, sizeof(jb->stat)); jb->stat.n_flush = n_flush; + plot_jbuf_event(jb, 'F'); #endif mtx_unlock(jb->lock); } @@ -636,26 +659,6 @@ uint32_t jbuf_packets(const struct jbuf *jb) } -/** - * Get number of current frames - * - * @param jb Jitter buffer - * - * @return number of frames - */ -uint32_t jbuf_frames(const struct jbuf *jb) -{ - if (!jb) - return 0; - - mtx_lock(jb->lock); - uint32_t n = jb->nf; - mtx_unlock(jb->lock); - - return n; -} - - /** * Get jitter buffer statistics * @@ -701,8 +704,8 @@ int jbuf_debug(struct re_printf *pf, const struct jbuf *jb) mtx_lock(jb->lock); err |= mbuf_printf(mb, " running=%d", jb->running); - err |= mbuf_printf(mb, " min=%u cur=%u/%u max=%u [frames/packets]\n", - jb->min, jb->nf, jb->n, jb->max); + err |= mbuf_printf(mb, " min=%u cur=%u max=%u [packets]\n", + jb->min, jb->n, jb->max); err |= mbuf_printf(mb, " seq_put=%u\n", jb->seq_put); #if JBUF_STAT diff --git a/test/jbuf.c b/test/jbuf.c index e7d3dad12..d6373efec 100644 --- a/test/jbuf.c +++ b/test/jbuf.c @@ -281,37 +281,53 @@ int test_jbuf_adaptive_video(void) hdr.ts = 100; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(1, jbuf_packets(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); hdr.seq = 4; hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); hdr.seq = 3; /* unordered late packet */ hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(4, jbuf_packets(jb)); hdr.seq = 5; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[4]); TEST_ERR(err); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(5, jbuf_packets(jb)); + err = jbuf_get(jb, &hdr2, &mem); /* first packet with unique frame */ + mem = mem_deref(mem); + TEST_EQUALS(EAGAIN, err); + TEST_EQUALS(1, hdr2.seq); + TEST_EQUALS(100, hdr2.ts); + + err = jbuf_get(jb, &hdr2, &mem); /* second packet with unique frame */ + mem = mem_deref(mem); + TEST_EQUALS(EAGAIN, err); /* n > wish */ + TEST_EQUALS(2, hdr2.seq); + TEST_EQUALS(100, hdr2.ts); + + err = jbuf_get(jb, &hdr2, &mem); + mem = mem_deref(mem); + TEST_EQUALS(EAGAIN, err); /* n > wish */ + err = jbuf_get(jb, &hdr2, &mem); + mem = mem_deref(mem); + TEST_ERR(err); /* n == wish */ + TEST_EQUALS(4, hdr2.seq); + TEST_EQUALS(200, hdr2.ts); + /* --- Test late packet, unique frame --- */ jbuf_flush(jb); @@ -319,28 +335,24 @@ int test_jbuf_adaptive_video(void) hdr.ts = 100; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(1, jbuf_packets(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); hdr.seq = 4; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); hdr.seq = 3; /* unordered late packet */ hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(4, jbuf_packets(jb)); /* --- Test lost get --- */ @@ -350,14 +362,12 @@ int test_jbuf_adaptive_video(void) hdr.ts = 100; err = jbuf_put(jb, &hdr, frv[0]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(1, jbuf_packets(jb)); hdr.seq = 2; hdr.ts = 100; /* Same frame */ err = jbuf_put(jb, &hdr, frv[1]); TEST_ERR(err); - TEST_EQUALS(1, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); /* LOST hdr.seq = 3; */ @@ -366,26 +376,22 @@ int test_jbuf_adaptive_video(void) hdr.ts = 200; err = jbuf_put(jb, &hdr, frv[2]); TEST_ERR(err); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); hdr.seq = 5; hdr.ts = 300; err = jbuf_put(jb, &hdr, frv[3]); TEST_ERR(err); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(4, jbuf_packets(jb)); err = jbuf_get(jb, &hdr2, &mem); TEST_EQUALS(EAGAIN, err); mem = mem_deref(mem); - TEST_EQUALS(3, jbuf_frames(jb)); TEST_EQUALS(3, jbuf_packets(jb)); err = jbuf_get(jb, &hdr2, &mem); TEST_EQUALS(EAGAIN, err); mem = mem_deref(mem); - TEST_EQUALS(2, jbuf_frames(jb)); TEST_EQUALS(2, jbuf_packets(jb)); err = 0;