Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Allow subscribing with SVC params. #543

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion talk/owt/sdk/conference/conferenceclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,95 @@ void ConferenceClient::Subscribe(
},
on_failure);
}
void ConferenceClient::UnPublish(
void ConferenceClient::Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr.";
return;
}
if (added_stream_type_.find(stream->Id()) == added_stream_type_.end()) {
std::string failure_message(
"Subscribing an invalid stream. Please check whether this stream is "
"removed.");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
if (!stream->VideoEnabled()) {
std::string failure_message(
"Stream without video is not allowed to be subcribed with simulcast/SVC constraints");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
// Avoid subscribing the same stream twice.
{
std::lock_guard<std::mutex> lock(subscribe_pcs_mutex_);
// Search subscirbe pcs
auto it = std::find_if(
subscribe_pcs_.begin(), subscribe_pcs_.end(),
[&](std::shared_ptr<ConferencePeerConnectionChannel> o) -> bool {
return o->GetSubStreamId() == stream->Id();
});
if (it != subscribe_pcs_.end()) {
std::string failure_message(
"The same remote stream has already been subscribed. Subcribe after "
"it is unsubscribed");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
}
// Reorder SDP according to perference list.
PeerConnectionChannelConfiguration config =
GetPeerConnectionChannelConfiguration();
for (auto codec : options.audio.codecs) {
config.audio.push_back(AudioEncodingParameters(codec, 0));
}
std::shared_ptr<ConferencePeerConnectionChannel> pcc(
new ConferencePeerConnectionChannel(config, signaling_channel_,
event_queue_));
pcc->AddObserver(*this);
{
std::lock_guard<std::mutex> lock(subscribe_pcs_mutex_);
subscribe_pcs_.push_back(pcc);
}
std::weak_ptr<ConferenceClient> weak_this = shared_from_this();
std::string stream_id = stream->Id();
pcc->Subscribe(
stream, options,
[on_success, weak_this, stream_id](std::string session_id) {
auto that = weak_this.lock();
if (!that)
return;
// map current pcc
if (on_success != nullptr) {
std::shared_ptr<ConferenceSubscription> cp(
new ConferenceSubscription(that, session_id, stream_id));
on_success(cp);
}
},
on_failure);
}
void ConferenceClient::UnPublish(
const std::string& session_id,
std::function<void()> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
Expand Down Expand Up @@ -1491,6 +1579,12 @@ void ConferenceClient::ParseStreamInfo(sio::message::ptr stream_info,
rid_obj->get_flag() == sio::message::flag_string) {
video_publication_settings.rid = rid_obj->get_string();
}
auto scalability_mode_obj = (*tit)->get_map()["scalabilityMode"];
if (scalability_mode_obj != nullptr &&
scalability_mode_obj->get_flag() == sio::message::flag_string) {
video_publication_settings.scalability_mode =
scalability_mode_obj->get_string();
}
auto trackid_obj = (*tit)->get_map()["id"];
if (trackid_obj != nullptr &&
trackid_obj->get_flag() == sio::message::flag_string) {
Expand Down
136 changes: 113 additions & 23 deletions talk/owt/sdk/conference/conferencepeerconnectionchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,6 @@ static bool SubOptionAllowed(
// specifies codec, though signaling allows specifying sample rate and channel
// number.

// If rid is specified, search in publication_settings for rid;
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid)
return true;
}
return false;
}

bool resolution_supported = (subscribe_options.video.resolution.width == 0 &&
subscribe_options.video.resolution.height == 0);
bool frame_rate_supported = (subscribe_options.video.frameRate == 0);
Expand Down Expand Up @@ -647,19 +638,7 @@ void ConferencePeerConnectionChannel::Subscribe(
video_options->get_map()["mid"] = sio::string_message::create("1");
}
auto publication_settings = stream->Settings();
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid) {
std::string track_id = video_setting.track_id;
video_options->get_map()["from"] =
sio::string_message::create(track_id);
break;
}
}
} else {
video_options->get_map()["from"] =
sio::string_message::create(stream->Id());
}
video_options->get_map()["from"] = sio::string_message::create(stream->Id());
sio::message::ptr video_spec = sio::object_message::create();
sio::message::ptr resolution_options = sio::object_message::create();
if (subscribe_options.video.resolution.width != 0 &&
Expand Down Expand Up @@ -691,10 +670,121 @@ void ConferencePeerConnectionChannel::Subscribe(
sio::int_message::create(subscribe_options.video.frameRate);
}
video_options->get_map()["parameters"] = video_spec;
tracks_options->get_vector().push_back(video_options);
}

media_options->get_map()["tracks"] = tracks_options;
sio_options->get_map()["media"] = media_options;
sio::message::ptr transport_ptr = sio::object_message::create();
transport_ptr->get_map()["type"] = sio::string_message::create("webrtc");
sio_options->get_map()["transport"] = transport_ptr;

signaling_channel_->SendInitializationMessage(
sio_options, "", stream->Id(),
[this](std::string session_id, std::string transport_id) {
// Pre-set the session's ID.
SetSessionId(session_id);
CreateOffer();
},
on_failure); // TODO: on_failure
subscribed_stream_ = stream;
}

void ConferencePeerConnectionChannel::Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& subscribe_options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr.";
return;
}
if (subscribe_success_callback_) {
if (on_failure) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, "Subscribing this stream."));
on_failure(std::move(e));
});
}
}
if ((subscribe_options.video.rid == "") &&
(subscribe_options.video.spatialLayerId == -1) &&
(subscribe_options.video.temporalLayerId == -1)) {
if (on_failure) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown,
"Either rid/spatialLayer/temporalLayer needs to be set for subscribing."));
on_failure(std::move(e));
});
}
}
subscribe_success_callback_ = on_success;
failure_callback_ = on_failure;
int audio_track_count = 0, video_track_count = 0;
if (stream->has_audio_ && !subscribe_options.audio.disabled) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly;
AddTransceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, transceiver_init);
audio_track_count = 1;
}
if (stream->has_video_) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly;
AddTransceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, transceiver_init);
video_track_count = 1;
}
sio::message::ptr sio_options = sio::object_message::create();
sio::message::ptr media_options = sio::object_message::create();
sio::message::ptr tracks_options = sio::array_message::create();
if (audio_track_count > 0) {
sio::message::ptr audio_options = sio::object_message::create();
audio_options->get_map()["type"] = sio::string_message::create("audio");
audio_options->get_map()["mid"] = sio::string_message::create("0");
audio_options->get_map()["from"] =
sio::string_message::create(stream->Id());
tracks_options->get_vector().push_back(audio_options);
}
if (video_track_count > 0) {
sio::message::ptr video_options = sio::object_message::create();
video_options->get_map()["type"] = sio::string_message::create("video");
if (audio_track_count == 0) {
video_options->get_map()["mid"] = sio::string_message::create("0");
} else {
video_options->get_map()["mid"] = sio::string_message::create("1");
}
auto publication_settings = stream->Settings();
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid) {
std::string track_id = video_setting.track_id;
video_options->get_map()["from"] =
sio::string_message::create(track_id);
break;
}
}
} else {
video_options->get_map()["from"] =
sio::string_message::create(stream->Id());
}
sio::message::ptr layer_spec = sio::object_message::create();
if (subscribe_options.video.spatialLayerId >= 0) {
sio::message::ptr spatial_layer_options =
sio::int_message::create(subscribe_options.video.spatialLayerId);
layer_spec->get_map()["spatialLayer"] = spatial_layer_options;
}
if (subscribe_options.video.temporalLayerId >= 0) {
sio::message::ptr temporal_layer_options =
sio::int_message::create(subscribe_options.video.temporalLayerId);
layer_spec->get_map()["temporallLayer"] = temporal_layer_options;
}
if (subscribe_options.video.rid != "") {
video_options->get_map()["simulcastRid"] =
sio::message::ptr rid_options =
sio::string_message::create(subscribe_options.video.rid);
layer_spec->get_map()["rid"] = rid_options;
}
video_options->get_map()["parameters"] = layer_spec;
tracks_options->get_vector().push_back(video_options);
}

Expand Down
7 changes: 6 additions & 1 deletion talk/owt/sdk/conference/conferencepeerconnectionchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ class ConferencePeerConnectionChannel
const std::string& session_id,
std::function<void()> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
// Subscribe a stream from the conference.
// Subscribe a non-simulcast/non-SVC stream from the conference.
void Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions& options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
// Subscribe a simulcast or SVC stream from the conference.
void Subscribe(std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
// Unsubscribe a remote stream from the conference.
void Unsubscribe(
const std::string& session_id,
Expand Down
1 change: 1 addition & 0 deletions talk/owt/sdk/include/cpp/owt/base/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct VideoPublicationSettings {
unsigned long keyframe_interval;
std::string rid;
std::string track_id;
std::string scalability_mode;
};

#ifdef OWT_ENABLE_QUIC
Expand Down
22 changes: 21 additions & 1 deletion talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ class ConferenceClient final
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
/**
@brief Subscribe a stream from the current room.
@brief Subscribe a stream with transcoding from the current room.
@details Should only be called on stream that is published without simulcast
or SVC opotions. If |stream| is a simulcast stream or an SVC stream,
subscription will fail.
@param stream The remote stream to be subscribed.
@param options Options for subscribing the stream.
@param onSuccess Success callback with a stream that contains media stream.
Expand All @@ -325,6 +328,23 @@ class ConferenceClient final
const SubscribeOptions& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
/**
@brief Subscribe a simulcast or SVC stream from the current room with preferred rid
and/or temporal/spatial layers.
@details rid and temporal/spatial layer ID can be specified together. If rid in
|options| is not empty and |stream| is not a simulcast stream, subscribe will fail;
If temporalLayerId/spatialLayerId is larger than -1 in |options| and |stream| is not
an SVC stream, subscribe will fail; If both rid and temporal/spatialLyerId are specified,
temporalLayerId/spatialLayerId only applies to simulcast stream associated with rid.
@param stream The remote stream to be subscribed.
@param options Simulcast and SVC stream subscribe options for subscribing the stream.
@param onSuccess Success callback with a stream that contains media stream.
*/
void Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
/**
@brief Send messsage to all participants in the conference.
@param message The message to be sent.
Expand Down
33 changes: 31 additions & 2 deletions talk/owt/sdk/include/cpp/owt/conference/subscribeoptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@ struct VideoSubscriptionConstraints {
resolution(0, 0),
frameRate(0),
bitrateMultiplier(0),
keyFrameInterval(0),
rid("") {}
keyFrameInterval(0) {}
bool disabled;
std::vector<owt::base::VideoCodecParameters> codecs;
owt::base::Resolution resolution;
double frameRate;
double bitrateMultiplier;
unsigned long keyFrameInterval;
};
/// Simulcast and SVC stream subscription constranits.
struct VideoSubscriptionConstraints2 {
explicit VideoSubscriptionConstraints2()
: rid(""), spatialLayerId(-1), temporalLayerId(-1) {}
std::string rid;
int spatialLayerId;
int temporalLayerId;
};

#ifdef OWT_ENABLE_QUIC
Expand All @@ -57,6 +63,12 @@ struct SubscribeOptions {
DataSubscriptionConstraints data;
#endif
};

/// SVC and Simulcast stream subscribe options
struct SubscribeOptions2 {
AudioSubscriptionConstraints audio;
VideoSubscriptionConstraints2 video;
};
/// Video subscription update constrains used by subscription's ApplyOptions
/// API.
struct VideoSubscriptionUpdateConstraints {
Expand All @@ -73,11 +85,28 @@ struct VideoSubscriptionUpdateConstraints {
double bitrateMultiplier;
unsigned long keyFrameInterval;
};
/// Simulcast and SVC stream subscription update constraints
struct VideoSubscriptionUpdateConstraints2 {
/**
@brief Construct VideoSubscriptionUpdateConstraints with default value.
*/
explicit VideoSubscriptionUpdateConstraints2()
: rid(""), spatialLayerId(-1), temporalLayerId(-1) {}
std::string rid;
int spatialLayerId;
int temporalLayerId;
};
/// Subscription update option used by subscription's ApplyOptions API.
struct SubscriptionUpdateOptions {
/// Options for updating a subscription.
VideoSubscriptionUpdateConstraints video;
};
/// Simulcast and SVC stream subcription update option used by subscription's
/// ApplyOptions API.
struct SubscriptionUpdateOptions2 {
/// Options for updating a subscription.
VideoSubscriptionUpdateConstraints2 video;
};
} // namespace conference
} // namespace owt
#endif // OWT_CONFERENCE_SUBSCRIBEOPTIONS_H_