diff --git a/src/lib.rs b/src/lib.rs index 9d62270..559d054 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -689,9 +689,7 @@ pub trait PduProvider { fn packet_target(&self) -> Result; } -pub struct DummyPduProvider { - phantom: core::marker::PhantomData<()>, -} +pub struct DummyPduProvider(()); impl PduProvider for DummyPduProvider { fn pdu_type(&self) -> PduType { @@ -833,7 +831,6 @@ pub mod alloc_mod { pub pdu_type: PduType, pub file_directive_type: Option, pub pdu: alloc::vec::Vec, - //packet_target: PacketTarget, } impl PduWithInfo { diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 7b06fc5..a83f612 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -1,18 +1,17 @@ use std::{sync::mpsc, thread, time::Duration}; use cfdp::{ + dest::DestinationHandler, filestore::NativeFilestore, request::StaticPutRequestCacher, source::SourceHandler, user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams, TransactionFinishedParams}, - EntityType, FaultHandler, IndicationConfig, LocalEntityConfig, PacketInfo, PduWithInfo, - RemoteEntityConfig, StdRemoteEntityConfigProvider, TransactionId, UserFaultHookProvider, - VecRemoteEntityConfigProvider, + EntityType, IndicationConfig, LocalEntityConfig, PduWithInfo, RemoteEntityConfig, + StdCheckTimerCreator, TransactionId, UserFaultHookProvider, }; use spacepackets::{ cfdp::{ChecksumType, ConditionCode}, - ecss::CrcType, - seq_count::{SeqCountProviderSimple, SeqCountProviderSyncU16}, + seq_count::SeqCountProviderSyncU16, util::UnsignedByteFieldU16, }; @@ -143,7 +142,7 @@ impl CfdpUser for ExampleCfdpUser { } fn main() { - let local_cfg = LocalEntityConfig::new( + let local_cfg_source = LocalEntityConfig::new( LOCAL_ID.into(), IndicationConfig::default(), ExampleFaultHandler::default(), @@ -151,7 +150,7 @@ fn main() { let (source_tx, source_rx) = mpsc::channel::(); let (dest_tx, dest_rx) = mpsc::channel::(); let put_request_cacher = StaticPutRequestCacher::new(2048); - let remote_cfg = RemoteEntityConfig::new_with_default_values( + let remote_cfg_of_dest = RemoteEntityConfig::new_with_default_values( REMOTE_ID.into(), 1024, true, @@ -161,17 +160,40 @@ fn main() { ); let seq_count_provider = SeqCountProviderSyncU16::default(); let mut source_handler = SourceHandler::new( - local_cfg, + local_cfg_source, source_tx, NativeFilestore::default(), put_request_cacher, 2048, - remote_cfg, + remote_cfg_of_dest, seq_count_provider, ); let mut cfdp_user_source = ExampleCfdpUser::new(EntityType::Sending); - thread::spawn(move || { + let local_cfg_dest = LocalEntityConfig::new( + REMOTE_ID.into(), + IndicationConfig::default(), + ExampleFaultHandler::default(), + ); + let remote_cfg_of_source = RemoteEntityConfig::new_with_default_values( + LOCAL_ID.into(), + 1024, + true, + false, + spacepackets::cfdp::TransmissionMode::Unacknowledged, + ChecksumType::Crc32, + ); + let mut dest_handler = DestinationHandler::new( + local_cfg_dest, + 1024, + dest_tx, + NativeFilestore::default(), + remote_cfg_of_source, + StdCheckTimerCreator::default(), + ); + let mut cfdp_user_dest = ExampleCfdpUser::new(EntityType::Receiving); + + let jh_source = thread::spawn(move || { loop { let mut next_delay = None; let mut undelayed_call_count = 0; @@ -206,6 +228,45 @@ fn main() { thread::sleep(Duration::from_millis(100)); } } - //source_handler.(source_rx); }); + + let jh_dest = thread::spawn(move || { + loop { + let mut next_delay = None; + let mut undelayed_call_count = 0; + let packet_info = match source_rx.try_recv() { + Ok(pdu_with_info) => Some(pdu_with_info), + Err(e) => match e { + mpsc::TryRecvError::Empty => None, + mpsc::TryRecvError::Disconnected => { + panic!("unexpected disconnect from destination channel sender"); + } + }, + }; + match dest_handler.state_machine(&mut cfdp_user_dest, packet_info.as_ref()) { + Ok(sent_packets) => { + if sent_packets == 0 { + next_delay = Some(Duration::from_millis(200)); + } + } + Err(e) => { + println!("Source handler error: {}", e); + next_delay = Some(Duration::from_millis(200)); + } + } + if let Some(delay) = next_delay { + thread::sleep(delay); + } else { + undelayed_call_count += 1; + } + // Safety feature against configuration errors. + if undelayed_call_count >= 200 { + println!("Source handler state machine possible in permanent loop"); + thread::sleep(Duration::from_millis(100)); + } + } + }); + + jh_source.join().unwrap(); + jh_dest.join().unwrap(); }