Skip to content

Commit

Permalink
refactor DataChannelMessageParams #23
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 3, 2024
1 parent 2f741fc commit 03475ad
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 57 deletions.
37 changes: 24 additions & 13 deletions src/handler/datachannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::messages::{
use datachannel::message::{message_channel_ack::*, message_channel_open::*, message_type::*, *};
use log::{debug, error, warn};
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use sctp::ReliabilityType;
use shared::error::Result;
use shared::marshal::*;

Expand Down Expand Up @@ -46,7 +47,9 @@ impl InboundHandler for DataChannelInbound {
message.stream_id,
message.data_message_type);

let _open = DataChannelOpen::unmarshal(&mut buf)?;
let data_channel_open = DataChannelOpen::unmarshal(&mut buf)?;
let (unordered, reliability_type) =
get_reliability_params(data_channel_open.channel_type);

let payload = Message::DataChannelAck(DataChannelAck {}).marshal()?;
Ok((
Expand All @@ -59,12 +62,12 @@ impl InboundHandler for DataChannelInbound {
association_handle: message.association_handle,
stream_id: message.stream_id,
data_message_type: DataChannelMessageType::Control,
params: DataChannelMessageParams::Outbound {
ordered: true,
reliable: true,
max_rtx_count: 0,
max_rtx_millis: 0,
},
params: Some(DataChannelMessageParams {
unordered,
reliability_type,
reliability_parameter: data_channel_open
.reliability_parameter,
}),
payload,
}),
))
Expand Down Expand Up @@ -138,12 +141,7 @@ impl OutboundHandler for DataChannelOutbound {
association_handle: message.association_handle,
stream_id: message.stream_id,
data_message_type: DataChannelMessageType::Text,
params: DataChannelMessageParams::Outbound {
ordered: true,
reliable: true,
max_rtx_count: 0,
max_rtx_millis: 0,
},
params: None,
payload,
})),
});
Expand Down Expand Up @@ -183,3 +181,16 @@ impl Handler for DataChannelHandler {
)
}
}

fn get_reliability_params(channel_type: ChannelType) -> (bool, ReliabilityType) {
let (unordered, reliability_type) = match channel_type {
ChannelType::Reliable => (false, ReliabilityType::Reliable),
ChannelType::ReliableUnordered => (true, ReliabilityType::Reliable),
ChannelType::PartialReliableRexmit => (false, ReliabilityType::Rexmit),
ChannelType::PartialReliableRexmitUnordered => (true, ReliabilityType::Rexmit),
ChannelType::PartialReliableTimed => (false, ReliabilityType::Timed),
ChannelType::PartialReliableTimedUnordered => (true, ReliabilityType::Timed),
};

(unordered, reliability_type)
}
53 changes: 20 additions & 33 deletions src/handler/sctp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, O
use retty::transport::TransportContext;
use sctp::{
AssociationEvent, AssociationHandle, DatagramEvent, EndpointEvent, Event, Payload,
PayloadProtocolIdentifier, ReliabilityType, StreamEvent, Transmit,
PayloadProtocolIdentifier, StreamEvent, Transmit,
};
use shared::error::{Error, Result};
use std::cell::RefCell;
Expand Down Expand Up @@ -124,9 +124,7 @@ impl InboundHandler for SctpInbound {
association_handle: ch.0,
stream_id: id,
data_message_type: to_data_message_type(chunks.ppi),
params: DataChannelMessageParams::Inbound {
seq_num: chunks.ssn,
},
params: None,
payload: BytesMut::from(&self.internal_buffer[0..n]),
}));
}
Expand Down Expand Up @@ -315,33 +313,26 @@ impl OutboundHandler for SctpOutbound {
if let Some(conn) =
sctp_associations.get_mut(&AssociationHandle(message.association_handle))
{
if let DataChannelMessageParams::Outbound {
ordered,
reliable,
max_rtx_count,
max_rtx_millis,
} = message.params
let mut stream = conn.stream(message.stream_id)?;
if let Some(DataChannelMessageParams {
unordered,
reliability_type,
reliability_parameter,
}) = message.params
{
let mut stream = conn.stream(message.stream_id)?;
let (rel_type, rel_val) = if !reliable {
if max_rtx_millis == 0 {
(ReliabilityType::Rexmit, max_rtx_count)
} else {
(ReliabilityType::Timed, max_rtx_millis)
}
} else {
(ReliabilityType::Reliable, 0)
};

stream.set_reliability_params(!ordered, rel_type, rel_val)?;
stream.write_with_ppi(
&message.payload,
to_ppid(message.data_message_type, message.payload.len()),
stream.set_reliability_params(
unordered,
reliability_type,
reliability_parameter,
)?;
}
stream.write_with_ppi(
&message.payload,
to_ppid(message.data_message_type, message.payload.len()),
)?;

while let Some(x) = conn.poll_transmit(msg.now) {
transmits.extend(split_transmit(x));
}
while let Some(x) = conn.poll_transmit(msg.now) {
transmits.extend(split_transmit(x));
}
} else {
return Err(Error::ErrAssociationNotExisted);
Expand Down Expand Up @@ -447,10 +438,6 @@ fn to_ppid(message_type: DataChannelMessageType, length: usize) -> PayloadProtoc
PayloadProtocolIdentifier::BinaryEmpty
}
}
_ =>
// case DataMessageType::CONTROL: // TODO: remove DataMessageType::NONE once DcSctp is landed
{
PayloadProtocolIdentifier::Dcep
}
_ => PayloadProtocolIdentifier::Dcep,
}
}
17 changes: 6 additions & 11 deletions src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::BytesMut;
use retty::transport::TransportContext;
use sctp::ReliabilityType;
use std::time::Instant;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand All @@ -11,16 +12,10 @@ pub(crate) enum DataChannelMessageType {
}

#[derive(Debug)]
pub(crate) enum DataChannelMessageParams {
Inbound {
seq_num: u16,
},
Outbound {
ordered: bool,
reliable: bool,
max_rtx_count: u32,
max_rtx_millis: u32,
},
pub(crate) struct DataChannelMessageParams {
pub(crate) unordered: bool,
pub(crate) reliability_type: ReliabilityType,
pub(crate) reliability_parameter: u32,
}

#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -35,7 +30,7 @@ pub struct DataChannelMessage {
pub(crate) association_handle: usize,
pub(crate) stream_id: u16,
pub(crate) data_message_type: DataChannelMessageType,
pub(crate) params: DataChannelMessageParams,
pub(crate) params: Option<DataChannelMessageParams>,
pub(crate) payload: BytesMut,
}

Expand Down

0 comments on commit 03475ad

Please sign in to comment.