From 0ac8c32716f04bc6f28c556396b8092027b02c2e Mon Sep 17 00:00:00 2001 From: Martin Pulec Date: Tue, 26 Nov 2024 15:43:11 +0100 Subject: [PATCH] aplay/mixer: allow selection of single participant Allow selection of a single participant to be sent back with a control socket (docuented in wiki). --- src/audio/playback/mixer.cpp | 72 +++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/src/audio/playback/mixer.cpp b/src/audio/playback/mixer.cpp index 65875dea3..c178e603f 100644 --- a/src/audio/playback/mixer.cpp +++ b/src/audio/playback/mixer.cpp @@ -60,11 +60,13 @@ #include "debug.h" #include "host.h" // for get_commandline_param, uv_argv #include "lib_common.h" +#include "messaging.h" #include "module.h" #include "rtp/rtp.h" #include "transmit.h" #include "types.h" // for tx_media_type #include "utils/audio_buffer.h" +#include "utils/macros.h" #include "utils/net.h" // for get_sockaddr_addr_str #include "utils/thread.h" @@ -255,6 +257,8 @@ struct state_audio_mixer final { state_audio_mixer(const struct audio_playback_opts *opts) { parse_opts(opts); + only_sender.ss_family = AF_UNSPEC; + struct audio_codec_state *audio_coder = audio_codec_init_cfg(audio_codec.c_str(), AUDIO_CODER); if (!audio_coder) { @@ -264,26 +268,83 @@ struct state_audio_mixer final { audio_codec_done(audio_coder); } + module_init_default(&mod); + mod.cls = MODULE_CLASS_DATA; + module_register(&mod, opts->parent); + thread_id = thread(&state_audio_mixer::worker, this); } ~state_audio_mixer() { thread_id.join(); + module_done(&mod); } bool should_exit = false; state_audio_mixer(state_audio_mixer const&) = delete; state_audio_mixer& operator=(state_audio_mixer const&) = delete; void worker(); + void check_messages(); map participants; mutex participants_lock; struct socket_udp_local *recv_socket{}; string audio_codec{"PCM"}; + sockaddr_storage + only_sender; ///< if !AF_UNSPEC, use stream just from this sender private: + struct module mod; thread thread_id; unique_ptr> mixing_algorithm{new linear_mix_algo()}; }; +void +state_audio_mixer::check_messages() +{ + struct message *msg = nullptr; + while ((msg = check_message(&mod))) { + auto *msg_univ = reinterpret_cast(msg); + MSG(VERBOSE, "Received message: %s\n", msg_univ->text); + if (strcmp(msg_univ->text, "help") == 0) { + printf("Syntax:\n" + "\trestrict \n" + "\trestrict flush\n" + "eg.:\n" + "\trestrict [::ffff:10.0.1.20]:65426\n"); + free_message(msg, new_response(RESPONSE_OK, nullptr)); + continue; + } + if (strstr(msg_univ->text, "restrict ") != msg_univ->text) { + MSG(ERROR, + "Unknown message: %s!\nSend message \"help\" for " + "syntax.\n", + msg_univ->text); + char resp_msg[sizeof msg_univ->text + 20]; + snprintf_ch(resp_msg, "unknown request: %s", + msg_univ->text); + free_message( + msg, new_response(RESPONSE_BAD_REQUEST, resp_msg)); + continue; + } + const char *val = msg_univ->text + strlen("restrict "); + if (strcmp(val, "flush") == 0) { + MSG(INFO, "flushing the address restriction (defaulting to mix all)\n"); + only_sender.ss_family = AF_UNSPEC; + } else { + MSG(INFO, "restricting mixer to: %s\n", val); + only_sender = get_sockaddr(val, 0); + if (participants.find(only_sender) == + participants.end()) { + MSG(WARNING, + "The requested participant %s is not yet " + "present...\n", + val); + } + } + + free_message(msg, new_response(RESPONSE_OK, nullptr)); + } +} + void state_audio_mixer::worker() { set_thread_name(__func__); @@ -304,6 +365,7 @@ void state_audio_mixer::worker() } unique_lock plk(participants_lock); + check_messages(); // check timeouts for (auto it = participants.cbegin(); it != participants.cend(); ) { @@ -433,8 +495,16 @@ static void audio_play_mixer_put_frame(void *state, const struct audio_frame *fr s->participants.emplace(ss, am_participant{s->recv_socket, &ss, s->audio_codec}); } - audio_buffer_write(s->participants.at(ss).m_buffer, frame->data, frame->data_len); s->participants.at(ss).last_seen = chrono::steady_clock::now(); + + // if mixer restricted to a single sender and this isn't me + if (s->only_sender.ss_family != AF_UNSPEC && + sockaddr_compare((const sockaddr *) &ss, + (const sockaddr *) &s->only_sender) != 0) { + return; + } + + audio_buffer_write(s->participants.at(ss).m_buffer, frame->data, frame->data_len); } static void audio_play_mixer_done(void *state)