Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/pr/211'
Browse files Browse the repository at this point in the history
* origin/pr/211:
  Watch qdb entry being set and modified on start or while playing
  • Loading branch information
marmarek committed Apr 26, 2024
2 parents 2922778 + 29fc016 commit 9becd06
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 64 deletions.
1 change: 1 addition & 0 deletions archlinux/PKGBUILD.in
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ makedepends=(
xorg-util-macros
libxcomposite
libxt
libxdamage
pixman
lsb-release
qubes-vm-gui-common
Expand Down
225 changes: 161 additions & 64 deletions pipewire/qubes-pw-module.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,31 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
"[ stream.sink.props=<properties> ] " \
"[ stream.source.props=<properties> ] "

// FIXME: this should be a Qubes-wide domID parsing function
static int parse_number(const char *const str,
size_t max_value, size_t *res, const char *const msg)
{
char *endptr = (void *)1;
errno = *res = 0;
unsigned long long value = strtoull(str, &endptr, 0);
if (errno) {
int i = errno;
pw_log_error("Invalid %s \"%s\": %m", msg, str);
return -i;
} else if (*endptr) {
pw_log_error("Invalid %s \"%s\": trailing junk (\"%s\")",
msg, str, endptr);
return -EINVAL;
} else if (value > max_value) {
pw_log_error("Invalid %s \"%s\": exceeds maximum %s %zu",
msg, str, msg, max_value);
return -ERANGE;
} else {
*res = (size_t)value;
return 0;
}
}

static const struct spa_dict_item module_props[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <[email protected]>, "
"Demi Marie Obenour <[email protected]>" },
Expand Down Expand Up @@ -316,7 +341,11 @@ struct impl {

struct qubes_stream stream[2];

uint32_t frame_size, domid;
qdb_handle_t qdb;
struct spa_source qdb_watch_source;

uint32_t frame_size;
int domid;

bool do_disconnect;
#if !QUBES_PW_HAS_SCHEDULE_DESTROY
Expand All @@ -325,8 +354,48 @@ struct impl {
#endif

atomic_uint_fast64_t reference_count;

const char *args;
};

int get_domid_from_props(struct pw_properties *props) {
size_t domid;
const char *peer_domain_prop = NULL;
if ((peer_domain_prop = pw_properties_get(props, QUBES_AUDIOVM_PW_KEY)) == NULL) {
return -EINVAL;
}
if (parse_number(peer_domain_prop, INT_MAX / 2, &domid, "domain ID")) {
pw_log_debug("Cannot parse domid");
return -errno;
}
return domid;
}

int get_domid_from_qdb(qdb_handle_t qdb) {
size_t domid;
int res;
char *qdb_entry = qdb_read(qdb, QUBES_AUDIOVM_QUBESDB_ENTRY, NULL);

if (qdb_entry != NULL) {
if (parse_number(qdb_entry, INT_MAX / 2, &domid, "domain ID")) {
pw_log_error("Cannot parse domid");
res = -errno;
} else {
res = (int)domid;
}
} else {
res = -errno;
if (res == -ENOENT)
pw_log_error("no %s entry in QubesDB", QUBES_AUDIOVM_QUBESDB_ENTRY);
else
pw_log_error("unable to obtain %s entry from QubesDB", QUBES_AUDIOVM_QUBESDB_ENTRY);
}

free(qdb_entry);

return res;
}

static inline void
impl_incref(struct impl *impl)
{
Expand Down Expand Up @@ -378,6 +447,7 @@ static void stop_watching_vchan(struct qubes_stream *stream)
stream->vchan = NULL;
}
stream->is_open = false;
stream->source.fd = -1;
// Update the main-thread state asynchronously, but only if the stream
// is not being torn down.
if (!stream->dead) {
Expand Down Expand Up @@ -588,8 +658,13 @@ static int add_stream(struct spa_loop *loop,
static int connect_stream(struct qubes_stream *stream)
{
const char *msg = qubes_stream_is_capture(stream) ? "capture" : "playback";
uint16_t domid = stream->impl->domid;
int32_t domid = (int32_t)stream->impl->domid;
int status;
if (domid<0) {
pw_log_warn("unknown peer domain, cannot create stream");
return 0;
}
pw_log_info("module %p: new (%s), peer id is %" PRIi32, stream->impl, stream->impl->args, domid);

spa_assert_se(stream->vchan == NULL);
spa_assert_se(stream->closed_vchan == NULL);
Expand Down Expand Up @@ -857,6 +932,67 @@ static void vchan_ready(struct spa_source *source)
}
}

static int vchan_reconnect_cb(struct spa_loop *loop,
bool async,
uint32_t seq,
const void *data,
size_t size,
void *user_data)
{
struct qubes_stream *stream = user_data;
stop_watching_vchan(stream);
return 0;
}

static void qdb_cb(struct spa_source *source)
{
struct impl *impl = source->data;
int domid;

pw_log_debug("Received event from QubesDB");

domid = get_domid_from_qdb(impl->qdb);

if (domid != impl->domid) {
if (domid >= 0)
pw_log_info("Setting new peer domain ID %d", domid);
else if (domid == -ENOENT) // not an error, AudioVM unset
pw_log_info("AudioVM unset, disconnecting from %d", impl->domid);
else {
errno = -domid;
pw_log_error("Cannot obtain new peer domain ID (%m), disconnecting from %d", impl->domid);
}
impl->domid = domid;
for (int i = 0; i < 2; ++i) {
if (impl->stream[i].source.fd != -1)
pw_log_info("Closing %d", impl->stream[i].source.fd);
spa_loop_invoke(impl->data_loop, vchan_reconnect_cb, 0, NULL, 0, true, &impl->stream[i]);
}
}
}

static int add_qdb_cb(struct spa_loop *loop, struct impl *impl)
{
int qdb_fd;
if (!qdb_watch(impl->qdb, QUBES_AUDIOVM_QUBESDB_ENTRY)) {
int err = -errno;
pw_log_error("Failed to setup watch on %s: %m\n", QUBES_AUDIOVM_QUBESDB_ENTRY);
return err;
}

qdb_fd = qdb_watch_fd(impl->qdb);
if (qdb_fd < 0)
return -EPIPE;

impl->qdb_watch_source.loop = loop;
impl->qdb_watch_source.mask = SPA_IO_IN;
impl->qdb_watch_source.data = impl;
impl->qdb_watch_source.fd = qdb_fd;
impl->qdb_watch_source.func = qdb_cb;

return spa_loop_add_source(loop, &impl->qdb_watch_source);
}

/**
* Called on the realtime thread to discard unwanted data from the daemon.
*/
Expand Down Expand Up @@ -1412,30 +1548,7 @@ static const struct spa_dict_item playback_props[] = {

static const struct spa_dict playback_dict = SPA_DICT_INIT_ARRAY(playback_props);

// FIXME: this should be a Qubes-wide domID parsing function
static int parse_number(const char *const str,
size_t max_value, size_t *res, const char *const msg)
{
char *endptr = (void *)1;
errno = *res = 0;
unsigned long long value = strtoull(str, &endptr, 0);
if (errno) {
int i = errno;
pw_log_error("Invalid %s \"%s\": %m", msg, str);
return -i;
} else if (*endptr) {
pw_log_error("Invalid %s \"%s\": trailing junk (\"%s\")",
msg, str, endptr);
return -EINVAL;
} else if (value > max_value) {
pw_log_error("Invalid %s \"%s\": exceeds maximum %s %zu",
msg, str, msg, max_value);
return -ERANGE;
} else {
*res = (size_t)value;
return 0;
}
}


static int
create_stream(struct impl *impl, enum spa_direction direction)
Expand Down Expand Up @@ -1468,7 +1581,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
const struct pw_properties *global_props = NULL;
struct impl *impl;
const char *str;
const char *peer_domain_prop = NULL;
int res = -EFAULT; /* should never be returned, modulo bugs */

#ifdef PW_LOG_TOPIC_INIT
Expand All @@ -1488,6 +1600,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (args == NULL)
args = "";

impl->args = args;
impl->module = module;
impl->context = context;
#if !QUBES_PW_HAS_SCHEDULE_DESTROY
Expand Down Expand Up @@ -1566,43 +1679,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
goto error;
}

if ((peer_domain_prop = pw_properties_get(props, QUBES_AUDIOVM_PW_KEY)) == NULL) {
qdb_handle_t qdb = qdb_open(NULL);
if (!qdb) {
res = -errno;
pw_log_error("Could not open QubesDB to get %s property: %m",
QUBES_AUDIOVM_PW_KEY);
goto error;
}

char *qdb_entry = qdb_read(qdb, QUBES_AUDIOVM_QUBESDB_ENTRY, NULL);
if (qdb_entry == NULL) {
res = -errno;
if (res == -ENOENT)
pw_log_error("%s not specified, and no %s entry in QubesDB",
QUBES_AUDIOVM_PW_KEY, QUBES_AUDIOVM_QUBESDB_ENTRY);
else
pw_log_error("%s not specified, and unable to obtain %s entry from QubesDB: %m",
QUBES_AUDIOVM_PW_KEY, QUBES_AUDIOVM_QUBESDB_ENTRY);
qdb_close(qdb);
goto error;
}
qdb_close(qdb);
pw_properties_set(props, QUBES_AUDIOVM_PW_KEY, qdb_entry);
free(qdb_entry);
if (!(peer_domain_prop = pw_properties_get(props, QUBES_AUDIOVM_PW_KEY))) {
pw_log_error("Failed to set %s key - out of memory?", QUBES_AUDIOVM_PW_KEY);
res = -ENOMEM;
goto error;
}
}

{
size_t domid;
if ((res = parse_number(peer_domain_prop, INT_MAX / 2, &domid, "domain ID")))
goto error;
impl->domid = domid;

size_t read_min = 0x100000, write_min = 0x10000;
const char *record_size = pw_properties_get(props, QUBES_PW_KEY_RECORD_BUFFER_SPACE);
const char *record_buffer_fill = pw_properties_get(props, QUBES_PW_KEY_CAPTURE_TARGET_BUFFER_FILL);
Expand Down Expand Up @@ -1647,7 +1724,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->stream[PW_DIRECTION_INPUT].rm.target_buffer = write_min - playback_target_buffer_fill;
}

pw_log_info("module %p: new (%s), peer id is %d", impl, args, (int)impl->domid);
pw_log_info("module %p: record buffer size 0x%zx, playback buffer size 0x%zx",
impl,
impl->stream[PW_DIRECTION_OUTPUT].buffer_size,
Expand Down Expand Up @@ -1713,6 +1789,27 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((res = pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props))) < 0)
goto error;

{
int domid_from_props = get_domid_from_props(props);
// If domid is not specified from pipewire module context,
// then start the qubesdb watcher
if (domid_from_props<0) {
qdb_handle_t qdb = qdb_open(NULL);
if (!qdb) {
res = -errno;
pw_log_error("Could not open QubesDB");
goto error;
}
impl->qdb = qdb;
if ((res = add_qdb_cb(impl->main_loop, impl) != 0)) {
errno = -res;
pw_log_error("can't create qubesdb watcher: %m");
goto error;
}
impl->domid = get_domid_from_qdb(impl->qdb);
}
}

for (uint8_t i = 0; i < 2; ++i)
if ((res = connect_stream(&impl->stream[i])) != 0)
goto error;
Expand Down

0 comments on commit 9becd06

Please sign in to comment.