diff --git a/cmake/modules.cmake b/cmake/modules.cmake index a4215d1..11bdb18 100644 --- a/cmake/modules.cmake +++ b/cmake/modules.cmake @@ -14,6 +14,7 @@ set(MODULES vidloop parcall qualify + multicast ) if(FVAD_FOUND) diff --git a/modules/multicast/CMakeLists.txt b/modules/multicast/CMakeLists.txt new file mode 100644 index 0000000..d88abe4 --- /dev/null +++ b/modules/multicast/CMakeLists.txt @@ -0,0 +1,12 @@ +project(multicast) + +list(APPEND MODULES_DETECTED ${PROJECT_NAME}) +set(MODULES_DETECTED ${MODULES_DETECTED} PARENT_SCOPE) + +set(SRCS multicast.c player.c receiver.c sender.c source.c) + +if(STATIC) + add_library(${PROJECT_NAME} OBJECT ${SRCS}) +else() + add_library(${PROJECT_NAME} MODULE ${SRCS}) +endif() diff --git a/modules/multicast/multicast.c b/modules/multicast/multicast.c new file mode 100644 index 0000000..42e5f8c --- /dev/null +++ b/modules/multicast/multicast.c @@ -0,0 +1,716 @@ +/** + * @file multicast.c + * + * @note supported codecs are PCMU, PCMA, G722 + * + * Copyright (C) 2021 Commend.com - c.huber@commend.com + */ + +#include +#include + +#include "multicast.h" + +#define DEBUG_MODULE "multicast" +#define DEBUG_LEVEL 6 +#include + + +struct mccfg { + uint32_t callprio; + uint32_t ttl; + uint32_t tfade; +}; + +static struct mccfg mccfg = { + 0, + 1, + 125, +}; + + +/** + * Decode IP-address : + * + * @param pladdr Parameter string + * @param addr Address ptr + * + * @return 0 if success, otherwise errorcode + */ +static int decode_addr(struct pl *pladdr, struct sa *addr) +{ + int err = 0; + + err = sa_decode(addr, pladdr->p, pladdr->l); + if (err) + warning ("multicast: address decode (%m)\n", err); + + + if (sa_port(addr) % 2) + warning("multicast: address port for RTP should be even" + " (%d)\n" , sa_port(addr)); + + return err; +} + + +/** + * Decode audiocodec + * + * @param plcodec Parameter string + * @param codecptr Codec ptr + * + * @return 0 if success, otherwise errorcode + */ +static int decode_codec(struct pl *plcodec, struct aucodec **codecptr) +{ + int err = 0; + struct list *acodeclist = baresip_aucodecl(); + struct aucodec *codec = NULL; + struct le *le; + + LIST_FOREACH(acodeclist, le) { + codec = list_ledata(le); + if (0 == pl_strcasecmp(plcodec, codec->name)) + break; + + codec = NULL; + } + + if (!codec) { + err = EINVAL; + warning ("multicast: codec not found (%r)\n", plcodec); + } + + *codecptr = codec; + return err; +} + + +/** + * Check audio encoder RTP payload type + * + * @param ac Audiocodec object + * + * @return 0 if success, otherwise errorcode + */ +static int check_rtp_pt(struct aucodec *ac) +{ + if (!ac) + return EINVAL; + + return ac->pt ? 0 : ENOTSUP; +} + + +/** + * Getter for the call priority + * + * @return uint8_t call priority + */ +uint8_t multicast_callprio(void) +{ + return mccfg.callprio; +} + + +/** + * Getter for configurable multicast TTL + * + * @return uint8_t multicast TTL + */ +uint8_t multicast_ttl(void) +{ + return mccfg.ttl; +} + + +/** + * Getter for configurable multicast fade in/out time + * + * @return uint32_t multicast fade time + */ +uint32_t multicast_fade_time(void) +{ + return mccfg.tfade; +} + + +/** + * Create a new multicast sender + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcsend(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl pladdr, plcodec; + struct sa addr; + struct aucodec *codec = NULL; + + err = re_regex(carg->prm, str_len(carg->prm), + "addr=[^ ]* codec=[^ ]*", &pladdr, &plcodec); + if (err) + goto out; + + err = decode_addr(&pladdr, &addr); + err |= decode_codec(&plcodec, &codec); + if (err) + goto out; + + err = check_rtp_pt(codec); + if (err) + goto out; + + err = mcsender_alloc(&addr, codec); + + out: + if (err) + re_hprintf(pf, + "usage: /mcsend addr=: codec=\n"); + + return err; +} + + +/** + * Enable / Disable all multicast sender without removing it + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcsenden(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl plenable; + bool enable; + + err = re_regex(carg->prm, str_len(carg->prm), + "enable=[^ ]*", &plenable); + if (err) + goto out; + + enable = pl_u32(&plenable); + mcsender_enable(enable); + + out: + if (err) + re_hprintf(pf, "usage: /mcsenden enable=<0,1>\n"); + + return err; +} + + +/** + * Stop all multicast sender + * + * @param pf Printer + * @param arg Command arguments + * + * @return always 0 + */ +static int cmd_mcstopall(struct re_printf *pf, void *arg) +{ + (void) pf; + (void) arg; + + mcsender_stopall(); + return 0; +} + + +/** + * Stop a specified multicast sender + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcstop(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl pladdr; + struct sa addr; + + err = re_regex(carg->prm, str_len(carg->prm), + "addr=[^ ]*", &pladdr); + if (err) + goto out; + + err = decode_addr(&pladdr, &addr); + if (err) + goto out; + + mcsender_stop(&addr); + + out: + if (err) + re_hprintf(pf, "usage: /mcstop addr=:\n"); + + return err; +} + + +/** + * Print all multicast information + * + * @param pf Printer + * @param arg Command arguments + * + * @return alwasys 0 + */ +static int cmd_mcinfo(struct re_printf *pf, void *arg) +{ + (void)pf; + (void)arg; + + mcsender_print(pf); + mcreceiver_print(pf); + + return 0; +} + + +/** + * Create a new multicast listener with prio + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcreg(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl pladdr, plprio; + struct sa addr; + uint32_t prio; + + err = re_regex(carg->prm, str_len(carg->prm), "addr=[^ ]* prio=[^ ]*", + &pladdr, &plprio); + if (err) + goto out; + + prio = pl_u32(&plprio); + err = decode_addr(&pladdr, &addr); + if (err || !prio) { + if (!prio) + err = EINVAL; + goto out; + } + + err = mcreceiver_alloc(&addr, prio); + + out: + if (err) + re_hprintf(pf, "usage: /mcreg addr=: " + "prio=<1-255>\n"); + + return err; +} + + +/** + * Un-register a multicast listener + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcunreg(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl pladdr; + struct sa addr; + + err = re_regex(carg->prm, str_len(carg->prm), + "addr=[^ ]*", &pladdr); + if (err) + goto out; + + err = decode_addr(&pladdr, &addr); + if (err) + goto out; + + mcreceiver_unreg(&addr); + + out: + if (err) + re_hprintf(pf, "usage: /mcunreg addr=:\n"); + + return err; +} + + +/** + * Un-register all multicast listener + * + * @param pf Printer + * @param arg Command arguments + * + * @return always 0 + */ +static int cmd_mcunregall(struct re_printf *pf, void *arg) +{ + (void) pf; + (void) arg; + + mcreceiver_unregall(); + return 0; +} + + +/** + * Change priority of existing multicast listener + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcchprio(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl pladdr, plprio; + uint32_t prio; + struct sa addr; + + err = re_regex(carg->prm, str_len(carg->prm), + "addr=[^ ]* prio=[^ ]*", &pladdr, &plprio); + if (err) + goto out; + + err = decode_addr(&pladdr, &addr); + if (err) + goto out; + + prio = pl_u32(&plprio); + + err = mcreceiver_chprio(&addr, prio); + + out: + if (err) + re_hprintf(pf, "usage: /mcchprio addr=: " + "prio=<1-255>\n"); + + return err; +} + + +/** + * Enables all multicast listener with prio <= given prio and + * disables those with prio > given pri + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcprioen(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl plprio; + uint32_t prio; + + err = re_regex(carg->prm, str_len(carg->prm), + "prio=[^ ]*", &plprio); + if (err) + goto out; + + prio = pl_u32(&plprio); + mcreceiver_enprio(prio); + + out: + if (err) + re_hprintf(pf, "usage: /mcprioen prio=<1-255>\n"); + + return err; +} + + +/** + * Enable / Disable a certain range of priorities + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcprioren(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl plpriol, plprioh, plenable; + uint32_t priol = 0, prioh = 0; + bool enable = false; + + err = re_regex(carg->prm, str_len(carg->prm), + "range=[0-9]*-[0-9]* enable=[0-1]1", &plpriol, &plprioh, + &plenable); + if (err) + goto out; + + priol = pl_u32(&plpriol); + prioh = pl_u32(&plprioh); + enable = pl_u32(&plenable); + + if (priol > prioh) { + err = EINVAL; + goto out; + } + + mcreceiver_enrangeprio(priol, prioh, enable); + + out: + if (err) + re_hprintf(pf, "usage: /mcprioren range=<1-255>-<1-255>" + " enable=<0,1>\n"); + + return err; +} + + +/** + * Set specified multicast as ignored + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + + */ +static int cmd_mcignore(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl plprio; + uint32_t prio = 0; + + err = re_regex(carg->prm, str_len(carg->prm), + "prio=[^ ]*", &plprio); + if (err) + goto out; + + prio = pl_u32(&plprio); + + if (!prio) { + err = EINVAL; + goto out; + } + + err = mcreceiver_prioignore(prio); + + out: + if (err) + re_hprintf(pf, "usage: /mcignore prio=<1-255>\n"); + + return err; +} + + +/** + * Toggle mute of multicast + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcmute(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl plprio; + uint32_t prio = 0; + + err = re_regex(carg->prm, str_len(carg->prm), "prio=[^ ]*", &plprio); + if (err) + goto out; + + prio = pl_u32(&plprio); + + if (!prio) { + err = EINVAL; + goto out; + } + + err = mcreceiver_mute(prio); + + out: + if (err) + re_hprintf(pf, "usage: /mcmute prio=<1-255>\n"); + + return err; +} + + +/** + * Enable / Disable all multicast receiver without removing it + * + * @param pf Printer + * @param arg Command arguments + * + * @return 0 if success, otherwise errorcode + */ +static int cmd_mcregen(struct re_printf *pf, void *arg) +{ + int err = 0; + const struct cmd_arg *carg = arg; + struct pl plenable; + bool enable; + + err = re_regex(carg->prm, str_len(carg->prm), + "enable=[^ ]*", &plenable); + if (err) + goto out; + + enable = pl_u32(&plenable); + mcreceiver_enable(enable); + + out: + if (err) + re_hprintf(pf, "usage: /mcregen enable=<0,1>"); + + return err; +} + + +/** + * config handler: call this handler foreach line given by @conf_apply function + * + * @param pl PL containing the parameter of the config line + * @param arg (int*) external priority counter + * + * @return 0 if success, otherwise errorcode + */ +static int module_read_config_handler(const struct pl *pl, void *arg) +{ + struct cmd_arg cmd_arg; + char buf[64]; + int err = 0; + int n = 0; + int *prio = (int *) arg; + + if (pl_strchr(pl, '-')) + goto out; + + n = re_snprintf(buf, sizeof(buf), "addr=%r prio=%d", pl, *prio); + if (n < 0) + goto out; + + cmd_arg.prm = buf; + err = cmd_mcreg(NULL, &cmd_arg); + + out: + if (!err) + (*prio)++; + + return err; +} + + +/** + * Read the config lines for configured multicast addresses + * + * @return 0 if success, otherwise errorcode + */ +static int module_read_config(void) +{ + int err = 0, prio = 1; + struct sa laddr; + + (void)conf_get_u32(conf_cur(), "multicast_call_prio", &mccfg.callprio); + if (mccfg.callprio > 255) + mccfg.callprio = 255; + + (void)conf_get_u32(conf_cur(), "multicast_ttl", &mccfg.ttl); + if (mccfg.ttl > 255) + mccfg.ttl = 255; + + (void)conf_get_u32(conf_cur(), "multicast_fade_time", &mccfg.tfade); + if (mccfg.tfade > 2000) + mccfg.tfade = 2000; + + sa_init(&laddr, AF_INET); + err = conf_apply(conf_cur(), "multicast_listener", + module_read_config_handler, &prio); + if (err) + warning("Could not parse multicast config from file"); + + return err; +} + + +static const struct cmd cmdv[] = { + {"mcinfo", 0, CMD_PRM, "Show multicast information", cmd_mcinfo }, + + {"mcsend", 0, CMD_PRM, "Send multicast" , cmd_mcsend }, + {"mcstop", 0, CMD_PRM, "Stop multicast" , cmd_mcstop }, + {"mcstopall", 0, CMD_PRM, "Stop all multicast" , cmd_mcstopall}, + {"mcsenden", 0, CMD_PRM, "Enable/Disable all sender" , cmd_mcsenden }, + + {"mcreg", 0, CMD_PRM, "Reg. multicast listener" , cmd_mcreg }, + {"mcunreg", 0, CMD_PRM, "Unreg. multicast listener" , cmd_mcunreg }, + {"mcunregall",0, CMD_PRM, "Unreg. all multicast listener", + cmd_mcunregall}, + {"mcchprio" ,0, CMD_PRM, "Change priority" , cmd_mcchprio }, + {"mcprioen" ,0, CMD_PRM, "Enable Listener Prio >=" , cmd_mcprioen }, + {"mcprioren", 0, CMD_PRM, "Enable Listener Prio range", cmd_mcprioren}, + {"mcignore", 0, CMD_PRM, "Ignore stream priority" , cmd_mcignore }, + {"mcmute", 0, CMD_PRM, "Mute stream priority" , cmd_mcmute }, + {"mcregen" ,0, CMD_PRM, "Enable / Disable all listener", + cmd_mcregen}, +}; + + +static int module_init(void) +{ + int err = 0; + + err = module_read_config(); + err |= cmd_register(baresip_commands(), cmdv, RE_ARRAY_SIZE(cmdv)); + + err |= mcsource_init(); + err |= mcplayer_init(); + + if (!err) + info("multicast: module init\n"); + + return err; +} + + +static int module_close(void) +{ + mcsender_stopall(); + mcreceiver_unregall(); + + cmd_unregister(baresip_commands(), cmdv); + + mcsource_terminate(); + mcplayer_terminate(); + + return 0; +} + + +const struct mod_export DECL_EXPORTS(multicast) = { + "multicast", + "application", + module_init, + module_close +}; diff --git a/modules/multicast/multicast.h b/modules/multicast/multicast.h new file mode 100644 index 0000000..eaf4119 --- /dev/null +++ b/modules/multicast/multicast.h @@ -0,0 +1,68 @@ +/** + * @file multicast.h Private multicast interface + * + * Copyright (c) 2020 Commend.com - c.huber@commend.com + */ + + +/* Multicast */ +enum { + MAX_SRATE = 48000, /* Maximum sample rate in [Hz] */ + MAX_CHANNELS = 2, /* Maximum number of channels */ + MAX_PTIME = 60, /* Maximum packet time in [ms] */ + + STREAM_PRESZ = RTP_HEADER_SIZE + 4,/* same as RTP_HEADER_SIZE */ + + AUDIO_SAMPSZ = MAX_SRATE * MAX_CHANNELS * MAX_PTIME / 1000, + PTIME = 20, +}; + + +uint8_t multicast_callprio(void); +uint8_t multicast_ttl(void); +uint32_t multicast_fade_time(void); + + +/* Sender */ +typedef int (mcsender_send_h)(size_t ext_len, bool marker, uint32_t rtp_ts, + struct mbuf *mb, void *arg); + +int mcsender_alloc(struct sa *addr, const struct aucodec *codec); +void mcsender_stopall(void); +void mcsender_stop(struct sa *addr); +void mcsender_enable(bool enable); + +void mcsender_print(struct re_printf *pf); + +/* Receiver */ +int mcreceiver_alloc(struct sa *addr, uint8_t prio); +void mcreceiver_unregall(void); +void mcreceiver_unreg(struct sa *addr); +int mcreceiver_chprio(struct sa *addr, uint32_t prio); +void mcreceiver_enprio(uint32_t prio); +void mcreceiver_enrangeprio(uint32_t priol, uint32_t prioh, bool en); +int mcreceiver_prioignore(uint32_t prio); +int mcreceiver_mute(uint32_t prio); +void mcreceiver_enable(bool enable); + +void mcreceiver_print(struct re_printf *pf); + +/* Player */ +int mcplayer_start(const struct aucodec *ac); +void mcplayer_stop(void); +void mcplayer_fadeout(void); +void mcplayer_fadein(bool restart); +bool mcplayer_fadeout_done(void); +int mcplayer_decode(const struct rtp_header *hdr, struct mbuf *mb, bool drop); + +int mcplayer_init(void); +void mcplayer_terminate(void); + +/* Source */ +struct mcsource; +int mcsource_start(struct mcsource **srcp, const struct aucodec *ac, + mcsender_send_h *sendh, void *arg); +void mcsource_stop(struct mcsource *src); + +int mcsource_init(void); +void mcsource_terminate(void); diff --git a/modules/multicast/player.c b/modules/multicast/player.c new file mode 100644 index 0000000..fdb77a1 --- /dev/null +++ b/modules/multicast/player.c @@ -0,0 +1,503 @@ +/** + * @file multicast/player.c + * + * Copyright (C) 2021 Commend.com - c.huber@commend.com + */ + +#include +#include +#include + +#include "multicast.h" + + +#define DEBUG_MODULE "mcplayer" +#define DEBUG_LEVEL 6 +#include + + +enum fade_state { + FM_IDLE, + FM_FADEIN, + FM_FADEINDONE, + FM_FADEOUT, + FM_FADEOUTDONE, +}; + + +/** + * Multicast player struct + * + * Contains configuration of the audio player and buffer for the audio data + */ +struct mcplayer { + struct config_audio *cfg; + + struct auplay_st *auplay; + struct auplay_prm auplay_prm; + const struct aucodec *ac; + struct audec_state *dec; + struct aubuf *aubuf; + uint32_t ssrc; + + struct list filterl; + char *module; + char *device; + void *sampv; + uint32_t ptime; + enum aufmt play_fmt; + enum aufmt dec_fmt; + + enum fade_state fades; + uint32_t fade_cmax; + uint32_t fade_c; + float fade_dbstart; + float fade_delta; +}; + + +static struct mcplayer *player; + + +static void mcplayer_destructor(void *arg) +{ + (void) arg; + + mem_deref(player->auplay); + + mem_deref(player->module); + mem_deref(player->device); + mem_deref(player->dec); + + mem_deref(player->sampv); + mem_deref(player->aubuf); + list_flush(&player->filterl); +} + + +static void fade_process(struct auframe *af) +{ + size_t i; + int16_t *sampv_ptr = af->sampv; + float db_value; + + if (af->fmt != AUFMT_S16LE) + return; + + switch (player->fades) { + case FM_FADEIN: + if (player->fade_c == player->fade_cmax) { + player->fades = FM_FADEINDONE; + return; + } + + for (i = 0; i < af->sampc; i++) { + db_value = player->fade_dbstart + + (player->fade_c * player->fade_delta); + *(sampv_ptr) = *(sampv_ptr) * db_value; + ++sampv_ptr; + if (player->fade_c < player->fade_cmax) + ++player->fade_c; + } + + break; + + case FM_FADEOUT: + for (i = 0; i < af->sampc; i++) { + db_value = player->fade_dbstart + + (player->fade_c * player->fade_delta); + *(sampv_ptr) = *(sampv_ptr) * db_value; + ++sampv_ptr; + + if (player->fade_c > 0) + --player->fade_c; + } + + if (!player->fade_c) { + player->fades = FM_FADEOUTDONE; + return; + } + + break; + + case FM_FADEOUTDONE: + for (i = 0; i < af->sampc; i++) { + db_value = 1. - ((player->fade_cmax - 1) * + player->fade_delta); + *(sampv_ptr) = *(sampv_ptr) * db_value; + ++sampv_ptr; + } + + break; + + default: + break; + + } + + return; +} + + +/** + * Decode the payload of the RTP packet + * + * @param hdr RTP header + * @param mb RTP payload + * @param drop True if the jbuf returned EAGAIN + * + * @return 0 if success, otherwise errorcode + */ +int mcplayer_decode(const struct rtp_header *hdr, struct mbuf *mb, bool drop) +{ + struct auframe af; + struct le *le; + size_t sampc = AUDIO_SAMPSZ; + bool marker = hdr->m; + int err = 0; + + if (!player) + return EINVAL; + + if (!player->ac) + return 0; + + if (hdr->ext && hdr->x.len && mb) + return ENOTSUP; + + if (player->ssrc != hdr->ssrc) + aubuf_flush(player->aubuf); + + player->ssrc = hdr->ssrc; + if (mbuf_get_left(mb)) { + err = player->ac->dech(player->dec, player->dec_fmt, + player->sampv, &sampc, marker, + mbuf_buf(mb), mbuf_get_left(mb)); + if (err) + goto out; + } + else if (player->ac->plch && player->dec_fmt == AUFMT_S16LE) { + err = player->ac->plch(player->dec, player->dec_fmt, + player->sampv, &sampc, + mbuf_buf(mb), mbuf_get_left(mb)); + if (err) + goto out; + } + else { + /* no PLC in the codec, might be done in filters below */ + sampc = 0; + } + + auframe_init(&af, player->dec_fmt, player->sampv, sampc, + player->ac->srate, player->ac->ch); + af.timestamp = ((uint64_t) hdr->ts) * AUDIO_TIMEBASE / + player->ac->crate; + + for (le = player->filterl.tail; le; le = le->prev) { + struct aufilt_dec_st *st = le->data; + + if (st->af && st->af->dech) + err |= st->af->dech(st, &af); + + } + + if (!player->aubuf) + goto out; + + if (af.fmt != player->play_fmt) { + warning("multicast player: invalid sample formats (%s -> %s)." + " %s\n", + aufmt_name(af.fmt), aufmt_name(player->play_fmt), + player->play_fmt == AUFMT_S16LE ? + "Use module auconv!" : ""); + } + + if (player->auplay_prm.srate != af.srate || + player->auplay_prm.ch != af.ch) { + warning("multicast: srate/ch of frame %u/%u vs " + "player %u/%u. Use module auresamp!\n", + af.srate, af.ch, + player->auplay_prm.srate, player->auplay_prm.ch); + } + + if (drop) { + aubuf_drop_auframe(player->aubuf, &af); + goto out; + } + + fade_process(&af); + err = aubuf_write_auframe(player->aubuf, &af); + + out: + + return err; +} + + +/** + * Audio player write handler + * + * @param af Audio frame (af.sampv, af.sampc and af.fmt needed) + * @param arg unused + */ +static void auplay_write_handler(struct auframe *af, void *arg) +{ + (void) arg; + + if (!player) + return; + + aubuf_read_auframe(player->aubuf, af); +} + + +/** + * Setup all available audio filter for the decoder + * + * @param aufiltl List of audio filter + * + * @return 0 if success, otherwise errorcode + */ +static int aufilt_setup(struct list *aufiltl) +{ + struct aufilt_prm prm; + struct le *le; + int err = 0; + + if (!player->ac) + return 0; + + if (!list_isempty(&player->filterl)) + return 0; + + prm.srate = player->ac->srate; + prm.ch = player->ac->ch; + prm.fmt = player->dec_fmt; + + for (le = list_head(aufiltl); le; le = le->next) { + struct aufilt *af = le->data; + struct aufilt_dec_st *decst = NULL; + void *ctx = NULL; + + if (af->decupdh) { + err = af->decupdh(&decst, &ctx, af, &prm, NULL); + if (err) { + warning("multicast player: error in decoder" + "autio-filter '%s' (%m)\n", + af->name, err); + } + else { + decst->af = af; + list_append(&player->filterl, &decst->le, + decst); + } + } + + if (err) { + warning("multicast player: audio-filter '%s' " + "update failed (%m)\n", af->name, err); + break; + } + } + + return err; +} + + +/** + * Allocate and start a media player for the multicast + * + * @note singleton + * + * @param ac Audio codec + * + * @return 0 if success, otherwise errorcode + */ +int mcplayer_start(const struct aucodec *ac) +{ + int err = 0; + struct config_audio *cfg = &conf_config()->audio; + uint32_t srate_dsp; + uint32_t channels_dsp; + struct auplay_prm prm; + + if (!ac) + return EINVAL; + + if (player && + (player->fades == FM_FADEOUT || player->fades == FM_FADEIN)) + return EINPROGRESS; + + player = mem_deref(player); + player = mem_zalloc(sizeof(*player), mcplayer_destructor); + if (!player) + return ENOMEM; + + player->cfg = cfg; + player->ac = ac; + player->play_fmt = cfg->play_fmt; + player->dec_fmt = cfg->dec_fmt; + + err = str_dup(&player->module, cfg->play_mod); + err |= str_dup(&player->device, cfg->play_dev); + if (err) + goto out; + + player->sampv = mem_zalloc(AUDIO_SAMPSZ * + aufmt_sample_size(player->dec_fmt), NULL); + if (!player->sampv) { + err = ENOMEM; + goto out; + } + + player->ptime = PTIME; + if (player->ac->decupdh) { + err = player->ac->decupdh(&player->dec, player->ac, NULL); + if (err) { + warning ("multicast player: alloc decoder(%m)\n", + err); + goto out; + } + } + + srate_dsp = player->ac->srate; + channels_dsp = player->ac->ch; + + prm.srate = srate_dsp; + prm.ch = channels_dsp; + prm.ptime = player->ptime; + prm.fmt = player->play_fmt; + + if (multicast_fade_time()) { + player->fade_cmax = (multicast_fade_time() * prm.srate) / 1000; + player->fade_dbstart = 0.001; /*-60dB*/ + player->fade_delta = (1. - player->fade_dbstart) / + player->fade_cmax; + player->fades = FM_FADEIN; + } + + if (!player->aubuf) { + const size_t sz = aufmt_sample_size(player->play_fmt); + const size_t ptime_min = cfg->buffer.min; + const size_t ptime_max = cfg->buffer.max; + size_t min_sz; + size_t max_sz; + + if (!ptime_min || !ptime_max) { + err = EINVAL; + goto out; + } + + min_sz = sz * calc_nsamp(prm.srate, prm.ch, ptime_min); + max_sz = sz * calc_nsamp(prm.srate, prm.ch, ptime_max); + + err = aubuf_alloc(&player->aubuf, min_sz, max_sz); + if (err) { + warning("multicast player: aubuf alloc error (%m)\n", + err); + goto out; + } + + aubuf_set_mode(player->aubuf, cfg->adaptive ? + AUBUF_ADAPTIVE : AUBUF_FIXED); + aubuf_set_silence(player->aubuf, cfg->silence); + } + + err = aufilt_setup(baresip_aufiltl()); + if (err) + { + warning("multicast player: aufilt setup error (%m)\n)", err); + goto out; + } + + err = auplay_alloc(&player->auplay, baresip_auplayl(), player->module, + &prm, player->device, auplay_write_handler, player); + if (err) { + warning("multicast player: start of %s.%s failed (%m)\n", + player->module, player->device, err); + goto out; + } + + player->auplay_prm = prm; + + out: + if (err) + player = mem_deref(player); + + return err; +} + + +/** + * Stop multicast player + */ +void mcplayer_stop(void) +{ + player = mem_deref(player); +} + + +/** + * Fade-out active player + * + */ +void mcplayer_fadeout(void) +{ + if (!player) + return; + + if (player->fades == FM_FADEOUT || player->fades == FM_FADEOUTDONE) + return; + + player->fades = FM_FADEOUT; +} + + +/** + * @return True if the fade-out finished + */ +bool mcplayer_fadeout_done(void) +{ + if (!player) + return false; + + return player->fades == FM_FADEOUTDONE; +} + + +/** + * Fade-in active player + * + * @param restart If true the fade-in restarts with silence level + */ +void mcplayer_fadein(bool restart) +{ + if (!player) + return; + + if (restart) + player->fade_c = 0; + else if (player->fades == FM_FADEINDONE) + return; + + player->fades = FM_FADEIN; +} + +/** + * Initialize everything needed for the player beforhand + * + * @return 0 if success, otherwise errorcode + */ +int mcplayer_init(void) +{ + return 0; +} + +/** + * Terminate everything needed for the player afterwards + * + */ +void mcplayer_terminate(void) +{ + return; +} diff --git a/modules/multicast/receiver.c b/modules/multicast/receiver.c new file mode 100644 index 0000000..6176e31 --- /dev/null +++ b/modules/multicast/receiver.c @@ -0,0 +1,898 @@ +/** + * @file receiver.c + * + * Copyright (C) 2021 Commend.com - c.huber@commend.com + */ + +#include +#include +#include + +#include "multicast.h" + +#define DEBUG_MODULE "mcreceiver" +#define DEBUG_LEVEL 6 +#include + + +struct list mcreceivl = LIST_INIT; +static mtx_t mcreceivl_lock; + + +enum { + TIMEOUT = 1000, +}; + +enum state { + LISTENING, + RECEIVING, + RUNNING, + IGNORED, +}; + +/** + * Multicast receiver struct + * + * Contains data to collect and controll all listeners + */ +struct mcreceiver { + struct le le; + struct sa addr; + uint8_t prio; + + struct udp_sock *rtp; + uint32_t ssrc; + struct jbuf *jbuf; + + const struct aucodec *ac; + + struct tmr timeout; + + enum state state; + bool muted; + bool enable; +}; + + +static void resume_uag_state(void); + + +static char* state_str(enum state s) { + switch (s) { + case LISTENING: + return "listening"; + case RECEIVING: + return "receiving"; + case RUNNING: + return "running"; + case IGNORED: + return "ignored"; + default: + return "???"; + } +} + + +static void mcreceiver_destructor(void *arg) +{ + struct mcreceiver *mcreceiver = arg; + + tmr_cancel(&mcreceiver->timeout); + + if (mcreceiver->state == RUNNING) + mcplayer_stop(); + + mcreceiver->ssrc = 0; + + mcreceiver->rtp = mem_deref(mcreceiver->rtp); + mcreceiver->jbuf = mem_deref(mcreceiver->jbuf); +} + + +/** + * Multicast address comparison + * + * @param le List element (mcreceiver) + * @param arg Argument (address) + * + * @return true if mcreceiver->addr == address + * @return false if mcreceiver->addr != address + */ +static bool mcreceiver_addr_cmp(struct le *le, void *arg) +{ + struct mcreceiver *mcreceiver = le->data; + struct sa *addr = arg; + + return sa_cmp(&mcreceiver->addr, addr, SA_ALL); +} + + +/** + * Multicast receiver priority comparison + * + * @param le List element (mcreceiver) + * @param arg Argument (priority) + * + * @return true if mcreceiver->prio == prio + * @return false if mcreceiver->prio != prio + */ +static bool mcreceiver_prio_cmp(struct le *le, void *arg) +{ + struct mcreceiver *mcreceiver = le->data; + uint32_t *prio = (uint32_t *)arg; + + return mcreceiver->prio == *prio; +} + + +/** + * Get running multicast receiver + * + * @param le Multicast receiver list element + * @param arg Unused + * + * @return true if multicast receiver is running + * @return false if multicast receiver is not running + */ +static bool mcreceiver_running(struct le *le, void *arg) +{ + struct mcreceiver *mcreceiver = le->data; + (void) arg; + + return mcreceiver->state == RUNNING; +} + + +/** + * Convert std rtp codec payload type to audio codec + * + * @param hdr RTP header object + * + * @return struct aucodec* + */ +static const struct aucodec *pt2codec(const struct rtp_header *hdr) +{ + const struct aucodec *codec = NULL; + + switch (hdr->pt) { + case 0: + codec = aucodec_find(baresip_aucodecl(), "PCMU", 0, 1); + break; + + case 8: + codec = aucodec_find(baresip_aucodecl(), "PCMA", 0, 1); + break; + + case 9: + codec = aucodec_find(baresip_aucodecl(), "G722", 0, 1); + break; + + default: + warning ("multicast receiver: RTP Payload " + "Type %d not found.\n", hdr->pt); + break; + } + + return codec; +} + + +/** + * Resume to the pre-multicast uag state if no other high priority + * multicasts are running + */ +static void resume_uag_state(void) +{ + uint8_t h = 255; + struct le *le= NULL; + struct mcreceiver *mcreceiver = NULL; + + for (le = list_head(&mcreceivl); le; le = le->next) { + mcreceiver = le->data; + + if (mcreceiver->state == RUNNING && mcreceiver->prio < h) + h = mcreceiver->prio; + } + + if (h > multicast_callprio()) { + uag_set_dnd(false); + uag_set_nodial(false); + uag_hold_resume(NULL); + } +} + + +/** + * Stops, flush, start player + * + * @param mcreceiver Multicast receiver object + * + * @return int 0 if success, errorcode otherwise + */ +static int player_stop_start(struct mcreceiver *mcreceiver) +{ + mcplayer_fadeout(); + return mcplayer_start(mcreceiver->ac); +} + + +static void mcreceiver_stop(struct mcreceiver *mcreceiver) +{ + mcreceiver->state = RECEIVING; + + module_event("multicast", + "receiver stopped playing", NULL, NULL, + "addr=%J prio=%d enabled=%d state=%s", + &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, + state_str(mcreceiver->state)); + + jbuf_flush(mcreceiver->jbuf); +} + + +/** + * Multicast Priority handling + * + * @param mcreceiver Multicast receiver object + * @param ssrc SSRC of received RTP packet + * + * @return int 0 if success, errorcode otherwise + */ +static int prio_handling(struct mcreceiver *mcreceiver, uint32_t ssrc) +{ + int err = 0; + struct le *le; + struct mcreceiver *hprio = NULL; + + if (!mcreceiver) + return EINVAL; + + err = mtx_trylock(&mcreceivl_lock) != thrd_success; + if (err) + return ENOMEM; + + if (mcreceiver->state == LISTENING) { + mcreceiver->state = RECEIVING; + + info ("multicast receiver: start addr=%J prio=%d enabled=%d " + "state=%s\n", &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + module_event("multicast", "receiver start", NULL, NULL, + "addr=%J prio=%d enabled=%d state=%s", + &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + } + + if (!mcreceiver->enable) { + mcreceiver->state = RECEIVING; + err = ECANCELED; + goto out; + } + + if (mcreceiver->state == IGNORED) { + err = ECANCELED; + goto out; + } + + if (mcreceiver->prio >= multicast_callprio() && uag_call_count()) { + if (mcreceiver->state == RUNNING) { + mcreceiver_stop(mcreceiver); + mcplayer_stop(); + } + goto out; + } + else if (mcreceiver->prio < multicast_callprio()) { + struct le *leua; + struct ua *ua; + + uag_set_dnd(true); + uag_set_nodial(true); + + for (leua = list_head(uag_list()); leua; leua = leua->next) { + struct le *lecall; + ua = leua->data; + lecall = list_head(ua_calls(ua)); + while (lecall) { + struct call *call = lecall->data; + lecall = lecall->next; + + if (call_state(call) != + CALL_STATE_ESTABLISHED) { + ua_hangup(ua, call, 0, NULL); + continue; + } + + if (!call_is_onhold(call)) + call_hold(call, true); + } + } + } + + le = list_apply(&mcreceivl, true, mcreceiver_running, NULL); + if (!le) { + err = player_stop_start(mcreceiver); + if (err) + goto out; + + mcreceiver->state = RUNNING; + mcreceiver->ssrc = ssrc; + + info ("multicast receiver: start addr=%J prio=%d enabled=%d " + "state=%s\n", &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + module_event("multicast", "receiver start", NULL, NULL, + "addr=%J prio=%d enabled=%d state=%s", + &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + goto out; + } + + hprio = le->data; + if (hprio->prio < mcreceiver->prio) { + goto out; + } + + if (hprio->prio == mcreceiver->prio && mcreceiver->ssrc != ssrc) { + if (hprio->state == IGNORED) + hprio->state = RUNNING; + + mcplayer_fadein(true); + mcreceiver->ssrc = ssrc; + + info ("multicast receiver: restart addr=%J prio=%d enabled=%d " + "state=%s\n", &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + module_event("multicast", "receiver restart", NULL, NULL, + "addr=%J prio=%d enabled=%d state=%s", + &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + goto out; + } + else if (hprio->prio == mcreceiver->prio) { + goto out; + } + + err = player_stop_start(mcreceiver); + if (err) + goto out; + + hprio->state = RECEIVING; + jbuf_flush(hprio->jbuf); + mcreceiver->state = RUNNING; + mcreceiver->ssrc = ssrc; + + + info ("multicast receiver: start addr=%J prio=%d enabled=%d " + "state=%s\n", &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, state_str(mcreceiver->state)); + + module_event("multicast", "receiver start", NULL, NULL, + "addr=%J prio=%d enabled=%d state=%s", + &mcreceiver->addr, mcreceiver->prio, mcreceiver->enable, + state_str(mcreceiver->state)); + + out: + mtx_unlock(&mcreceivl_lock); + return err; +} + + +/** + * RTP timeout handler + * + * @param arg Multicast receiver object + */ +static void timeout_handler(void *arg) +{ + struct mcreceiver *mcreceiver = arg; + info ("multicast receiver: EOS addr=%J prio=%d enabled=%d state=%s\n", + &mcreceiver->addr, mcreceiver->prio, mcreceiver->enable, + state_str(mcreceiver->state)); + + module_event("multicast", "receiver EOS", NULL, NULL, + "addr=%J prio=%d enabled=%d state=%s", + &mcreceiver->addr, mcreceiver->prio, mcreceiver->enable, + state_str(mcreceiver->state)); + + mtx_lock(&mcreceivl_lock); + if (mcreceiver->state == RUNNING) { + mcplayer_stop(); + jbuf_flush(mcreceiver->jbuf); + } + + mcreceiver->state = LISTENING; + mcreceiver->muted = false; + mcreceiver->ssrc = 0; + mcreceiver->ac = 0; + resume_uag_state(); + + mtx_unlock(&mcreceivl_lock); + return; +} + + +/** + * Decode RTP packet + * + * @return 0 if success, otherwise errorcode + */ +static int player_decode(struct mcreceiver *mcreceiver) +{ + void *mb = NULL; + struct rtp_header hdr; + int jerr; + int err; + + jerr = jbuf_get(mcreceiver->jbuf, &hdr, &mb); + if (jerr && jerr != EAGAIN) + return jerr; + + err = mcplayer_decode(&hdr, mb, jerr == EAGAIN); + mb = mem_deref(mb); + if (err) + return err; + + return jerr; +} + + +/** + * Handle incoming RTP packages + * + * @param src Source address + * @param hdr RTP headers + * @param mb RTP payload + * @param arg Multicast receiver object + */ +static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, + struct mbuf *mb, void *arg) +{ + int err = 0; + struct mcreceiver *mcreceiver = arg; + + (void) src; + (void) mb; + + mcreceiver->ac = pt2codec(hdr); + if (!mcreceiver->ac) + goto out; + + if (!mbuf_get_left(mb)) + goto out; + + err = prio_handling(mcreceiver, hdr->ssrc); + if (err) + goto out; + + if (mcreceiver->state == RUNNING) { + if (mcreceiver->muted && mcplayer_fadeout_done()) { + mcplayer_stop(); + jbuf_flush(mcreceiver->jbuf); + goto out; + } + + err = jbuf_put(mcreceiver->jbuf, hdr, mb); + if (err) + return; + + if (player_decode(mcreceiver) == EAGAIN) { + (void) player_decode(mcreceiver); + } + } + + out: + tmr_start(&mcreceiver->timeout, TIMEOUT, timeout_handler, mcreceiver); + + return; +} + + +/** + * udp receive handler + * + * @note This is a wrapper function for the RTP receive handler to allow an + * any port number as receiving port. + * RTP socket pointer of 0xdeadbeef is a dummy address. The function rtp_decode + * does nothing on the socket pointer. + * + * @param src src address + * @param mb payload buffer + * @param arg rtp_handler argument + */ +static void rtp_handler_wrapper(const struct sa *src, + struct mbuf *mb, void *arg) +{ + int err = 0; + struct rtp_header hdr; + + err = rtp_decode((struct rtp_sock*)0xdeadbeef, mb, &hdr); + if (err) { + warning("multicast receiver: Decoding of rtp (%m)\n", err); + return; + } + + rtp_handler(src, &hdr, mb, arg); +} + + +/** + * Enable / Disable all mcreceiver with prio > (argument)prio + * + * @param prio Priority + */ +void mcreceiver_enprio(uint32_t prio) +{ + struct le *le; + struct mcreceiver *mcreceiver; + + if (!prio) + return; + + mtx_lock(&mcreceivl_lock); + LIST_FOREACH(&mcreceivl, le) { + mcreceiver = le->data; + + if (mcreceiver->prio <= prio) { + mcreceiver->enable = true; + } + else { + mcreceiver->enable = false; + + if (mcreceiver->state == RUNNING) { + mcreceiver_stop(mcreceiver); + mcplayer_stop(); + } + } + } + + mtx_unlock(&mcreceivl_lock); + resume_uag_state(); +} + + +/** + * Enable / Disable a certain priority range + * + * @param priol Lower priority boundary + * @param prioh Higher priority boundary + * @param en Enable / Disable flag + */ +void mcreceiver_enrangeprio(uint32_t priol, uint32_t prioh, bool en) +{ + struct le *le; + struct mcreceiver *mcreceiver; + + if (!priol || !prioh) + return; + + mtx_lock(&mcreceivl_lock); + LIST_FOREACH(&mcreceivl, le) { + mcreceiver = le->data; + + if (mcreceiver->prio >=priol && mcreceiver->prio <= prioh) { + mcreceiver->enable = en; + + if (mcreceiver->state == RUNNING) { + mcreceiver_stop(mcreceiver); + mcplayer_stop(); + } + } + } + + mtx_unlock(&mcreceivl_lock); + resume_uag_state(); +} + + +/** + * Enable / Disable all multicast receiver + * + * @param enable + */ +void mcreceiver_enable(bool enable) +{ + struct le *le; + struct mcreceiver *mcreceiver; + + mtx_lock(&mcreceivl_lock); + LIST_FOREACH(&mcreceivl, le) { + mcreceiver = le->data; + mcreceiver->enable = enable; + if (mcreceiver->state == RUNNING) + mcreceiver_stop(mcreceiver); + } + + mtx_unlock(&mcreceivl_lock); + mcplayer_stop(); + resume_uag_state(); +} + + +/** + * Change the priority of a multicast receiver + * + * @param addr Listen address + * @param prio Priority + * + * @return int 0 if success, errorcode otherwise + */ +int mcreceiver_chprio(struct sa *addr, uint32_t prio) +{ + struct le *le; + struct mcreceiver *mcreceiver; + + if (!addr || !prio) + return EINVAL; + + le = list_apply(&mcreceivl, true, mcreceiver_addr_cmp, addr); + if (!le) { + warning ("multicast receiver: receiver %J not found\n", addr); + return EINVAL; + } + + if (list_apply(&mcreceivl, true, mcreceiver_prio_cmp, &prio)) { + warning ("multicast receiver: priority %d already in use\n", + prio); + return EADDRINUSE; + } + + mcreceiver = le->data; + mtx_lock(&mcreceivl_lock); + mcreceiver->prio = prio; + mtx_unlock(&mcreceivl_lock); + resume_uag_state(); + return 0; +} + + +/** + * Search and set the ignore flag of the given priority multicast receiver + * + * @param prio Priority + * + * @return int 0 if success, errorcode otherwise + */ +int mcreceiver_prioignore(uint32_t prio) +{ + struct le *le; + struct mcreceiver *mcreceiver; + int err = 0; + + if (!prio) + return EINVAL; + + le = list_apply(&mcreceivl, true, mcreceiver_prio_cmp, &prio); + if (!le) { + warning ("multicast receiver: priority %d not found\n", prio); + return EINVAL; + } + + mcreceiver = le->data; + if (mcreceiver->state == IGNORED) + return 0; + + mtx_lock(&mcreceivl_lock); + switch (mcreceiver->state) { + case RUNNING: + mcreceiver->state = IGNORED; + mcplayer_stop(); + jbuf_flush(mcreceiver->jbuf); + break; + case RECEIVING: + mcreceiver->state = IGNORED; + break; + default: + err = EPERM; + warning ("multicast receiver: priority %d not" + " running or receiving(%m)\n", prio, err); + break; + } + + mtx_unlock(&mcreceivl_lock); + resume_uag_state(); + return err; +} + + +/** + * Toggle mute flag of the given priority multicast receiver + * + * @param prio Priority + * + * @return int 0 if success, errorcode otherwise + */ +int mcreceiver_mute(uint32_t prio) +{ + struct le *le; + struct mcreceiver *mcreceiver; + int err = 0; + + if (!prio) + return EINVAL; + + le = list_apply(&mcreceivl, true, mcreceiver_prio_cmp, &prio); + if (!le) { + warning ("multicast receiver: priority %d not found\n", prio); + return EINVAL; + } + + mcreceiver = le->data; + mtx_lock(&mcreceivl_lock); + mcreceiver->muted = !mcreceiver->muted; + if (mcreceiver->state == RUNNING) { + if (mcreceiver->muted) { + mcplayer_fadeout(); + } + else { + mcplayer_fadein(false); + err = mcplayer_start(mcreceiver->ac); + if (err == EINPROGRESS) + err = 0; + } + } + mtx_unlock(&mcreceivl_lock); + return err; +} + + +/** + * Un-register all multicast listener + */ +void mcreceiver_unregall(void) +{ + mtx_lock(&mcreceivl_lock); + list_flush(&mcreceivl); + mtx_unlock(&mcreceivl_lock); + resume_uag_state(); + mtx_destroy(&mcreceivl_lock); +} + + +/** + * Un-register a multicast listener + * + * @param addr Listen address + */ +void mcreceiver_unreg(struct sa *addr){ + struct mcreceiver *mcreceiver = NULL; + struct le *le; + + le = list_apply(&mcreceivl, true, mcreceiver_addr_cmp, addr); + if (!le) { + warning ("multicast: multicast receiver %J not found\n", addr); + return; + } + + mcreceiver = le->data; + mtx_lock(&mcreceivl_lock); + list_unlink(&mcreceiver->le); + mtx_unlock(&mcreceivl_lock); + mem_deref(mcreceiver); + resume_uag_state(); + + if (list_isempty(&mcreceivl)) + mtx_destroy(&mcreceivl_lock); +} + + +/** + * Allocate a new multicast receiver object + * + * @param addr Listen address + * @param prio Listener priority + * + * @return int 0 if success, errorcode otherwise + */ +int mcreceiver_alloc(struct sa *addr, uint8_t prio) +{ + int err = 0; + uint16_t port; + struct mcreceiver *mcreceiver = NULL; + struct config_avt *cfg = &conf_config()->avt; + struct range jbuf_del; + enum jbuf_type jbtype; + struct pl pl; + + if (!addr || !prio) + return EINVAL; + + if (list_apply(&mcreceivl, true, mcreceiver_addr_cmp, addr)) { + warning ("multicast receiver: address %J already in use\n", + addr); + return EADDRINUSE; + } + + if (list_apply(&mcreceivl, true, mcreceiver_prio_cmp, &prio)) { + warning ("multicast receiver: priority %d already in use\n", + prio); + return EADDRINUSE; + } + + mcreceiver = mem_zalloc(sizeof(*mcreceiver), mcreceiver_destructor); + if (!mcreceiver) + return ENOMEM; + + if (list_isempty(&mcreceivl)) { + err = mtx_init(&mcreceivl_lock, mtx_plain) != thrd_success; + if (err) { + err = ENOMEM; + goto out; + } + } + + sa_cpy(&mcreceiver->addr, addr); + port = sa_port(&mcreceiver->addr); + mcreceiver->prio = prio; + + mcreceiver->enable = true; + mcreceiver->muted = false; + mcreceiver->state = LISTENING; + + jbuf_del = cfg->audio.jbuf_del; + jbtype = cfg->audio.jbtype; + (void)conf_get_range(conf_cur(), "multicast_jbuf_delay", &jbuf_del); + if (0 == conf_get(conf_cur(), "multicast_jbuf_type", &pl)) + jbtype = conf_get_jbuf_type(&pl); + + err = jbuf_alloc(&mcreceiver->jbuf, jbuf_del.min, jbuf_del.max); + err |= jbuf_set_type(mcreceiver->jbuf, jbtype); + if (err) + goto out; + + err = udp_listen(&mcreceiver->rtp, &mcreceiver->addr, + rtp_handler_wrapper, mcreceiver); + if (err) { + warning("multicast receiver: udp listen failed:" + "af=%s port=%u-%u (%m)\n", net_af2name(sa_af(addr)), + port, port + 1, err); + goto out; + } + + if (IN_MULTICAST(sa_in(&mcreceiver->addr))) { + err = udp_multicast_join((struct udp_sock *) + mcreceiver->rtp, &mcreceiver->addr); + if (err) { + warning ("multicast recevier: join multicast group " + "failed %J (%m)\n", &mcreceiver->addr, err); + goto out; + } + } + + mtx_lock(&mcreceivl_lock); + list_append(&mcreceivl, &mcreceiver->le, mcreceiver); + mtx_unlock(&mcreceivl_lock); + + out: + if (err) + mem_deref(mcreceiver); + + return err; +} + + +/** + * Print all available multicast receiver + * + * @param pf Printer + */ +void mcreceiver_print(struct re_printf *pf) +{ + struct le *le = NULL; + struct mcreceiver *mcreceiver = NULL; + + re_hprintf(pf, "Multicast Receiver List:\n"); + LIST_FOREACH(&mcreceivl, le) { + mcreceiver = le->data; + re_hprintf(pf, " addr=%J prio=%d enabled=%d muted=%d " + "state=%s\n", &mcreceiver->addr, mcreceiver->prio, + mcreceiver->enable, mcreceiver->muted, + state_str(mcreceiver->state)); + } +} diff --git a/modules/multicast/sender.c b/modules/multicast/sender.c new file mode 100644 index 0000000..b30b6e9 --- /dev/null +++ b/modules/multicast/sender.c @@ -0,0 +1,222 @@ +/** + * @file sender.c + * + * Copyright (C) 2021 Commend.com - c.huber@commend.com + */ + +#include +#include +#include + +#include + +#include "multicast.h" + +#define DEBUG_MODULE "mcsend" +#define DEBUG_LEVEL 6 +#include + + +static struct list mcsenderl = LIST_INIT; + + +/** + * Multicast sender struct + * + * Contains data to send audio stream to the network + */ +struct mcsender { + struct le le; + + struct sa addr; + struct rtp_sock *rtp; + + struct config_audio *cfg; + const struct aucodec *ac; + + struct mcsource *src; + bool enable; +}; + + +static void mcsender_destructor(void *arg) +{ + struct mcsender *mcsender = arg; + + mcsource_stop(mcsender->src); + mcsender->src = mem_deref(mcsender->src); + mcsender->rtp = mem_deref(mcsender->rtp); +} + + +/** + * Multicast address comparison + * + * @param le List element (mcsender) + * @param arg Argument (address) + * + * @return true if mcsender->addr == address + * @return false if mcsender->addr != address + */ +static bool mcsender_addr_cmp(struct le *le, void *arg) +{ + struct mcsender *mcsender = le->data; + struct sa *addr = arg; + + return sa_cmp(&mcsender->addr, addr, SA_ALL); +} + + +/** + * Multicast send handler + * + * @param ext_len RTP extension header Length + * @param marker RTP marker + * @param mb Data to send + * + * @return 0 if success, otherwise errorcode + */ +static int mcsender_send_handler(size_t ext_len, bool marker, + uint32_t rtp_ts, struct mbuf *mb, void *arg) +{ + struct mcsender *mcsender = arg; + struct pl placpt = PL_INIT; + int err = 0; + + if (!mb) + return EINVAL; + + if (!mcsender->enable) + return 0; + + if (uag_call_count()) + return 0; + + pl_set_str(&placpt, mcsender->ac->pt); + err = rtp_send(mcsender->rtp, &mcsender->addr, ext_len != 0, marker, + pl_u32(&placpt), rtp_ts, tmr_jiffies_rt_usec(), mb); + + return err; +} + + +/** + * Enable / Disable all existing sender + * + * @param enable + */ +void mcsender_enable(bool enable) +{ + struct le *le; + struct mcsender *mcsender; + + LIST_FOREACH(&mcsenderl, le) { + mcsender = le->data; + mcsender->enable = enable; + } +} + + +/** + * Stop all existing multicast sender + */ +void mcsender_stopall(void) +{ + list_flush(&mcsenderl); +} + + +/** + * Stop the multicast sender with addr + * + * @param addr Address + */ +void mcsender_stop(struct sa *addr) +{ + struct mcsender *mcsender = NULL; + struct le *le; + + le = list_apply(&mcsenderl, true, mcsender_addr_cmp, addr); + if (!le) { + warning ("multicast: multicast sender %J not found\n", addr); + return; + } + + mcsender = le->data; + list_unlink(&mcsender->le); + mem_deref(mcsender); +} + + +/** + * Allocate a new multicast sender object + * + * @param addr Destination address + * @param codec Used audio codec + * + * @return 0 if success, otherwise errorcode + */ +int mcsender_alloc(struct sa *addr, const struct aucodec *codec) +{ + int err = 0; + struct mcsender *mcsender = NULL; + uint8_t ttl = multicast_ttl(); + + if (!addr || !codec) + return EINVAL; + + if (list_apply(&mcsenderl, true, mcsender_addr_cmp, addr)) + return EADDRINUSE; + + + mcsender = mem_zalloc(sizeof(*mcsender), mcsender_destructor); + if (!mcsender) + return ENOMEM; + + sa_cpy(&mcsender->addr, addr); + mcsender->ac = codec; + mcsender->enable = true; + + err = rtp_open(&mcsender->rtp, sa_af(&mcsender->addr)); + if (err) + goto out; + + if (ttl > 1) { + struct udp_sock *sock; + + sock = (struct udp_sock *) rtp_sock(mcsender->rtp); + udp_setsockopt(sock, IPPROTO_IP, + IP_MULTICAST_TTL, &ttl, sizeof(ttl)); + } + + err = mcsource_start(&mcsender->src, mcsender->ac, + mcsender_send_handler, mcsender); + + list_append(&mcsenderl, &mcsender->le, mcsender); + + out: + if (err) + mem_deref(mcsender); + + return err; +} + + +/** + * Print all available multicast sender + * + * @param pf Printer + */ +void mcsender_print(struct re_printf *pf) +{ + struct le *le = NULL; + struct mcsender *mcsender = NULL; + + re_hprintf(pf, "Multicast Sender List:\n"); + LIST_FOREACH(&mcsenderl, le) { + mcsender = le->data; + re_hprintf(pf, " %J - %s%s\n", &mcsender->addr, + mcsender->ac->name, + mcsender->enable ? " (enabled)" : " (disabled)"); + } +} diff --git a/modules/multicast/source.c b/modules/multicast/source.c new file mode 100644 index 0000000..533d38e --- /dev/null +++ b/modules/multicast/source.c @@ -0,0 +1,607 @@ +/** + * @file source.c + * + * Copyright (C) 2021 Commend.com - c.huber@commend.com + */ + +#include +#include +#include +#include + +#include + +#include "multicast.h" + +#define DEBUG_MODULE "mcsource" +#define DEBUG_LEVEL 6 +#include + + +/** + * Multicast source struct + * + * Contains configuration of the audio source and buffer for the audio data + */ +struct mcsource { + struct config_audio *cfg; + struct ausrc_st *ausrc; + struct ausrc_prm ausrc_prm; + const struct aucodec *ac; + struct auenc_state *enc; + enum aufmt src_fmt; + enum aufmt enc_fmt; + + void *sampv; + struct aubuf *aubuf; + size_t aubuf_maxsz; + volatile bool aubuf_started; + struct auresamp resamp; + int16_t *sampv_rs; + struct list filtl; + + struct mbuf *mb; + uint32_t ptime; + uint64_t ts_ext; + uint32_t ts_base; + size_t psize; + bool marker; + + char *module; + char *device; + + mcsender_send_h *sendh; + void *arg; + + struct { + thrd_t tid; + RE_ATOMIC bool run; + } thr; +}; + + +static void mcsource_destructor(void *arg) +{ + struct mcsource *src = arg; + + switch (src->cfg->txmode) { + case AUDIO_MODE_THREAD: + if (re_atomic_rlx(&src->thr.run)) { + re_atomic_rlx_set(&src->thr.run, false); + thrd_join(src->thr.tid, NULL); + } + default: + break; + } + + src->ausrc = mem_deref(src->ausrc); + src->aubuf = mem_deref(src->aubuf); + list_flush(&src->filtl); + + src->enc = mem_deref(src->enc); + src->mb = mem_deref(src->mb); + src->sampv = mem_deref(src->sampv); + src->sampv_rs = mem_deref(src->sampv_rs); + + src->module = mem_deref(src->module); + src->device = mem_deref(src->device); +} + + +/** + * Encode and send audio data via multicast send handler of src + * + * @note This function has REAL-TIME properties + * + * @param src Multicast source object + * @param sampv Samplebuffer + * @param sampc Samplecounter + */ +static void encode_rtp_send(struct mcsource *src, uint16_t *sampv, + size_t sampc) +{ + size_t frame_size; + size_t sampc_rtp; + size_t len; + + size_t ext_len = 0; + uint32_t ts_delta = 0; + int err = 0; + + if (!src->ac || !src->ac->ench) + return; + + src->mb->pos = src->mb->end = STREAM_PRESZ; + + len = mbuf_get_space(src->mb); + err = src->ac->ench(src->enc, &src->marker, mbuf_buf(src->mb), &len, + src->enc_fmt, sampv, sampc); + + if ((err & 0xffff0000) == 0x00010000) { + ts_delta = err & 0xffff; + sampc = 0; + } + else if (err) { + warning ("multicast send: &s encode error: &d samples (%m)\n", + src->ac->name, sampc, err); + goto out; + } + + src->mb->pos = STREAM_PRESZ; + src->mb->end = STREAM_PRESZ + ext_len + len; + + if (mbuf_get_left(src->mb)) { + uint32_t rtp_ts = src->ts_ext & 0xffffffff; + + if (len) { + err = src->sendh(ext_len, src->marker, + rtp_ts, src->mb, src->arg); + if (err) + goto out; + } + + if (ts_delta) { + src->ts_ext += ts_delta; + goto out; + } + } + + sampc_rtp = sampc * src->ac->crate / src->ac->srate; + frame_size = sampc_rtp / src->ac->ch; + src->ts_ext += (uint32_t) frame_size; + + out: + src->marker = false; +} + + +/** + * Poll timed read from audio buffer + * + * @note This function has REAL-TIME properties + * + * @param src Multicast source object + */ +static void poll_aubuf_tx(struct mcsource *src) +{ + struct auframe af; + int16_t *sampv = src->sampv; + size_t sampc; + size_t sz; + size_t num_bytes; + struct le *le; + uint32_t srate; + uint8_t ch; + int err = 0; + + sz = aufmt_sample_size(src->src_fmt); + if (!sz) + return; + + num_bytes = src->psize; + sampc = num_bytes / sz; + + if (src->src_fmt == AUFMT_S16LE) { + aubuf_read(src->aubuf, (uint8_t *)sampv, num_bytes); + } + else if (src->enc_fmt == AUFMT_S16LE) { + void *tmp_sampv = NULL; + + tmp_sampv = mem_zalloc(num_bytes, NULL); + if (!tmp_sampv) + return; + + aubuf_read(src->aubuf, tmp_sampv, num_bytes); + auconv_to_s16(sampv, src->src_fmt, tmp_sampv, sampc); + mem_deref(tmp_sampv); + } + else { + warning("multicast send: invalid sample formats (%s -> %s)\n", + aufmt_name(src->src_fmt), + aufmt_name(src->enc_fmt)); + } + + if (src->resamp.resample) { + size_t sampc_rs = AUDIO_SAMPSZ; + + if (src->enc_fmt != AUFMT_S16LE) { + warning("multicast send: skipping resampler due to" + " incompatible format (%s)\n", + aufmt_name(src->enc_fmt)); + return; + } + + err = auresamp(&src->resamp, src->sampv_rs, &sampc_rs, + src->sampv, sampc); + if (err) + return; + + sampv = src->sampv_rs; + sampc = sampc_rs; + } + + if (src->resamp.resample) { + srate = src->resamp.irate; + ch = src->resamp.ich; + } + else { + srate = src->ausrc_prm.srate; + ch = src->ausrc_prm.ch; + } + + auframe_init(&af, src->enc_fmt, sampv, sampc, srate, ch); + + /* process exactly one audio-frame in list order */ + for (le = src->filtl.head; le; le = le->next) { + struct aufilt_enc_st * st = le->data; + + if (st->af && st->af->ench) + err |= st->af->ench(st, &af); + } + + if (err) + warning("multicast source: aufilter encode (%m)\n", err); + + encode_rtp_send(src, af.sampv, af.sampc); +} + + +/** + * Audio source error handler + * + * @param err Error code + * @param str Error string + * @param arg Multicast source object + */ +static void ausrc_error_handler(int err, const char *str, void *arg) +{ + (void) err; + (void) str; + (void) arg; +} + + +/** + * Audio source read handler + * + * @note This function has REAL-TIME properties + * + * @param af Audio frame + * @param arg Multicast source object + */ +static void ausrc_read_handler(struct auframe *af, void *arg) +{ + struct mcsource *src = arg; + size_t num_bytes = auframe_size(af); + + if (src->src_fmt != af->fmt) { + warning ("multicast source: ausrc format mismatch: " + "expected=%d(%s), actual=%d(%s)\n", + src->src_fmt, aufmt_name(src->src_fmt), + af->fmt, aufmt_name(af->fmt)); + return; + } + + (void) aubuf_write(src->aubuf, af->sampv, num_bytes); + src->aubuf_started = true; + + if (src->cfg->txmode == AUDIO_MODE_POLL) { + unsigned i; + + for (i = 0; i < 16; i++) { + if (aubuf_cur_size(src->aubuf) < src->psize) + break; + + poll_aubuf_tx(src); + } + } +} + + +/** + * Standalone transmitter thread function + * + * @param arg Multicast source object + * + * @return NULL + */ +static int tx_thread(void *arg) +{ + struct mcsource *src = arg; + uint64_t ts = 0; + + while (re_atomic_rlx(&src->thr.run)) { + uint64_t now; + sys_msleep(4); + + if (!src->aubuf_started) + continue; + + if (!re_atomic_rlx(&src->thr.run)) + break; + + now = tmr_jiffies(); + if (!ts) + ts = now; + + if (ts > now) + continue; + + if (aubuf_cur_size(src->aubuf) >= src->psize) + poll_aubuf_tx(src); + + ts += src->ptime; + } + + return 0; +} + + +/** + * Start audio source + * + * @param src Multicast source object + * + * @return 0 if success, otherwise errorcode + */ +static int start_source(struct mcsource *src) +{ + int err = 0; + uint32_t srate_dsp; + uint32_t channels_dsp; + bool resamp = false; + + if (!src) + return EINVAL; + + srate_dsp = src->ac->srate; + channels_dsp = src->ac->ch; + + if (src->cfg->srate_src && src->cfg->srate_src != srate_dsp) { + resamp = true; + srate_dsp = src->cfg->srate_src; + } + if (src->cfg->channels_src && src->cfg->channels_src != channels_dsp) { + resamp = true; + channels_dsp = src->cfg->channels_src; + } + + if (resamp && src->sampv_rs) { + src->sampv_rs = mem_zalloc( + AUDIO_SAMPSZ * sizeof(int16_t), NULL); + if (!src->sampv_rs) + return ENOMEM; + + err = auresamp_setup(&src->resamp, srate_dsp, channels_dsp, + src->ac->srate, src->ac->ch); + if (err) { + warning ("multicast source: could not setup ausrc " + "resample (%m)\n", err); + return err; + } + } + + if (!src->ausrc && ausrc_find(baresip_ausrcl(), NULL)) { + struct ausrc_prm prm; + size_t sz; + + prm.srate = srate_dsp; + prm.ch = channels_dsp; + prm.ptime = src->ptime; + prm.fmt = src->src_fmt; + + sz = aufmt_sample_size(src->src_fmt); + src->psize = sz * (prm.srate * prm.ch * prm.ptime / 1000); + src->aubuf_maxsz = src->psize * 30; + if (!src->aubuf) { + err = aubuf_alloc(&src->aubuf, src->psize, + src->aubuf_maxsz); + if (err) + return err; + } + + err = ausrc_alloc(&src->ausrc, baresip_ausrcl(), + src->module, &prm, src->device, + ausrc_read_handler, ausrc_error_handler, src); + if (err) { + warning ("multicast source: start_source faild (%s-%s)" + " (%m)\n", src->module, src->device, err); + return err; + } + + switch (src->cfg->txmode) { + case AUDIO_MODE_POLL: + break; + case AUDIO_MODE_THREAD: + if (!re_atomic_rlx(&src->thr.run)) { + re_atomic_rlx_set(&src->thr.run, true); + err = thread_create_name(&src->thr.tid, + "multicast", tx_thread, src); + if (err) { + re_atomic_rlx_set( + &src->thr.run, false); + return err; + } + } + break; + + default: + warning ("multicast source: tx mode " + "not supported (%d)\n", + src->cfg->txmode); + return ENOTSUP; + } + + src->ausrc_prm = prm; + info ("multicast source: source started with sample format " + "%s\n", aufmt_name(src->src_fmt)); + } + + return err; +} + + +/** + * Setup all available audio filter for the encoder + * + * @param src Multicast source object + * @param aufiltl List of audio filter + * + * @return 0 if success, otherwise errorcode + */ +static int aufilt_setup(struct mcsource *src, struct list *aufiltl) +{ + struct aufilt_prm prm; + struct le *le; + int err = 0; + + if (!src->ac) + return 0; + + if (!list_isempty(&src->filtl)) + return 0; + + prm.srate = src->ac->srate; + prm.ch = src->ac->ch; + prm.fmt = src->enc_fmt; + + for (le = list_head(aufiltl); le; le = le->next) { + struct aufilt *af = le->data; + struct aufilt_enc_st *encst = NULL; + void *ctx = NULL; + + if (af->encupdh) { + err = af->encupdh(&encst, &ctx, af, &prm, NULL); + if (err) { + warning("multicast source: erro in encoder" + "autio-filter '%s' (%m)\n", + af->name, err); + } + else { + encst->af = af; + list_append(&src->filtl, &encst->le, + encst); + } + } + + if (err) { + warning("multicast source: audio-filter '%s' " + "update failed (%m)\n", af->name, err); + break; + } + } + + return err; +} + + +/** + * Start multicast source + * + * @param srcp Multicast source ptr + * @param ac Audio codec + * @param sendh Send handler ptr + * @param arg Send handler Argument + * + * @return 0 if success, otherwise errorcode + */ +int mcsource_start(struct mcsource **srcp, const struct aucodec *ac, + mcsender_send_h *sendh, void *arg) +{ + int err = 0; + struct mcsource *src = NULL; + struct config_audio *cfg = &conf_config()->audio; + + if (!srcp || !ac) + return EINVAL; + + src = mem_zalloc(sizeof(*src), mcsource_destructor); + if (!src) + return ENOMEM; + + src->cfg = cfg; + src->sendh = sendh; + src->arg = arg; + + src->src_fmt = cfg->src_fmt; + src->enc_fmt = cfg->enc_fmt; + src->mb = mbuf_alloc(STREAM_PRESZ + 4096); + src->sampv = mem_zalloc( + AUDIO_SAMPSZ * aufmt_sample_size(src->enc_fmt), NULL); + if (!src->mb || !src->sampv) { + err = ENOMEM; + goto out; + } + + auresamp_init(&src->resamp); + src->ptime = PTIME; + src->ts_ext = src->ts_base = rand_u16(); + src->marker = true; + + err = str_dup(&src->module, cfg->src_mod); + err |= str_dup(&src->device, cfg->src_dev); + if (err) + goto out; + + src->ac = ac; + if (src->ac->encupdh) { + struct auenc_param prm; + + prm.bitrate = 0; + + err = src->ac->encupdh(&src->enc, src->ac, &prm, NULL); + if (err) { + warning ("multicast source: alloc encoder (%m)\n", + err); + goto out; + } + } + + err = aufilt_setup(src, baresip_aufiltl()); + if (err) + goto out; + + err = start_source(src); + if (err) + goto out; + + out: + if (err) + mem_deref(src); + else + *srcp = src; + + return err; +} + + +/** + * Stop one multicast source. + * + * @param unused Multicast audio source object + */ +void mcsource_stop(struct mcsource *unused) +{ + (void) unused; +} + + +/** + * Initialize everything needed for the source beforhand + * + * @return 0 if success, otherwise errorcode + */ +int mcsource_init(void) +{ + return 0; +} + + +/** + * Terminate everything needed for the source afterwards + * + */ +void mcsource_terminate(void) +{ + return; +}