Skip to content

Commit

Permalink
rtmps forward support config
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaozhihong committed Nov 20, 2023
1 parent a32205f commit 30128ac
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 17 deletions.
22 changes: 21 additions & 1 deletion trunk/src/app/srs_app_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2695,7 +2695,7 @@ srs_error_t SrsConfig::check_normal_config()
} else if (n == "forward") {
for (int j = 0; j < (int)conf->directives.size(); j++) {
string m = conf->at(j)->name;
if (m != "enabled" && m != "destination" && m != "backend") {
if (m != "enabled" && m != "destination" && m != "backend" && m != "rtmps") {
return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal vhost.forward.%s of %s", m.c_str(), vhost->arg0().c_str());
}
}
Expand Down Expand Up @@ -5595,6 +5595,26 @@ SrsConfDirective* SrsConfig::get_forward_backend(string vhost)
return conf->get("backend");
}

bool SrsConfig::get_forward_rtmps(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
if (!conf) {
return false;
}

conf = conf->get("forward");
if (!conf) {
return false;
}

conf = conf->get("rtmps");
if (!conf) {
return false;
}

return SRS_CONF_PERFER_FALSE(conf->arg0());
}

SrsConfDirective* SrsConfig::get_vhost_http_hooks(string vhost)
{
SrsConfDirective* conf = get_vhost(vhost);
Expand Down
2 changes: 2 additions & 0 deletions trunk/src/app/srs_app_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,8 @@ class SrsConfig
virtual SrsConfDirective* get_forwards(std::string vhost);
// Get the forward directive of backend.
virtual SrsConfDirective* get_forward_backend(std::string vhost);
// Get forward rtmps enabled.
virtual bool get_forward_rtmps(std::string vhost);

public:
// Whether the srt sevice enabled
Expand Down
9 changes: 6 additions & 3 deletions trunk/src/app/srs_app_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ SrsForwarder::SrsForwarder(SrsOriginHub* h)

req = NULL;
sh_video = sh_audio = NULL;
rtmps_ = false;

sdk = NULL;
trd = new SrsDummyCoroutine();
Expand All @@ -56,7 +57,7 @@ SrsForwarder::~SrsForwarder()
srs_freep(req);
}

srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep, bool rtmps)
{
srs_error_t err = srs_success;

Expand All @@ -67,6 +68,8 @@ srs_error_t SrsForwarder::initialize(SrsRequest* r, string ep)
// the ep(endpoint) to forward to
ep_forward = ep;

rtmps_ = rtmps;

// Remember the source context id.
source_cid_ = _srs_context->get_id();

Expand Down Expand Up @@ -208,13 +211,13 @@ srs_error_t SrsForwarder::do_cycle()
srs_parse_hostport(ep_forward, server, port);

// generate url
url = srs_generate_rtmp_url(server, port, req->host, req->vhost, req->app, req->stream, req->param);
url = srs_generate_rtmp_url(server, port, req->host, req->vhost, req->app, req->stream, req->param, rtmps_);
}

srs_freep(sdk);
srs_utime_t cto = SRS_FORWARDER_CIMS;
srs_utime_t sto = SRS_CONSTS_RTMP_TIMEOUT;
sdk = new SrsSimpleRtmpClient(url, cto, sto);
sdk = new SrsSimpleRtmpClient(url, cto, sto, rtmps_);

if ((err = sdk->connect()) != srs_success) {
return srs_error_wrap(err, "sdk connect url=%s, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto));
Expand Down
3 changes: 2 additions & 1 deletion trunk/src/app/srs_app_forward.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class SrsForwarder : public ISrsCoroutineHandler
// The ep to forward, server[:port].
std::string ep_forward;
SrsRequest* req;
bool rtmps_;
private:
// The source or stream context id to bind to.
SrsContextId source_cid_;
Expand All @@ -49,7 +50,7 @@ class SrsForwarder : public ISrsCoroutineHandler
SrsForwarder(SrsOriginHub* h);
virtual ~SrsForwarder();
public:
virtual srs_error_t initialize(SrsRequest* r, std::string ep);
virtual srs_error_t initialize(SrsRequest* r, std::string ep, bool rtmps);
virtual void set_queue_size(srs_utime_t queue_size);
public:
virtual srs_error_t on_publish();
Expand Down
7 changes: 4 additions & 3 deletions trunk/src/app/srs_app_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ using namespace std;
// when edge timeout, retry next.
#define SRS_EDGE_TOKEN_TRAVERSE_TIMEOUT (3 * SRS_UTIME_SECONDS)

SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm) : SrsBasicRtmpClient(u, ctm, stm)
SrsSimpleRtmpClient::SrsSimpleRtmpClient(string u, srs_utime_t ctm, srs_utime_t stm, bool rtmps)
: SrsBasicRtmpClient(u, ctm, stm, rtmps)
{
}

Expand Down Expand Up @@ -195,11 +196,11 @@ srs_error_t SrsRtmpConn::do_cycle()
#endif

#ifdef SRS_APM
srs_trace("RTMP client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), port, srs_netfd_fileno(stfd),
srs_trace("%s client ip=%s:%d, fd=%d, trace=%s, span=%s", ip.c_str(), (rtmps_ ? "RTMPS" : "RTMP"), port, srs_netfd_fileno(stfd),
span_main_->format_trace_id(), span_main_->format_span_id()
);
#else
srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));
srs_trace("%s client ip=%s:%d, fd=%d", (rtmps_ ? "RTMPS" : "RTMP"), ip.c_str(), port, srs_netfd_fileno(stfd));
#endif

if (rtmps_) {
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtmp_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SrsSslConnection;
class SrsSimpleRtmpClient : public SrsBasicRtmpClient
{
public:
SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm);
SrsSimpleRtmpClient(std::string u, srs_utime_t ctm, srs_utime_t stm, bool rtmps = false);
virtual ~SrsSimpleRtmpClient();
protected:
virtual srs_error_t connect_app();
Expand Down
8 changes: 6 additions & 2 deletions trunk/src/app/srs_app_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1467,6 +1467,8 @@ srs_error_t SrsOriginHub::create_forwarders()
return err;
}

bool rtmps = _srs_config->get_forward_rtmps(req_->vhost);

// For destanition config
SrsConfDirective* conf = _srs_config->get_forwards(req_->vhost);
for (int i = 0; conf && i < (int)conf->args.size(); i++) {
Expand All @@ -1476,7 +1478,7 @@ srs_error_t SrsOriginHub::create_forwarders()
forwarders.push_back(forwarder);

// initialize the forwarder with request.
if ((err = forwarder->initialize(req_, forward_server)) != srs_success) {
if ((err = forwarder->initialize(req_, forward_server, rtmps)) != srs_success) {
return srs_error_wrap(err, "init forwarder");
}

Expand All @@ -1499,6 +1501,8 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied)
// default not configure backend service
applied = false;

bool rtmps = _srs_config->get_forward_rtmps(req_->vhost);

SrsConfDirective* conf = _srs_config->get_forward_backend(req_->vhost);
if (!conf || conf->arg0().empty()) {
return err;
Expand Down Expand Up @@ -1535,7 +1539,7 @@ srs_error_t SrsOriginHub::create_backend_forwarders(bool& applied)
forward_server << req->host << ":" << req->port;

// initialize the forwarder with request.
if ((err = forwarder->initialize(req, forward_server.str())) != srs_success) {
if ((err = forwarder->initialize(req, forward_server.str(), rtmps)) != srs_success) {
return srs_error_wrap(err, "init backend forwarder failed, forward-to=%s", forward_server.str().c_str());
}

Expand Down
6 changes: 4 additions & 2 deletions trunk/src/protocol/srs_protocol_rtmp_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ using namespace std;
#include <srs_protocol_utility.hpp>
#include <srs_protocol_http_client.hpp>

SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm)
SrsBasicRtmpClient::SrsBasicRtmpClient(string r, srs_utime_t ctm, srs_utime_t stm, bool rtmps)
{
kbps = new SrsNetworkKbps();

url = r;
connect_timeout = ctm;
stream_timeout = stm;
rtmps_ = rtmps;

req = new SrsRequest();
srs_parse_rtmp_url(url, req->tcUrl, req->stream);
Expand Down Expand Up @@ -58,8 +59,9 @@ srs_error_t SrsBasicRtmpClient::connect()
close();

transport = new SrsTcpClient(req->host, req->port, srs_utime_t(connect_timeout));
// if (req->schema == "rtmps")
if (rtmps_) {
ssl_transport_ = new SrsSslClient(transport);
}

client = new SrsRtmpClient(get_transport());
kbps->set_io(get_transport(), get_transport());
Expand Down
3 changes: 2 additions & 1 deletion trunk/src/protocol/srs_protocol_rtmp_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SrsBasicRtmpClient
std::string url;
srs_utime_t connect_timeout;
srs_utime_t stream_timeout;
bool rtmps_;
protected:
SrsRequest* req;
private:
Expand All @@ -49,7 +50,7 @@ class SrsBasicRtmpClient
// @param r The RTMP url, for example, rtmp://ip:port/app/stream?domain=vhost
// @param ctm The timeout in srs_utime_t to connect to server.
// @param stm The timeout in srs_utime_t to delivery A/V stream.
SrsBasicRtmpClient(std::string r, srs_utime_t ctm, srs_utime_t stm);
SrsBasicRtmpClient(std::string r, srs_utime_t ctm, srs_utime_t stm, bool rtmps = false);
virtual ~SrsBasicRtmpClient();
public:
// Get extra args to carry more information.
Expand Down
5 changes: 3 additions & 2 deletions trunk/src/protocol/srs_protocol_utility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,10 @@ void srs_parse_rtmp_url(string url, string& tcUrl, string& stream)
}
}

string srs_generate_rtmp_url(string server, int port, string host, string vhost, string app, string stream, string param)
string srs_generate_rtmp_url(string server, int port, string host, string vhost, string app, string stream, string param, bool rtmps)
{
string tcUrl = "rtmp://" + server + ":" + srs_int2str(port) + "/" + app;
string schema = rtmps ? "rtmps" : "rtmp";
string tcUrl = schema + "://" + server + ":" + srs_int2str(port) + "/" + app;
string streamWithQuery = srs_generate_stream_with_query(host, vhost, stream, param);
string url = tcUrl + "/" + streamWithQuery;
return url;
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/protocol/srs_protocol_utility.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ extern void srs_parse_rtmp_url(std::string url, std::string& tcUrl, std::string&

// Genereate the rtmp url, for instance, rtmp://server:port/app/stream?param
// @remark We always put vhost in param, in the query of url.
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string host, std::string vhost, std::string app, std::string stream, std::string param);
extern std::string srs_generate_rtmp_url(std::string server, int port, std::string host, std::string vhost, std::string app, std::string stream, std::string param, bool rtmps = false);

// write large numbers of iovs.
extern srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite = NULL);
Expand Down

0 comments on commit 30128ac

Please sign in to comment.