Skip to content

Commit

Permalink
aplay/mixer: allow selection of single participant
Browse files Browse the repository at this point in the history
Allow selection of a single participant to be sent back with a control
socket (docuented in wiki).
  • Loading branch information
MartinPulec committed Nov 28, 2024
1 parent d90d283 commit 0ac8c32
Showing 1 changed file with 71 additions and 1 deletion.
72 changes: 71 additions & 1 deletion src/audio/playback/mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@
#include "debug.h"
#include "host.h" // for get_commandline_param, uv_argv
#include "lib_common.h"
#include "messaging.h"
#include "module.h"
#include "rtp/rtp.h"
#include "transmit.h"
#include "types.h" // for tx_media_type
#include "utils/audio_buffer.h"
#include "utils/macros.h"
#include "utils/net.h" // for get_sockaddr_addr_str
#include "utils/thread.h"

Expand Down Expand Up @@ -255,6 +257,8 @@ struct state_audio_mixer final {
state_audio_mixer(const struct audio_playback_opts *opts) {
parse_opts(opts);

only_sender.ss_family = AF_UNSPEC;

struct audio_codec_state *audio_coder =
audio_codec_init_cfg(audio_codec.c_str(), AUDIO_CODER);
if (!audio_coder) {
Expand All @@ -264,26 +268,83 @@ struct state_audio_mixer final {
audio_codec_done(audio_coder);
}

module_init_default(&mod);
mod.cls = MODULE_CLASS_DATA;
module_register(&mod, opts->parent);

thread_id = thread(&state_audio_mixer::worker, this);
}
~state_audio_mixer() {
thread_id.join();
module_done(&mod);
}
bool should_exit = false;
state_audio_mixer(state_audio_mixer const&) = delete;
state_audio_mixer& operator=(state_audio_mixer const&) = delete;
void worker();
void check_messages();

map<sockaddr_storage, am_participant, sockaddr_storage_less> participants;
mutex participants_lock;

struct socket_udp_local *recv_socket{};
string audio_codec{"PCM"};
sockaddr_storage
only_sender; ///< if !AF_UNSPEC, use stream just from this sender
private:
struct module mod;
thread thread_id;
unique_ptr<generic_mix_algo<sample_type_source, sample_type_mixed>> mixing_algorithm{new linear_mix_algo<sample_type_source, sample_type_mixed>()};
};

void
state_audio_mixer::check_messages()
{
struct message *msg = nullptr;
while ((msg = check_message(&mod))) {
auto *msg_univ = reinterpret_cast<struct msg_universal *>(msg);
MSG(VERBOSE, "Received message: %s\n", msg_univ->text);
if (strcmp(msg_univ->text, "help") == 0) {
printf("Syntax:\n"
"\trestrict <addr>\n"
"\trestrict flush\n"
"eg.:\n"
"\trestrict [::ffff:10.0.1.20]:65426\n");
free_message(msg, new_response(RESPONSE_OK, nullptr));
continue;
}
if (strstr(msg_univ->text, "restrict ") != msg_univ->text) {
MSG(ERROR,
"Unknown message: %s!\nSend message \"help\" for "
"syntax.\n",
msg_univ->text);
char resp_msg[sizeof msg_univ->text + 20];
snprintf_ch(resp_msg, "unknown request: %s",
msg_univ->text);
free_message(
msg, new_response(RESPONSE_BAD_REQUEST, resp_msg));
continue;
}
const char *val = msg_univ->text + strlen("restrict ");
if (strcmp(val, "flush") == 0) {
MSG(INFO, "flushing the address restriction (defaulting to mix all)\n");
only_sender.ss_family = AF_UNSPEC;
} else {
MSG(INFO, "restricting mixer to: %s\n", val);
only_sender = get_sockaddr(val, 0);
if (participants.find(only_sender) ==
participants.end()) {
MSG(WARNING,
"The requested participant %s is not yet "
"present...\n",
val);
}
}

free_message(msg, new_response(RESPONSE_OK, nullptr));
}
}

void state_audio_mixer::worker()
{
set_thread_name(__func__);
Expand All @@ -304,6 +365,7 @@ void state_audio_mixer::worker()
}

unique_lock<mutex> plk(participants_lock);
check_messages();
// check timeouts
for (auto it = participants.cbegin(); it != participants.cend(); )
{
Expand Down Expand Up @@ -433,8 +495,16 @@ static void audio_play_mixer_put_frame(void *state, const struct audio_frame *fr
s->participants.emplace(ss, am_participant{s->recv_socket, &ss, s->audio_codec});
}

audio_buffer_write(s->participants.at(ss).m_buffer, frame->data, frame->data_len);
s->participants.at(ss).last_seen = chrono::steady_clock::now();

// if mixer restricted to a single sender and this isn't me
if (s->only_sender.ss_family != AF_UNSPEC &&
sockaddr_compare((const sockaddr *) &ss,
(const sockaddr *) &s->only_sender) != 0) {
return;
}

audio_buffer_write(s->participants.at(ss).m_buffer, frame->data, frame->data_len);
}

static void audio_play_mixer_done(void *state)
Expand Down

0 comments on commit 0ac8c32

Please sign in to comment.