diff --git a/talk/owt/sdk/conference/conferenceclient.cc b/talk/owt/sdk/conference/conferenceclient.cc index 91b8065c3..ac9d5445d 100644 --- a/talk/owt/sdk/conference/conferenceclient.cc +++ b/talk/owt/sdk/conference/conferenceclient.cc @@ -783,7 +783,95 @@ void ConferenceClient::Subscribe( }, on_failure); } -void ConferenceClient::UnPublish( +void ConferenceClient::Subscribe( + std::shared_ptr stream, + const SubscribeOptions2& options, + std::function)> on_success, + std::function)> on_failure) { + if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) { + RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr."; + return; + } + if (added_stream_type_.find(stream->Id()) == added_stream_type_.end()) { + std::string failure_message( + "Subscribing an invalid stream. Please check whether this stream is " + "removed."); + if (on_failure != nullptr) { + event_queue_->PostTask([on_failure, failure_message]() { + std::unique_ptr e( + new Exception(ExceptionType::kConferenceUnknown, failure_message)); + on_failure(std::move(e)); + }); + } + return; + } + if (!stream->VideoEnabled()) { + std::string failure_message( + "Stream without video is not allowed to be subcribed with simulcast/SVC constraints"); + if (on_failure != nullptr) { + event_queue_->PostTask([on_failure, failure_message]() { + std::unique_ptr e( + new Exception(ExceptionType::kConferenceUnknown, failure_message)); + on_failure(std::move(e)); + }); + } + return; + } + // Avoid subscribing the same stream twice. + { + std::lock_guard lock(subscribe_pcs_mutex_); + // Search subscirbe pcs + auto it = std::find_if( + subscribe_pcs_.begin(), subscribe_pcs_.end(), + [&](std::shared_ptr o) -> bool { + return o->GetSubStreamId() == stream->Id(); + }); + if (it != subscribe_pcs_.end()) { + std::string failure_message( + "The same remote stream has already been subscribed. Subcribe after " + "it is unsubscribed"); + if (on_failure != nullptr) { + event_queue_->PostTask([on_failure, failure_message]() { + std::unique_ptr e(new Exception( + ExceptionType::kConferenceUnknown, failure_message)); + on_failure(std::move(e)); + }); + } + return; + } + } + // Reorder SDP according to perference list. + PeerConnectionChannelConfiguration config = + GetPeerConnectionChannelConfiguration(); + for (auto codec : options.audio.codecs) { + config.audio.push_back(AudioEncodingParameters(codec, 0)); + } + std::shared_ptr pcc( + new ConferencePeerConnectionChannel(config, signaling_channel_, + event_queue_)); + pcc->AddObserver(*this); + { + std::lock_guard lock(subscribe_pcs_mutex_); + subscribe_pcs_.push_back(pcc); + } + std::weak_ptr weak_this = shared_from_this(); + std::string stream_id = stream->Id(); + pcc->Subscribe( + stream, options, + [on_success, weak_this, stream_id](std::string session_id) { + auto that = weak_this.lock(); + if (!that) + return; + // map current pcc + if (on_success != nullptr) { + std::shared_ptr cp( + new ConferenceSubscription(that, session_id, stream_id)); + on_success(cp); + } + }, + on_failure); +} + void ConferenceClient::UnPublish( const std::string& session_id, std::function on_success, std::function)> on_failure) { @@ -1491,6 +1579,12 @@ void ConferenceClient::ParseStreamInfo(sio::message::ptr stream_info, rid_obj->get_flag() == sio::message::flag_string) { video_publication_settings.rid = rid_obj->get_string(); } + auto scalability_mode_obj = (*tit)->get_map()["scalabilityMode"]; + if (scalability_mode_obj != nullptr && + scalability_mode_obj->get_flag() == sio::message::flag_string) { + video_publication_settings.scalability_mode = + scalability_mode_obj->get_string(); + } auto trackid_obj = (*tit)->get_map()["id"]; if (trackid_obj != nullptr && trackid_obj->get_flag() == sio::message::flag_string) { diff --git a/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc b/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc index 51056cb4f..2bd05b0cd 100644 --- a/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc +++ b/talk/owt/sdk/conference/conferencepeerconnectionchannel.cc @@ -496,15 +496,6 @@ static bool SubOptionAllowed( // specifies codec, though signaling allows specifying sample rate and channel // number. - // If rid is specified, search in publication_settings for rid; - if (subscribe_options.video.rid != "") { - for (auto video_setting : publication_settings.video) { - if (video_setting.rid == subscribe_options.video.rid) - return true; - } - return false; - } - bool resolution_supported = (subscribe_options.video.resolution.width == 0 && subscribe_options.video.resolution.height == 0); bool frame_rate_supported = (subscribe_options.video.frameRate == 0); @@ -647,19 +638,7 @@ void ConferencePeerConnectionChannel::Subscribe( video_options->get_map()["mid"] = sio::string_message::create("1"); } auto publication_settings = stream->Settings(); - if (subscribe_options.video.rid != "") { - for (auto video_setting : publication_settings.video) { - if (video_setting.rid == subscribe_options.video.rid) { - std::string track_id = video_setting.track_id; - video_options->get_map()["from"] = - sio::string_message::create(track_id); - break; - } - } - } else { - video_options->get_map()["from"] = - sio::string_message::create(stream->Id()); - } + video_options->get_map()["from"] = sio::string_message::create(stream->Id()); sio::message::ptr video_spec = sio::object_message::create(); sio::message::ptr resolution_options = sio::object_message::create(); if (subscribe_options.video.resolution.width != 0 && @@ -691,10 +670,121 @@ void ConferencePeerConnectionChannel::Subscribe( sio::int_message::create(subscribe_options.video.frameRate); } video_options->get_map()["parameters"] = video_spec; + tracks_options->get_vector().push_back(video_options); + } + + media_options->get_map()["tracks"] = tracks_options; + sio_options->get_map()["media"] = media_options; + sio::message::ptr transport_ptr = sio::object_message::create(); + transport_ptr->get_map()["type"] = sio::string_message::create("webrtc"); + sio_options->get_map()["transport"] = transport_ptr; + + signaling_channel_->SendInitializationMessage( + sio_options, "", stream->Id(), + [this](std::string session_id, std::string transport_id) { + // Pre-set the session's ID. + SetSessionId(session_id); + CreateOffer(); + }, + on_failure); // TODO: on_failure + subscribed_stream_ = stream; +} + +void ConferencePeerConnectionChannel::Subscribe( + std::shared_ptr stream, + const SubscribeOptions2& subscribe_options, + std::function on_success, + std::function)> on_failure) { + if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) { + RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr."; + return; + } + if (subscribe_success_callback_) { + if (on_failure) { + event_queue_->PostTask([on_failure]() { + std::unique_ptr e(new Exception( + ExceptionType::kConferenceUnknown, "Subscribing this stream.")); + on_failure(std::move(e)); + }); + } + } + if ((subscribe_options.video.rid == "") && + (subscribe_options.video.spatialLayerId == -1) && + (subscribe_options.video.temporalLayerId == -1)) { + if (on_failure) { + event_queue_->PostTask([on_failure]() { + std::unique_ptr e(new Exception( + ExceptionType::kConferenceUnknown, + "Either rid/spatialLayer/temporalLayer needs to be set for subscribing.")); + on_failure(std::move(e)); + }); + } + } + subscribe_success_callback_ = on_success; + failure_callback_ = on_failure; + int audio_track_count = 0, video_track_count = 0; + if (stream->has_audio_ && !subscribe_options.audio.disabled) { + webrtc::RtpTransceiverInit transceiver_init; + transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly; + AddTransceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, transceiver_init); + audio_track_count = 1; + } + if (stream->has_video_) { + webrtc::RtpTransceiverInit transceiver_init; + transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly; + AddTransceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, transceiver_init); + video_track_count = 1; + } + sio::message::ptr sio_options = sio::object_message::create(); + sio::message::ptr media_options = sio::object_message::create(); + sio::message::ptr tracks_options = sio::array_message::create(); + if (audio_track_count > 0) { + sio::message::ptr audio_options = sio::object_message::create(); + audio_options->get_map()["type"] = sio::string_message::create("audio"); + audio_options->get_map()["mid"] = sio::string_message::create("0"); + audio_options->get_map()["from"] = + sio::string_message::create(stream->Id()); + tracks_options->get_vector().push_back(audio_options); + } + if (video_track_count > 0) { + sio::message::ptr video_options = sio::object_message::create(); + video_options->get_map()["type"] = sio::string_message::create("video"); + if (audio_track_count == 0) { + video_options->get_map()["mid"] = sio::string_message::create("0"); + } else { + video_options->get_map()["mid"] = sio::string_message::create("1"); + } + auto publication_settings = stream->Settings(); + if (subscribe_options.video.rid != "") { + for (auto video_setting : publication_settings.video) { + if (video_setting.rid == subscribe_options.video.rid) { + std::string track_id = video_setting.track_id; + video_options->get_map()["from"] = + sio::string_message::create(track_id); + break; + } + } + } else { + video_options->get_map()["from"] = + sio::string_message::create(stream->Id()); + } + sio::message::ptr layer_spec = sio::object_message::create(); + if (subscribe_options.video.spatialLayerId >= 0) { + sio::message::ptr spatial_layer_options = + sio::int_message::create(subscribe_options.video.spatialLayerId); + layer_spec->get_map()["spatialLayer"] = spatial_layer_options; + } + if (subscribe_options.video.temporalLayerId >= 0) { + sio::message::ptr temporal_layer_options = + sio::int_message::create(subscribe_options.video.temporalLayerId); + layer_spec->get_map()["temporallLayer"] = temporal_layer_options; + } if (subscribe_options.video.rid != "") { - video_options->get_map()["simulcastRid"] = + sio::message::ptr rid_options = sio::string_message::create(subscribe_options.video.rid); + layer_spec->get_map()["rid"] = rid_options; } + video_options->get_map()["parameters"] = layer_spec; tracks_options->get_vector().push_back(video_options); } diff --git a/talk/owt/sdk/conference/conferencepeerconnectionchannel.h b/talk/owt/sdk/conference/conferencepeerconnectionchannel.h index c63c31497..a474426fe 100644 --- a/talk/owt/sdk/conference/conferencepeerconnectionchannel.h +++ b/talk/owt/sdk/conference/conferencepeerconnectionchannel.h @@ -43,12 +43,17 @@ class ConferencePeerConnectionChannel const std::string& session_id, std::function on_success, std::function)> on_failure); - // Subscribe a stream from the conference. + // Subscribe a non-simulcast/non-SVC stream from the conference. void Subscribe( std::shared_ptr stream, const SubscribeOptions& options, std::function on_success, std::function)> on_failure); + // Subscribe a simulcast or SVC stream from the conference. + void Subscribe(std::shared_ptr stream, + const SubscribeOptions2& options, + std::function on_success, + std::function)> on_failure); // Unsubscribe a remote stream from the conference. void Unsubscribe( const std::string& session_id, diff --git a/talk/owt/sdk/include/cpp/owt/base/options.h b/talk/owt/sdk/include/cpp/owt/base/options.h index 9a405397c..461328326 100644 --- a/talk/owt/sdk/include/cpp/owt/base/options.h +++ b/talk/owt/sdk/include/cpp/owt/base/options.h @@ -40,6 +40,7 @@ struct VideoPublicationSettings { unsigned long keyframe_interval; std::string rid; std::string track_id; + std::string scalability_mode; }; #ifdef OWT_ENABLE_QUIC diff --git a/talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h b/talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h index 3cb04c520..03196ec58 100644 --- a/talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h +++ b/talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h @@ -315,7 +315,10 @@ class ConferenceClient final std::function)> on_success, std::function)> on_failure); /** - @brief Subscribe a stream from the current room. + @brief Subscribe a stream with transcoding from the current room. + @details Should only be called on stream that is published without simulcast + or SVC opotions. If |stream| is a simulcast stream or an SVC stream, + subscription will fail. @param stream The remote stream to be subscribed. @param options Options for subscribing the stream. @param onSuccess Success callback with a stream that contains media stream. @@ -325,6 +328,23 @@ class ConferenceClient final const SubscribeOptions& options, std::function)> on_success, std::function)> on_failure); + /** + @brief Subscribe a simulcast or SVC stream from the current room with preferred rid + and/or temporal/spatial layers. + @details rid and temporal/spatial layer ID can be specified together. If rid in + |options| is not empty and |stream| is not a simulcast stream, subscribe will fail; + If temporalLayerId/spatialLayerId is larger than -1 in |options| and |stream| is not + an SVC stream, subscribe will fail; If both rid and temporal/spatialLyerId are specified, + temporalLayerId/spatialLayerId only applies to simulcast stream associated with rid. + @param stream The remote stream to be subscribed. + @param options Simulcast and SVC stream subscribe options for subscribing the stream. + @param onSuccess Success callback with a stream that contains media stream. +*/ + void Subscribe( + std::shared_ptr stream, + const SubscribeOptions2& options, + std::function)> on_success, + std::function)> on_failure); /** @brief Send messsage to all participants in the conference. @param message The message to be sent. diff --git a/talk/owt/sdk/include/cpp/owt/conference/subscribeoptions.h b/talk/owt/sdk/include/cpp/owt/conference/subscribeoptions.h index 5d9a73e59..0b8c237bf 100644 --- a/talk/owt/sdk/include/cpp/owt/conference/subscribeoptions.h +++ b/talk/owt/sdk/include/cpp/owt/conference/subscribeoptions.h @@ -30,15 +30,21 @@ struct VideoSubscriptionConstraints { resolution(0, 0), frameRate(0), bitrateMultiplier(0), - keyFrameInterval(0), - rid("") {} + keyFrameInterval(0) {} bool disabled; std::vector codecs; owt::base::Resolution resolution; double frameRate; double bitrateMultiplier; unsigned long keyFrameInterval; +}; +/// Simulcast and SVC stream subscription constranits. +struct VideoSubscriptionConstraints2 { + explicit VideoSubscriptionConstraints2() + : rid(""), spatialLayerId(-1), temporalLayerId(-1) {} std::string rid; + int spatialLayerId; + int temporalLayerId; }; #ifdef OWT_ENABLE_QUIC @@ -57,6 +63,12 @@ struct SubscribeOptions { DataSubscriptionConstraints data; #endif }; + +/// SVC and Simulcast stream subscribe options +struct SubscribeOptions2 { + AudioSubscriptionConstraints audio; + VideoSubscriptionConstraints2 video; +}; /// Video subscription update constrains used by subscription's ApplyOptions /// API. struct VideoSubscriptionUpdateConstraints { @@ -73,11 +85,28 @@ struct VideoSubscriptionUpdateConstraints { double bitrateMultiplier; unsigned long keyFrameInterval; }; +/// Simulcast and SVC stream subscription update constraints +struct VideoSubscriptionUpdateConstraints2 { + /** + @brief Construct VideoSubscriptionUpdateConstraints with default value. + */ + explicit VideoSubscriptionUpdateConstraints2() + : rid(""), spatialLayerId(-1), temporalLayerId(-1) {} + std::string rid; + int spatialLayerId; + int temporalLayerId; +}; /// Subscription update option used by subscription's ApplyOptions API. struct SubscriptionUpdateOptions { /// Options for updating a subscription. VideoSubscriptionUpdateConstraints video; }; +/// Simulcast and SVC stream subcription update option used by subscription's +/// ApplyOptions API. +struct SubscriptionUpdateOptions2 { + /// Options for updating a subscription. + VideoSubscriptionUpdateConstraints2 video; +}; } // namespace conference } // namespace owt #endif // OWT_CONFERENCE_SUBSCRIBEOPTIONS_H_