From 30128accb251a7ec6e7a30b4c36c229932960998 Mon Sep 17 00:00:00 2001 From: john Date: Mon, 20 Nov 2023 20:17:28 +0800 Subject: [PATCH] rtmps forward support config --- trunk/src/app/srs_app_config.cpp | 22 ++++++++++++++++++- trunk/src/app/srs_app_config.hpp | 2 ++ trunk/src/app/srs_app_forward.cpp | 9 +++++--- trunk/src/app/srs_app_forward.hpp | 3 ++- trunk/src/app/srs_app_rtmp_conn.cpp | 7 +++--- trunk/src/app/srs_app_rtmp_conn.hpp | 2 +- trunk/src/app/srs_app_source.cpp | 8 +++++-- trunk/src/protocol/srs_protocol_rtmp_conn.cpp | 6 +++-- trunk/src/protocol/srs_protocol_rtmp_conn.hpp | 3 ++- trunk/src/protocol/srs_protocol_utility.cpp | 5 +++-- trunk/src/protocol/srs_protocol_utility.hpp | 2 +- 11 files changed, 52 insertions(+), 17 deletions(-) diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index bd30033cb7..2179ff140b 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -2695,7 +2695,7 @@ srs_error_t SrsConfig::check_normal_config() } else if (n == "forward") { for (int j = 0; j < (int)conf->directives.size(); j++) { string m = conf->at(j)->name; - if (m != "enabled" && m != "destination" && m != "backend") { + if (m != "enabled" && m != "destination" && m != "backend" && m != "rtmps") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str()); } } @@ -5595,6 +5595,26 @@ SrsConfDirective* SrsConfig::get_forward_backend(string vhost) return conf->get("backend"); } +bool SrsConfig::get_forward_rtmps(string vhost) +{ + SrsConfDirective* conf = get_vhost(vhost); + if (!conf) { + return false; + } + + conf = conf->get("forward"); + if (!conf) { + return false; + } + + conf = conf->get("rtmps"); + if (!conf) { + return false; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost) { SrsConfDirective* conf = get_vhost(vhost); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 13f341e8b6..4a83460bc6 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -666,6 +666,8 @@ class SrsConfig virtual SrsConfDirective* get_forwards(std::string vhost); // Get the forward directive of backend. virtual SrsConfDirective* get_forward_backend(std::string vhost); + // Get forward rtmps enabled. + virtual bool get_forward_rtmps(std::string vhost); public: // Whether the srt sevice enabled diff --git a/trunk/src/app/srs_app_forward.cpp b/trunk/src/app/srs_app_forward.cpp index d64b02eae0..7fcaa354ca 100755 --- a/trunk/src/app/srs_app_forward.cpp +++ b/trunk/src/app/srs_app_forward.cpp @@ -36,6 +36,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h) req = NULL; sh_video = sh_audio = NULL; + rtmps_ = false; sdk = NULL; trd = new SrsDummyCoroutine(); @@ -56,7 +57,7 @@ SrsForwarder::~SrsForwarder() srs_freep(req); } -srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) +srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep, bool rtmps) { srs_error_t err = srs_success; @@ -67,6 +68,8 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep) // the ep(endpoint) to forward to ep_forward = ep; + rtmps_ = rtmps; + // Remember the source context id. source_cid_ = _srs_context->get_id(); @@ -208,13 +211,13 @@ srs_error_t SrsForwarder::do_cycle() srs_parse_hostport(ep_forward, server, port); // generate url - url = srs_generate_rtmp_url(server, port, req->host, req->vhost, req->app, req->stream, req->param); + url = srs_generate_rtmp_url(server, port, req->host, req->vhost, req->app, req->stream, req->param, rtmps_); } srs_freep(sdk); srs_utime_t cto = SRS_FORWARDER_CIMS; srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT; - sdk = new SrsSimpleRtmpClient(url, cto, sto); + sdk = new SrsSimpleRtmpClient(url, cto, sto, rtmps_); if ((err = sdk->connect()) != srs_success) { return srs_error_wrap(err, "sdk connect url=%s, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); diff --git a/trunk/src/app/srs_app_forward.hpp b/trunk/src/app/srs_app_forward.hpp index 15f25de5b8..c55234b322 100644 --- a/trunk/src/app/srs_app_forward.hpp +++ b/trunk/src/app/srs_app_forward.hpp @@ -32,6 +32,7 @@ class SrsForwarder : public ISrsCoroutineHandler // The ep to forward, server[:port]. std::string ep_forward; SrsRequest* req; + bool rtmps_; private: // The source or stream context id to bind to. SrsContextId source_cid_; @@ -49,7 +50,7 @@ class SrsForwarder : public ISrsCoroutineHandler SrsForwarder(SrsOriginHub* h); virtual ~SrsForwarder(); public: - virtual srs_error_t initialize(SrsRequest* r, std::string ep); + virtual srs_error_t initialize(SrsRequest* r, std::string ep, bool rtmps); virtual void set_queue_size(srs_utime_t queue_size); public: virtual srs_error_t on_publish(); diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 919bb39d01..119a0ba913 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -56,7 +56,8 @@ using namespace std; // when edge timeout, retry next. #define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT (3 * SRS_UTIME_SECONDS) -SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm) : SrsBasicRtmpClient(u, ctm, stm) +SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm, bool rtmps) + : SrsBasicRtmpClient(u, ctm, stm, rtmps) { } @@ -195,11 +196,11 @@ srs_error_t SrsRtmpConn::do_cycle() #endif #ifdef SRS_APM - srs_trace("RTMP client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), port, srs_netfd_fileno(stfd), + srs_trace("%s client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), (rtmps_ ? "RTMPS" : "RTMP"), port, srs_netfd_fileno(stfd), span_main_->format_trace_id(), span_main_->format_span_id() ); #else - srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd)); + srs_trace("%s client ip=%s:%d, fd=%d", (rtmps_ ? "RTMPS" : "RTMP"), ip.c_str(), port, srs_netfd_fileno(stfd)); #endif if (rtmps_) { diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index 24d4e75036..84599f2992 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -45,7 +45,7 @@ class SrsSslConnection; class SrsSimpleRtmpClient : public SrsBasicRtmpClient { public: - SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm); + SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm, bool rtmps = false); virtual ~SrsSimpleRtmpClient(); protected: virtual srs_error_t connect_app(); diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 96c0cd82b4..49f3ad57f2 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1467,6 +1467,8 @@ srs_error_t SrsOriginHub::create_forwarders() return err; } + bool rtmps = _srs_config->get_forward_rtmps(req_->vhost); + // For destanition config SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost); for (int i = 0; conf && i < (int)conf->args.size(); i++) { @@ -1476,7 +1478,7 @@ srs_error_t SrsOriginHub::create_forwarders() forwarders.push_back(forwarder); // initialize the forwarder with request. - if ((err = forwarder->initialize(req_, forward_server)) != srs_success) { + if ((err = forwarder->initialize(req_, forward_server, rtmps)) != srs_success) { return srs_error_wrap(err, "init forwarder"); } @@ -1499,6 +1501,8 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied) // default not configure backend service applied = false; + bool rtmps = _srs_config->get_forward_rtmps(req_->vhost); + SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost); if (!conf || conf->arg0().empty()) { return err; @@ -1535,7 +1539,7 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied) forward_server << req->host << ":" << req->port; // initialize the forwarder with request. - if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) { + if ((err = forwarder->initialize(req, forward_server.str(), rtmps)) != srs_success) { return srs_error_wrap(err, "init backend forwarder failed, forward-to=%s", forward_server.str().c_str()); } diff --git a/trunk/src/protocol/srs_protocol_rtmp_conn.cpp b/trunk/src/protocol/srs_protocol_rtmp_conn.cpp index ccaad1ef47..5be371174c 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_conn.cpp +++ b/trunk/src/protocol/srs_protocol_rtmp_conn.cpp @@ -17,13 +17,14 @@ using namespace std; #include #include -SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm) +SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm, bool rtmps) { kbps = new SrsNetworkKbps(); url = r; connect_timeout = ctm; stream_timeout = stm; + rtmps_ = rtmps; req = new SrsRequest(); srs_parse_rtmp_url(url, req->tcUrl, req->stream); @@ -58,8 +59,9 @@ srs_error_t SrsBasicRtmpClient::connect() close(); transport = new SrsTcpClient(req->host, req->port, srs_utime_t(connect_timeout)); - // if (req->schema == "rtmps") + if (rtmps_) { ssl_transport_ = new SrsSslClient(transport); + } client = new SrsRtmpClient(get_transport()); kbps->set_io(get_transport(), get_transport()); diff --git a/trunk/src/protocol/srs_protocol_rtmp_conn.hpp b/trunk/src/protocol/srs_protocol_rtmp_conn.hpp index 617d98c0e3..dc2db1270a 100644 --- a/trunk/src/protocol/srs_protocol_rtmp_conn.hpp +++ b/trunk/src/protocol/srs_protocol_rtmp_conn.hpp @@ -36,6 +36,7 @@ class SrsBasicRtmpClient std::string url; srs_utime_t connect_timeout; srs_utime_t stream_timeout; + bool rtmps_; protected: SrsRequest* req; private: @@ -49,7 +50,7 @@ class SrsBasicRtmpClient // @param r The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost // @param ctm The timeout in srs_utime_t to connect to server. // @param stm The timeout in srs_utime_t to delivery A/V stream. - SrsBasicRtmpClient(std::string r, srs_utime_t ctm, srs_utime_t stm); + SrsBasicRtmpClient(std::string r, srs_utime_t ctm, srs_utime_t stm, bool rtmps = false); virtual ~SrsBasicRtmpClient(); public: // Get extra args to carry more information. diff --git a/trunk/src/protocol/srs_protocol_utility.cpp b/trunk/src/protocol/srs_protocol_utility.cpp index 8a004ab6b5..e6dca59f9d 100644 --- a/trunk/src/protocol/srs_protocol_utility.cpp +++ b/trunk/src/protocol/srs_protocol_utility.cpp @@ -351,9 +351,10 @@ void srs_parse_rtmp_url(string url, string& tcUrl, string& stream) } } -string srs_generate_rtmp_url(string server, int port, string host, string vhost, string app, string stream, string param) +string srs_generate_rtmp_url(string server, int port, string host, string vhost, string app, string stream, string param, bool rtmps) { - string tcUrl = "rtmp://" + server + ":" + srs_int2str(port) + "/" + app; + string schema = rtmps ? "rtmps" : "rtmp"; + string tcUrl = schema + "://" + server + ":" + srs_int2str(port) + "/" + app; string streamWithQuery = srs_generate_stream_with_query(host, vhost, stream, param); string url = tcUrl + "/" + streamWithQuery; return url; diff --git a/trunk/src/protocol/srs_protocol_utility.hpp b/trunk/src/protocol/srs_protocol_utility.hpp index 2b866ba3e6..d1656f71e3 100644 --- a/trunk/src/protocol/srs_protocol_utility.hpp +++ b/trunk/src/protocol/srs_protocol_utility.hpp @@ -107,7 +107,7 @@ extern void srs_parse_rtmp_url(std::string url, std::string& tcUrl, std::string& // Genereate the rtmp url, for instance, rtmp://server:port/app/stream?param // @remark We always put vhost in param, in the query of url. -extern std::string srs_generate_rtmp_url(std::string server, int port, std::string host, std::string vhost, std::string app, std::string stream, std::string param); +extern std::string srs_generate_rtmp_url(std::string server, int port, std::string host, std::string vhost, std::string app, std::string stream, std::string param, bool rtmps = false); // write large numbers of iovs. extern srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL);