Skip to content

Commit

Permalink
rtp: lock more fields from rtcp_sess (#1039)
Browse files Browse the repository at this point in the history
* rtp: lock more rtcp_sess fields and code

* rtp: replace rtp_source lock by extended rtp_sess lock
  • Loading branch information
cspiel1 authored Jan 5, 2024
1 parent 34cce57 commit b57a240
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 88 deletions.
1 change: 0 additions & 1 deletion src/rtp/fb.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <re_sys.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/member.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <re_hash.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/pkt.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <re_sys.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/rr.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <re_sys.h>
#include <re_net.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
1 change: 0 additions & 1 deletion src/rtp/rtcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <re_list.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
4 changes: 0 additions & 4 deletions src/rtp/rtcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ struct rtp_source {
uint32_t last_rtp_ts; /**< Last RTP timestamp */
uint32_t psent; /**< RTP packets sent */
uint32_t osent; /**< RTP octets sent */

mtx_t *lock; /**< Lock for this struct */
};

/** RTP Member */
Expand All @@ -73,8 +71,6 @@ void source_calc_jitter(struct rtp_source *s, uint32_t rtp_ts,
uint32_t arrival);
int source_calc_lost(const struct rtp_source *s);
uint8_t source_calc_fraction_lost(struct rtp_source *s);
int source_lock(struct rtp_source *s);
int source_unlock(struct rtp_source *s);

/* RR (Reception report) */
int rtcp_rr_alloc(struct rtcp_rr **rrp, size_t count);
Expand Down
1 change: 0 additions & 1 deletion src/rtp/rtp.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <re_sys.h>
#include <re_net.h>
#include <re_udp.h>
#include <re_thread.h>
#include <re_rtp.h>
#include <re_atomic.h>
#include "rtcp.h"
Expand Down
1 change: 0 additions & 1 deletion src/rtp/sdes.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <re_list.h>
#include <re_sa.h>
#include <re_rtp.h>
#include <re_thread.h>
#include "rtcp.h"


Expand Down
110 changes: 46 additions & 64 deletions src/rtp/sess.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ struct rtcp_sess {
uint32_t srate_tx; /**< Transmit sampling rate */
uint32_t srate_rx; /**< Receive sampling rate */
uint32_t interval; /**< RTCP interval in [ms] */
mtx_t *lock; /**< Lock for rtcp_sess */

/* stats */
mtx_t *lock; /**< Lock for txstat */
struct txstat txstat; /**< Local transmit statistics */
};

Expand All @@ -84,14 +84,6 @@ static void sess_destructor(void *data)
}


static void source_destructor(void *data)
{
struct rtp_source *s = data;

mem_deref(s->lock);
}


static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src)
{
struct rtp_member *mbr;
Expand Down Expand Up @@ -186,7 +178,6 @@ static void handle_incoming_sr(struct rtcp_sess *sess,

if (mbr->s) {
/* Save time when SR was received */
source_lock(mbr->s);
mbr->s->sr_recv = tmr_jiffies();

/* Save NTP timestamp from SR */
Expand All @@ -195,7 +186,6 @@ static void handle_incoming_sr(struct rtcp_sess *sess,
mbr->s->rtp_ts = msg->r.sr.rtp_ts;
mbr->s->psent = msg->r.sr.psent;
mbr->s->osent = msg->r.sr.osent;
source_unlock(mbr->s);
}

for (i=0; i<msg->hdr.count; i++)
Expand Down Expand Up @@ -229,6 +219,7 @@ void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
if (!sess || !msg)
return;

mtx_lock(sess->lock);
switch (msg->hdr.pt) {

case RTCP_SR:
Expand All @@ -246,6 +237,8 @@ void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
default:
break;
}

mtx_unlock(sess->lock);
}


Expand Down Expand Up @@ -357,8 +350,10 @@ int rtcp_enable(struct rtcp_sess *sess, bool enabled, const char *cname)
if (!sess)
return EINVAL;

mtx_lock(sess->lock);
sess->cname = mem_deref(sess->cname);
err = str_dup(&sess->cname, cname);
mtx_unlock(sess->lock);
if (err)
return err;

Expand Down Expand Up @@ -402,14 +397,12 @@ static bool sender_apply_handler(struct le *le, void *arg)

/* Initialise the members */
rr.ssrc = mbr->src;
source_lock(s);
rr.fraction = source_calc_fraction_lost(s);
rr.lost = source_calc_lost(s);
rr.last_seq = s->cycles | s->max_seq;
rr.jitter = s->jitter >> 4;
rr.lsr = calc_lsr(&s->last_sr);
rr.dlsr = calc_dlsr(s->sr_recv);
source_unlock(s);

return 0 != rtcp_rr_encode(mb, &rr);
}
Expand Down Expand Up @@ -583,38 +576,28 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr,
if (!sess)
return;

mtx_lock(sess->lock);
mbr = get_member(sess, hdr->ssrc);
if (!mbr) {
DEBUG_NOTICE("could not add member: 0x%08x\n", hdr->ssrc);
return;
goto out;
}

if (!mbr->s) {
int err;

mbr->s = mem_zalloc(sizeof(*mbr->s), source_destructor);
if (!mbr->s)
err = ENOMEM;
else
err = mutex_alloc(&mbr->s->lock);

if (err) {
mbr->s = mem_zalloc(sizeof(*mbr->s), NULL);
if (!mbr->s) {
DEBUG_NOTICE("could not add sender: 0x%08x\n",
hdr->ssrc);
mbr->s = mem_deref(mbr->s);
return;
goto out;
}

/* first packet - init sequence number */
source_lock(mbr->s);
source_init_seq(mbr->s, hdr->seq);
/* probation not used */
sa_cpy(&mbr->s->rtp_peer, peer);
source_unlock(mbr->s);
++sess->senderc;
}

source_lock(mbr->s);
if (!source_update_seq(mbr->s, hdr->seq)) {
DEBUG_WARNING("rtp_update_seq() returned 0\n");
}
Expand All @@ -635,7 +618,8 @@ void rtcp_sess_rx_rtp(struct rtcp_sess *sess, struct rtp_header *hdr,

mbr->s->last_rtp_ts = hdr->ts;
mbr->s->rtp_rx_bytes += payload_size;
source_unlock(mbr->s);
out:
mtx_unlock(sess->lock);
}


Expand All @@ -652,17 +636,19 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)
{
const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
struct rtp_member *mbr;
int err = 0;

if (!sess || !stats)
return EINVAL;

mtx_lock(sess->lock);
mbr = member_find(sess->members, ssrc);
if (!mbr)
return ENOENT;
if (!mbr) {
err = ENOENT;
goto out;
}

mtx_lock(sess->lock);
stats->tx.sent = sess->txstat.psent;
mtx_unlock(sess->lock);

stats->tx.lost = mbr->cum_lost;
stats->tx.jit = mbr->jit;
Expand All @@ -671,52 +657,36 @@ int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)

if (!mbr->s) {
memset(&stats->rx, 0, sizeof(stats->rx));
return 0;
goto out;
}

source_lock(mbr->s);
stats->rx.sent = mbr->s->received;
stats->rx.lost = source_calc_lost(mbr->s);
stats->rx.jit = sess->srate_rx ?
1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0;
source_unlock(mbr->s);

return 0;
out:
mtx_unlock(sess->lock);
return err;
}


static bool debug_handler(struct le *le, void *arg)
{
const struct rtp_member *mbr = le->data;
struct re_printf *pf = arg;
struct mbuf *mb = NULL;
struct mbuf *mb = arg;
int err;

err = re_hprintf(pf, " member 0x%08x: lost=%d Jitter=%.1fms"
err = mbuf_printf(mb, " member 0x%08x: lost=%d Jitter=%.1fms"
" RTT=%.1fms\n", mbr->src, mbr->cum_lost,
(double)mbr->jit/1000, (double)mbr->rtt/1000);
if (err)
return true;

if (mbr->s) {
mb = mbuf_alloc(64);
if (!mb)
return true;

source_lock(mbr->s);
err = mbuf_printf(mb,
" IP=%J psent=%u rcvd=%u\n",
&mbr->s->rtp_peer, mbr->s->psent,
mbr->s->received);
source_unlock(mbr->s);
if (err)
goto out;

re_hprintf(pf, "%b", mb->buf, mb->pos);
err |= mbuf_printf(mb,
" IP=%J psent=%u rcvd=%u\n",
&mbr->s->rtp_peer, mbr->s->psent,
mbr->s->received);
}

out:
mem_deref(mb);
return err != 0;
}

Expand All @@ -732,23 +702,35 @@ static bool debug_handler(struct le *le, void *arg)
int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs)
{
const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
struct mbuf *mb;
int err = 0;

if (!sess)
return 0;

err |= re_hprintf(pf, "----- RTCP Session: -----\n");
err |= re_hprintf(pf, " cname=%s SSRC=0x%08x/%u rx=%uHz\n",
mb = mbuf_alloc(64);
if (!mb)
return ENOMEM;

err |= mbuf_printf(mb, "----- RTCP Session: -----\n");
mtx_lock(sess->lock);
err |= mbuf_printf(mb, " cname=%s SSRC=0x%08x/%u rx=%uHz\n",
sess->cname,
rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs),
sess->srate_rx);

hash_apply(sess->members, debug_handler, pf);
hash_apply(sess->members, debug_handler, mb);

mtx_lock(sess->lock);
err |= re_hprintf(pf, " TX: packets=%u, octets=%u\n",
err |= mbuf_printf(mb, " TX: packets=%u, octets=%u\n",
sess->txstat.psent, sess->txstat.osent);
mtx_unlock(sess->lock);

if (err)
goto out;

err = re_hprintf(pf, "%b", mb->buf, mb->pos);

out:
mem_deref(mb);
return err;
}
13 changes: 0 additions & 13 deletions src/rtp/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <re_list.h>
#include <re_hash.h>
#include <re_sa.h>
#include <re_thread.h>
#include <re_rtp.h>
#include "rtcp.h"

Expand Down Expand Up @@ -176,15 +175,3 @@ uint8_t source_calc_fraction_lost(struct rtp_source *s)

return fraction;
}


int source_lock(struct rtp_source *s)
{
return mtx_lock(s->lock);
}


int source_unlock(struct rtp_source *s)
{
return mtx_unlock(s->lock);
}

0 comments on commit b57a240

Please sign in to comment.