Skip to content

Commit

Permalink
Update UdpSocket::close()
Browse files Browse the repository at this point in the history
  • Loading branch information
cybergarage committed Aug 8, 2024
1 parent 243789f commit a2ebe77
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 16 deletions.
29 changes: 22 additions & 7 deletions src/transport/multicast_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
use log::*;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::{self, JoinHandle};

use crate::protocol::Message;
use crate::transport::default::*;
Expand All @@ -29,13 +30,17 @@ use crate::transport::udp_socket::UdpSocket;
pub struct MulticastServer {
socket: Arc<RwLock<UdpSocket>>,
notifier: Notifier,
stop_flag: Arc<AtomicBool>,
thread_handle: Option<JoinHandle<()>>,
}

impl MulticastServer {
pub fn new() -> MulticastServer {
MulticastServer {
socket: Arc::new(RwLock::new(UdpSocket::new())),
notifier: notifier_new(),
stop_flag: Arc::new(AtomicBool::new(false)),
thread_handle: None,
}
}

Expand Down Expand Up @@ -119,17 +124,21 @@ impl MulticastServer {
true
}

pub fn close(&self) -> bool {
self.socket.read().unwrap().close();
true
pub fn close(&mut self) -> bool {
let sock = self.socket.try_write();
if sock.is_err() {
return false;
}
sock.unwrap().close()
}

pub fn start(&mut self) -> bool {
let socket = self.socket.clone();
let notifier = self.notifier.clone();
thread::spawn(move || {
let stop_flag = self.stop_flag.clone();
let handle = thread::spawn(move || {
let mut buf = [0 as u8; MAX_PACKET_SIZE];
loop {
while !stop_flag.load(Ordering::Relaxed) {
let recv_res = socket.read().unwrap().recv_from(&mut buf);
match &recv_res {
Ok((n_bytes, remote_addr)) => {
Expand Down Expand Up @@ -164,10 +173,16 @@ impl MulticastServer {
}
}
});

self.thread_handle = Some(handle);
true
}

pub fn stop(&self) -> bool {
pub fn stop(&mut self) -> bool {
self.stop_flag.store(true, Ordering::Relaxed);
if let Some(handle) = self.thread_handle.take() {
handle.join().unwrap();
}
if !self.close() {
return false;
}
Expand Down
8 changes: 6 additions & 2 deletions src/transport/udp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl UdpSocket {
"socket is not bound",
))
}

pub fn bind(&mut self, ifaddr: SocketAddr) -> Result<()> {
if self.sock.is_some() {
self.close();
Expand Down Expand Up @@ -115,9 +116,9 @@ impl UdpSocket {
Ok(())
}

pub fn close(&self) {
pub fn close(&mut self) -> bool {
if self.sock.is_none() {
return;
return true;
}
#[cfg(feature = "unix")]
{
Expand All @@ -129,9 +130,12 @@ impl UdpSocket {
let res = close(fd);
if res.is_err() {
warn!("close {:?}", res.err());
return false;
}
self.sock = None;
}
thread::sleep(time::Duration::from_millis(UDP_SOCKET_BIND_SLEEP_MSEC));
true
}

pub fn send_to(&self, buf: &[u8], to_addr: SocketAddr) -> Result<usize> {
Expand Down
28 changes: 21 additions & 7 deletions src/transport/unicast_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
use log::*;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::thread::{self, JoinHandle};

use crate::protocol::Message;
use crate::transport::default::*;
Expand All @@ -29,13 +30,17 @@ use crate::transport::udp_socket::UdpSocket;
pub struct UnicastServer {
socket: Arc<RwLock<UdpSocket>>,
notifier: Notifier,
stop_flag: Arc<AtomicBool>,
thread_handle: Option<JoinHandle<()>>,
}

impl UnicastServer {
pub fn new() -> UnicastServer {
UnicastServer {
socket: Arc::new(RwLock::new(UdpSocket::new())),
notifier: notifier_new(),
stop_flag: Arc::new(AtomicBool::new(false)),
thread_handle: None,
}
}

Expand Down Expand Up @@ -85,17 +90,21 @@ impl UnicastServer {
true
}

pub fn close(&self) -> bool {
self.socket.read().unwrap().close();
true
pub fn close(&mut self) -> bool {
let sock = self.socket.try_write();
if sock.is_err() {
return false;
}
sock.unwrap().close()
}

pub fn start(&mut self) -> bool {
let socket = self.socket.clone();
let notifier = self.notifier.clone();
thread::spawn(move || {
let stop_flag = self.stop_flag.clone();
let handle = thread::spawn(move || {
let mut buf = [0 as u8; MAX_PACKET_SIZE];
loop {
while !stop_flag.load(Ordering::Relaxed) {
let recv_res = socket.read().unwrap().recv_from(&mut buf);
match &recv_res {
Ok((n_bytes, remote_addr)) => {
Expand Down Expand Up @@ -130,10 +139,15 @@ impl UnicastServer {
}
}
});
self.thread_handle = Some(handle);
true
}

pub fn stop(&self) -> bool {
pub fn stop(&mut self) -> bool {
self.stop_flag.store(true, Ordering::Relaxed);
if let Some(handle) = self.thread_handle.take() {
handle.join().unwrap();
}
if !self.close() {
return false;
}
Expand Down

0 comments on commit a2ebe77

Please sign in to comment.