Skip to content

Commit

Permalink
setting rtpengine public-ip
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm committed Nov 18, 2024
1 parent 90fca09 commit 05fcbd0
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 20 deletions.
14 changes: 12 additions & 2 deletions bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct Args {
/// The IP address for RTPengine RTP listening.
/// Default: 127.0.0.1
#[arg(env, long, default_value = "127.0.0.1")]
pub rtpengine_rtp_ip: IpAddr,
pub rtpengine_listen_ip: IpAddr,

/// Maximum concurrent connections per CPU core.
#[arg(env, long, default_value_t = 200)]
Expand Down Expand Up @@ -141,6 +141,15 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
};
let webrtc_addrs = node.bind_addrs.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::<Vec<_>>();
let webrtc_addrs_alt = node.bind_addrs_alt.iter().map(|addr| SocketAddr::new(addr.ip(), webrtc_port)).collect::<Vec<_>>();
let rtpengine_public_ip = webrtc_addrs
.iter()
.chain(webrtc_addrs_alt.iter())
.find(|addr| match addr.ip() {
IpAddr::V4(ipv4) => !ipv4.is_unspecified() && !ipv4.is_multicast() && !ipv4.is_loopback() && !ipv4.is_broadcast() && !ipv4.is_private(),
IpAddr::V6(ipv6) => !ipv6.is_unspecified() && !ipv6.is_multicast() && !ipv6.is_loopback(),
})
.map(|addr| addr.ip())
.unwrap_or(args.rtpengine_listen_ip);

println!("Running media server worker {i} with addrs: {:?}, ice-lite: {}", webrtc_addrs, args.ice_lite);

Expand All @@ -151,7 +160,8 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
media: MediaConfig {
webrtc_addrs,
webrtc_addrs_alt,
rtpengine_rtp_ip: args.rtpengine_rtp_ip,
rtpengine_listen_ip: args.rtpengine_listen_ip,
rtpengine_public_ip: rtpengine_public_ip,
ice_lite: args.ice_lite,
secure: secure.clone(),
max_live: HashMap::from([(ServiceKind::Webrtc, workers as u32 * args.ccu_per_core), (ServiceKind::RtpEngine, workers as u32 * args.ccu_per_core)]),
Expand Down
6 changes: 3 additions & 3 deletions bin/src/server/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct Args {
/// The IP address for RTPengine RTP listening.
/// Default: 127.0.0.1
#[arg(env, long, default_value = "127.0.0.1")]
pub rtpengine_rtp_ip: IpAddr,
pub rtpengine_listen_ip: IpAddr,

/// Media instance count
#[arg(env, long, default_value_t = 2)]
Expand Down Expand Up @@ -208,7 +208,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) {
let record_cache = args.record_cache.clone();
let record_mem_max_size = args.record_mem_max_size;
let record_upload_worker = args.record_upload_worker;
let rtpengine_rtp_ip = args.rtpengine_rtp_ip;
let rtpengine_listen_ip = args.rtpengine_listen_ip;
tokio::task::spawn_local(async move {
super::run_media_server(
workers,
Expand All @@ -226,7 +226,7 @@ pub async fn run_standalone(workers: usize, node: NodeConfig, args: Args) {
ice_lite: false,
webrtc_port_seed: 0,
rtpengine_cmd_addr: None,
rtpengine_rtp_ip,
rtpengine_listen_ip,
ccu_per_core: 200,
record_cache,
record_mem_max_size,
Expand Down
5 changes: 3 additions & 2 deletions packages/media_runner/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ pub struct MediaConfig<ES> {
pub ice_lite: bool,
pub webrtc_addrs: Vec<SocketAddr>,
pub webrtc_addrs_alt: Vec<SocketAddr>,
pub rtpengine_rtp_ip: IpAddr,
pub rtpengine_listen_ip: IpAddr,
pub rtpengine_public_ip: IpAddr,
pub secure: Arc<ES>,
pub max_live: HashMap<ServiceKind, u32>,
pub enable_gateway_agent: bool,
Expand Down Expand Up @@ -219,7 +220,7 @@ impl<ES: 'static + MediaEdgeSecure> MediaServerWorker<ES> {
MediaWorkerWebrtc::new(media.webrtc_addrs, media.webrtc_addrs_alt, media.ice_lite, media.secure.clone()),
TaskType::MediaWebrtc,
),
media_rtpengine: TaskSwitcherBranch::new(MediaWorkerRtpEngine::new(media.rtpengine_rtp_ip), TaskType::MediaRtpEngine),
media_rtpengine: TaskSwitcherBranch::new(MediaWorkerRtpEngine::new(media.rtpengine_listen_ip, media.rtpengine_public_ip), TaskType::MediaRtpEngine),
media_max_live,
switcher: TaskSwitcher::new(4),
queue,
Expand Down
16 changes: 8 additions & 8 deletions packages/transport_rtpengine/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ pub struct TransportRtpEngine {
}

impl TransportRtpEngine {
pub fn new_offer(room: RoomId, peer: PeerId, ip: IpAddr) -> Result<(Self, String), String> {
let socket = std::net::UdpSocket::bind(SocketAddr::new(ip, 0)).map_err(|e| e.to_string())?;
pub fn new_offer(room: RoomId, peer: PeerId, public_ip: IpAddr, listen_ip: IpAddr) -> Result<(Self, String), String> {
let socket = std::net::UdpSocket::bind(SocketAddr::new(listen_ip, 0)).map_err(|e| e.to_string())?;
let port = socket.local_addr().map_err(|e| e.to_string())?.port();
let answer = sdp_builder(ip, port);
let answer = sdp_builder(public_ip, port);

Ok((
Self {
Expand All @@ -85,7 +85,7 @@ impl TransportRtpEngine {
last_send_rtp: None,
queue: DynamicDeque::from([
TransportOutput::Net(BackendOutgoing::UdpListen {
addr: SocketAddr::new(ip, port),
addr: SocketAddr::new(listen_ip, port),
reuse: false,
}),
TransportOutput::Event(TransportEvent::State(TransportState::New)),
Expand All @@ -99,7 +99,7 @@ impl TransportRtpEngine {
))
}

pub fn new_answer(room: RoomId, peer: PeerId, ip: IpAddr, offer: &str) -> Result<(Self, String), String> {
pub fn new_answer(room: RoomId, peer: PeerId, public_ip: IpAddr, listen_ip: IpAddr, offer: &str) -> Result<(Self, String), String> {
let mut offer = SessionDescription::try_from(offer.to_string()).map_err(|e| e.to_string())?;
let dest_ip: IpAddr = if let Some(conn) = offer.connection {
conn.connection_address.base
Expand All @@ -111,9 +111,9 @@ impl TransportRtpEngine {

log::info!("[TransportRtpEngine] on create answer => set remote to {remote}");

let socket = std::net::UdpSocket::bind(SocketAddr::new(ip, 0)).map_err(|e| e.to_string())?;
let socket = std::net::UdpSocket::bind(SocketAddr::new(listen_ip, 0)).map_err(|e| e.to_string())?;
let port = socket.local_addr().map_err(|e| e.to_string())?.port();
let answer = sdp_builder(ip, port);
let answer = sdp_builder(public_ip, port);

Ok((
Self {
Expand All @@ -129,7 +129,7 @@ impl TransportRtpEngine {
last_send_rtp: None,
queue: DynamicDeque::from([
TransportOutput::Net(BackendOutgoing::UdpListen {
addr: SocketAddr::new(ip, port),
addr: SocketAddr::new(listen_ip, port),
reuse: false,
}),
TransportOutput::Event(TransportEvent::State(TransportState::Connecting(dest_ip))),
Expand Down
12 changes: 7 additions & 5 deletions packages/transport_rtpengine/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ pub enum GroupOutput {

#[allow(clippy::type_complexity)]
pub struct MediaWorkerRtpEngine {
ip: IpAddr,
listen_ip: IpAddr,
public_ip: IpAddr,
endpoints: TaskGroup<EndpointInput<ExtIn>, EndpointOutput<ExtOut>, Endpoint<TransportRtpEngine, ExtIn, ExtOut>, 16>,
queue: VecDeque<GroupOutput>,
shutdown: bool,
}

impl MediaWorkerRtpEngine {
pub fn new(ip: IpAddr) -> Self {
pub fn new(listen_ip: IpAddr, public_ip: IpAddr) -> Self {
Self {
ip,
listen_ip,
public_ip,
endpoints: TaskGroup::default(),
queue: VecDeque::new(),
shutdown: false,
Expand All @@ -58,9 +60,9 @@ impl MediaWorkerRtpEngine {

pub fn spawn(&mut self, app: AppContext, room: RoomId, peer: PeerId, record: bool, session_id: u64, offer: Option<&str>) -> RpcResult<(usize, String)> {
let (tran, answer) = if let Some(offer) = offer {
TransportRtpEngine::new_answer(room, peer, self.ip, offer).map_err(|e| RpcError::new(1000_u32, &e))?
TransportRtpEngine::new_answer(room, peer, self.public_ip, self.listen_ip, offer).map_err(|e| RpcError::new(1000_u32, &e))?
} else {
TransportRtpEngine::new_offer(room, peer, self.ip).map_err(|e| RpcError::new(1000_u32, &e))?
TransportRtpEngine::new_offer(room, peer, self.public_ip, self.listen_ip).map_err(|e| RpcError::new(1000_u32, &e))?
};
let cfg = EndpointCfg {
app,
Expand Down

0 comments on commit 05fcbd0

Please sign in to comment.