Skip to content

Commit

Permalink
get publish and subscribe event result from stream hub
Browse files Browse the repository at this point in the history
  • Loading branch information
harlanc authored and hailiang8 committed Dec 23, 2023
1 parent d5044e8 commit c954fd3
Show file tree
Hide file tree
Showing 16 changed files with 251 additions and 1,038 deletions.
1 change: 1 addition & 0 deletions application/xiu/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
xiu::{config, config::Config, service::Service},
};

// #[tokio::main(flavor = "current_thread")]
#[tokio::main]
async fn main() -> Result<()> {
let log_levels = vec!["trace", "debug", "info", "warn", "error"];
Expand Down
26 changes: 20 additions & 6 deletions library/streamhub/src/define.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct SubscriberInfo {
pub id: Uuid,
pub sub_type: SubscribeType,
pub notify_info: NotifyInfo,
pub sub_data_type: SubDataType,
}

impl Serialize for SubscriberInfo {
Expand All @@ -84,6 +85,7 @@ impl Serialize for SubscriberInfo {
pub struct PublisherInfo {
pub id: Uuid,
pub pub_type: PublishType,
pub pub_data_type: PubDataType,
pub notify_info: NotifyInfo,
}

Expand Down Expand Up @@ -165,8 +167,9 @@ pub type AvStatisticReceiver = mpsc::UnboundedReceiver<StreamStatistics>;
pub type StreamStatisticSizeSender = oneshot::Sender<usize>;
pub type StreamStatisticSizeReceiver = oneshot::Receiver<usize>;

pub type EventExecuteResultSender = oneshot::Sender<Result<(), ChannelError>>;
pub type EventExecuteResultReceiver = oneshot::Sender<Result<(), ChannelError>>;
pub type SubEventExecuteResultSender = oneshot::Sender<Result<DataReceiver, ChannelError>>;
pub type PubEventExecuteResultSender =
oneshot::Sender<Result<(Option<FrameDataSender>, Option<PacketDataSender>), ChannelError>>;

#[async_trait]
pub trait TStreamHandler: Send + Sync {
Expand All @@ -191,16 +194,27 @@ pub enum DataSender {
Frame { sender: FrameDataSender },
Packet { sender: PacketDataSender },
}
//we can only sub one kind of stream.
#[derive(Debug, Clone, Serialize)]
pub enum SubDataType {
Frame,
Packet,
}
//we can pub frame or packet or both.
#[derive(Debug, Clone, Serialize)]
pub enum PubDataType {
Frame,
Packet,
Both,
}

#[derive(Serialize)]
pub enum StreamHubEvent {
Subscribe {
identifier: StreamIdentifier,
info: SubscriberInfo,
#[serde(skip_serializing)]
sender: DataSender,
#[serde(skip_serializing)]
eer_sender: EventExecuteResultSender,
result_sender: SubEventExecuteResultSender,
},
UnSubscribe {
identifier: StreamIdentifier,
Expand All @@ -210,7 +224,7 @@ pub enum StreamHubEvent {
identifier: StreamIdentifier,
info: PublisherInfo,
#[serde(skip_serializing)]
receiver: DataReceiver,
result_sender: PubEventExecuteResultSender,
#[serde(skip_serializing)]
stream_handler: Arc<dyn TStreamHandler>,
},
Expand Down
96 changes: 85 additions & 11 deletions library/streamhub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,25 +337,71 @@ impl StreamsHub {
match message {
StreamHubEvent::Publish {
identifier,
receiver,
info,
result_sender,
stream_handler,
} => {
let rv = self
let (frame_sender, packet_sender, receiver) = match info.pub_data_type {
define::PubDataType::Frame => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
Some(sender_chan),
None,
DataReceiver {
frame_receiver: Some(receiver_chan),
packet_receiver: None,
},
)
}
define::PubDataType::Packet => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
None,
Some(sender_chan),
DataReceiver {
frame_receiver: None,
packet_receiver: Some(receiver_chan),
},
)
}
define::PubDataType::Both => {
let (sender_frame_chan, receiver_frame_chan) =
mpsc::unbounded_channel();
let (sender_packet_chan, receiver_packet_chan) =
mpsc::unbounded_channel();

(
Some(sender_frame_chan),
Some(sender_packet_chan),
DataReceiver {
frame_receiver: Some(receiver_frame_chan),
packet_receiver: Some(receiver_packet_chan),
},
)
}
};

let result = match self
.publish(identifier.clone(), receiver, stream_handler)
.await;
match rv {
.await
{
Ok(()) => {
if let Some(notifier) = &self.notifier {
notifier.on_publish_notify(event_serialize_str).await;
}
self.streams_info
.insert(info.id, PubSubInfo::Publish { identifier });

Ok((frame_sender, packet_sender))
}
Err(err) => {
log::error!("event_loop Publish err: {}", err);
continue;
Err(err)
}
};

if result_sender.send(result).is_err() {
log::error!("event_loop Subscribe error: The receiver dropped.")
}
}

Expand All @@ -378,14 +424,40 @@ impl StreamsHub {
StreamHubEvent::Subscribe {
identifier,
info,
sender,
eer_sender,
result_sender,
} => {
let sub_id = info.id;
let info_clone = info.clone();
let rv = self.subscribe(&identifier, info_clone, sender).await;

match &rv {
//new chan for Frame/Packet sender and receiver
let (sender, receiver) = match info.sub_data_type {
define::SubDataType::Frame => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
DataSender::Frame {
sender: sender_chan,
},
DataReceiver {
frame_receiver: Some(receiver_chan),
packet_receiver: None,
},
)
}
define::SubDataType::Packet => {
let (sender_chan, receiver_chan) = mpsc::unbounded_channel();
(
DataSender::Packet {
sender: sender_chan,
},
DataReceiver {
frame_receiver: None,
packet_receiver: Some(receiver_chan),
},
)
}
};

let rv = match self.subscribe(&identifier, info_clone, sender).await {
Ok(()) => {
if let Some(notifier) = &self.notifier {
notifier.on_play_notify(event_serialize_str).await;
Expand All @@ -398,13 +470,15 @@ impl StreamsHub {
sub_info: info,
},
);
Ok(receiver)
}
Err(err) => {
log::error!("event_loop Subscribe error: {}", err);
Err(err)
}
}
};

if eer_sender.send(rv).is_err() {
if result_sender.send(rv).is_err() {
log::error!("event_loop Subscribe error: The receiver dropped.")
}
}
Expand Down
11 changes: 11 additions & 0 deletions protocol/hls/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
std::fmt,
streamhub::errors::ChannelError,
tokio::sync::broadcast::error::RecvError,
tokio::sync::oneshot::error::RecvError as OneshotRecvError,
xflv::errors::FlvDemuxerError,
xmpegts::errors::MpegTsError,
};
Expand Down Expand Up @@ -133,6 +134,8 @@ pub enum HlsErrorValue {
MediaError(#[cause] MediaError),
#[fail(display = "receive error:{}", _0)]
RecvError(#[cause] RecvError),
#[fail(display = "tokio: oneshot receiver err: {}", _0)]
OneshotRecvError(#[cause] OneshotRecvError),
}
impl From<RecvError> for HlsError {
fn from(error: RecvError) -> Self {
Expand Down Expand Up @@ -190,6 +193,14 @@ impl From<ChannelError> for HlsError {
}
}

impl From<OneshotRecvError> for HlsError {
fn from(error: OneshotRecvError) -> Self {
HlsError {
value: HlsErrorValue::OneshotRecvError(error),
}
}
}

impl fmt::Display for HlsError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.value, f)
Expand Down
17 changes: 4 additions & 13 deletions protocol/hls/src/flv_data_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ impl FlvDataReceiver {
app_name: String,
stream_name: String,
) -> Result<(), HlsError> {
let (sender, receiver) = mpsc::unbounded_channel();
/*the sub info is only used to transfer from RTMP to HLS, but not for client player */
let sub_info = SubscriberInfo {
id: self.subscriber_id,
sub_type: SubscribeType::GenerateHls,
sub_data_type: streamhub::define::SubDataType::Frame,
notify_info: NotifyInfo {
request_url: String::from(""),
remote_addr: String::from(""),
Expand All @@ -118,8 +118,7 @@ impl FlvDataReceiver {
let subscribe_event = StreamHubEvent::Subscribe {
identifier,
info: sub_info,
sender: streamhub::define::DataSender::Frame { sender },
eer_sender: event_result_sender,
result_sender: event_result_sender,
};

let rv = self.event_producer.send(subscribe_event);
Expand All @@ -132,16 +131,7 @@ impl FlvDataReceiver {
});
}

match event_result_receiver.await {
Ok(rv) => {
rv?;
}
Err(_) => {
return Err(HlsError {
value: HlsErrorValue::ChannelRecvError,
});
}
}
let receiver = event_result_receiver.await??.frame_receiver.unwrap();

self.data_consumer = receiver;

Expand All @@ -152,6 +142,7 @@ impl FlvDataReceiver {
let sub_info = SubscriberInfo {
id: self.subscriber_id,
sub_type: SubscribeType::PlayerHls,
sub_data_type: streamhub::define::SubDataType::Frame,
notify_info: NotifyInfo {
request_url: String::from(""),
remote_addr: String::from(""),
Expand Down
13 changes: 11 additions & 2 deletions protocol/httpflv/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use {
amf0::errors::Amf0WriteError, cache::errors::MetadataError, session::errors::SessionError,
},
std::fmt,
tokio::sync::oneshot::error::RecvError,
xflv::errors::FlvMuxerError,
};

Expand Down Expand Up @@ -39,10 +40,10 @@ pub enum HttpFLvErrorValue {
MetadataError(MetadataError),
#[fail(display = "tokio mpsc error")]
MpscSendError(SendError),
#[fail(display = "receiver being dropped")]
ReceiverDroppedError(SendError),
#[fail(display = "event execute error: {}", _0)]
ChannelError(ChannelError),
#[fail(display = "tokio: oneshot receiver err: {}", _0)]
RecvError(#[cause] RecvError),
#[fail(display = "channel recv error")]
ChannelRecvError,
}
Expand Down Expand Up @@ -95,6 +96,14 @@ impl From<ChannelError> for HttpFLvError {
}
}

impl From<RecvError> for HttpFLvError {
fn from(error: RecvError) -> Self {
HttpFLvError {
value: HttpFLvErrorValue::RecvError(error),
}
}
}

impl fmt::Display for HttpFLvError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.value, f)
Expand Down
Loading

0 comments on commit c954fd3

Please sign in to comment.