Skip to content

Commit

Permalink
source handler test
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Sep 7, 2024
1 parent 467037c commit a0b863f
Showing 1 changed file with 104 additions and 51 deletions.
155 changes: 104 additions & 51 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,55 +462,30 @@ impl<
/// If not unexpected errors occur, this method returns [true] if the transfer was cancelled
/// propery and [false] if there is no transaction active or the passed transaction ID and the
/// active ID do not match.
pub fn cancel_request(&mut self, transaction_id: &TransactionId) -> Result<bool, SourceError> {
pub fn cancel_request(&mut self, user: &mut impl CfdpUser, transaction_id: &TransactionId) -> Result<bool, SourceError> {
if self.state_helper.state == super::State::Idle {
return Ok(false);
}
if let Some(active_id) = self.transaction_id() {
if active_id == *transaction_id {
self.declare_fault(ConditionCode::CancelRequestReceived)?;
self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?;
return Ok(true);
}
}
Ok(false)
}

pub fn transaction_id(&self) -> Option<TransactionId> {
self.tstate.as_ref().map(|v| v.transaction_id)
}

fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&PduHeader::new_no_file_data(self.pdu_conf, 0),
remote_cfg.max_packet_len,
None,
);
if remote_cfg.max_file_segment_len.is_some() {
derived_max_seg_len = core::cmp::min(
remote_cfg.max_file_segment_len.unwrap(),
derived_max_seg_len,
);
}
derived_max_seg_len as u64
}

/// Returns the [TransmissionMode] for the active file operation.
#[inline]
pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
self.tstate.as_ref().map(|v| v.transmission_mode)
}

fn fsm_busy(
&mut self,
cfdp_user: &mut impl CfdpUser,
user: &mut impl CfdpUser,
pdu: Option<&impl PduProvider>,
) -> Result<u32, SourceError> {
let mut sent_packets = 0;
if self.state_helper.step == TransactionStep::Idle {
self.state_helper.step = TransactionStep::TransactionStart;
}
if self.state_helper.step == TransactionStep::TransactionStart {
self.handle_transaction_start(cfdp_user)?;
self.handle_transaction_start(user)?;
self.state_helper.step = TransactionStep::SendingMetadata;
}
if self.state_helper.step == TransactionStep::SendingMetadata {
Expand All @@ -526,21 +501,22 @@ impl<
}
}
if self.state_helper.step == TransactionStep::SendingEof {
self.eof_fsm(cfdp_user)?;
self.eof_fsm(user)?;
sent_packets += 1;
}
if self.state_helper.step == TransactionStep::WaitingForFinished {
self.handle_wait_for_finished_pdu(pdu)?;
self.handle_wait_for_finished_pdu(user, pdu)?;
}
if self.state_helper.step == TransactionStep::NoticeOfCompletion {
self.notice_of_completion(cfdp_user);
self.notice_of_completion(user);
self.reset();
}
Ok(sent_packets)
}

fn handle_wait_for_finished_pdu(
&mut self,
user: &mut impl CfdpUser,
packet: Option<&impl PduProvider>,
) -> Result<(), SourceError> {
if let Some(packet) = packet {
Expand All @@ -562,7 +538,7 @@ impl<
}
// If we reach this state, countdown is definitely valid instance.
if self.countdown.as_ref().unwrap().has_expired() {
self.declare_fault(ConditionCode::CheckLimitReached)?;
self.declare_fault(user, ConditionCode::CheckLimitReached)?;
}
/*
def _handle_wait_for_finish(self):
Expand Down Expand Up @@ -592,19 +568,16 @@ impl<
Ok(())
}

fn eof_fsm(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), SourceError> {
fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> {
let tstate = self.tstate.as_ref().unwrap();
let checksum = self.vfs.calculate_checksum(
self.put_request_cacher.source_file().unwrap(),
tstate.remote_cfg.default_crc_type,
self.fparams.file_size,
self.pdu_and_cksum_buffer.get_mut(),
)?;
self.prepare_and_send_eof_pdu(checksum)?;
self.prepare_and_send_eof_pdu(user, checksum)?;
let tstate = self.tstate.as_ref().unwrap();
if self.local_cfg.indication_cfg.eof_sent {
cfdp_user.eof_sent_indication(&tstate.transaction_id);
}
if tstate.transmission_mode == TransmissionMode::Unacknowledged {
if tstate.closure_requested {
self.countdown = Some(self.timer_creator.create_countdown(
Expand Down Expand Up @@ -806,6 +779,21 @@ impl<
}
}

fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
&PduHeader::new_no_file_data(self.pdu_conf, 0),
remote_cfg.max_packet_len,
None,
);
if remote_cfg.max_file_segment_len.is_some() {
derived_max_seg_len = core::cmp::min(
remote_cfg.max_file_segment_len.unwrap(),
derived_max_seg_len,
);
}
derived_max_seg_len as u64
}

fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
// Should never be called, but use defensive programming here.
if self.fparams.progress >= self.fparams.file_size {
Expand Down Expand Up @@ -887,7 +875,7 @@ impl<
Ok(true)
}

fn prepare_and_send_eof_pdu(&mut self, checksum: u32) -> Result<(), SourceError> {
fn prepare_and_send_eof_pdu(&mut self, cfdp_user: &mut impl CfdpUser, checksum: u32) -> Result<(), SourceError> {
let tstate = self
.tstate
.as_ref()
Expand All @@ -900,6 +888,9 @@ impl<
None,
);
self.pdu_send_helper(&eof_pdu)?;
if self.local_cfg.indication_cfg.eof_sent {
cfdp_user.eof_sent_indication(&tstate.transaction_id);
}
Ok(())
}

Expand Down Expand Up @@ -953,6 +944,16 @@ impl<

fn handle_keep_alive_pdu(&mut self) {}

pub fn transaction_id(&self) -> Option<TransactionId> {
self.tstate.as_ref().map(|v| v.transaction_id)
}

/// Returns the [TransmissionMode] for the active file operation.
#[inline]
pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
self.tstate.as_ref().map(|v| v.transmission_mode)
}

/// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when
/// applicable.
pub fn step(&self) -> TransactionStep {
Expand All @@ -967,11 +968,18 @@ impl<
&self.local_cfg
}

fn declare_fault(&mut self, cond: ConditionCode) -> Result<(), SourceError> {
fn declare_fault(
&mut self,
user: &mut impl CfdpUser,
cond: ConditionCode,
) -> Result<(), SourceError> {
// Need to cache those in advance, because a notice of cancellation can reset the handler.
let transaction_id = self.tstate.as_ref().unwrap().transaction_id;
let progress = self.fparams.progress;
let fh = self.local_cfg.fault_handler.get_fault_handler(cond);
match fh {
spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => {
if let ControlFlow::Break(_) = self.notice_of_cancellation(cond)? {
if let ControlFlow::Break(_) = self.notice_of_cancellation(user, cond)? {
return Ok(());
}
}
Expand All @@ -981,30 +989,30 @@ impl<
spacepackets::cfdp::FaultHandlerCode::IgnoreError => (),
spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
}
let tstate = self.tstate.as_ref().unwrap();
self.local_cfg.fault_handler.report_fault(
tstate.transaction_id,
transaction_id,
cond,
self.fparams.progress,
progress,
);
Ok(())
}

fn notice_of_cancellation(
&mut self,
user: &mut impl CfdpUser,
condition_code: ConditionCode,
) -> Result<ControlFlow<()>, SourceError> {
let transaction_id = self.tstate.as_ref().unwrap().transaction_id;
// CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
// the EOF (cancel) PDU must result in abandonment of the transaction.
if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof {
if cond_code_eof != ConditionCode::NoError {
let tstate = self.tstate.as_ref().unwrap();
// Still call the abandonment callback to ensure the fault is logged.
self.local_cfg
.fault_handler
.user_hook
.get_mut()
.abandoned_cb(tstate.transaction_id, cond_code_eof, self.fparams.progress);
.abandoned_cb(transaction_id, cond_code_eof, self.fparams.progress);
self.abandon_transaction();
return Ok(ControlFlow::Break(()));
}
Expand All @@ -1020,7 +1028,13 @@ impl<
self.fparams.progress,
self.pdu_and_cksum_buffer.get_mut(),
)?;
self.prepare_and_send_eof_pdu(checksum)?;
self.prepare_and_send_eof_pdu(user, checksum)?;
if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
// We are done.
self.reset();
} else {
self.state_helper.step = TransactionStep::WaitingForEofAck;
}
Ok(ControlFlow::Continue(()))
}

Expand Down Expand Up @@ -1072,7 +1086,8 @@ impl<

#[cfg(test)]
mod tests {
use std::{fs::OpenOptions, io::Write, path::PathBuf, vec::Vec};
use core::time::Duration;
use std::{fs::OpenOptions, io::Write, path::PathBuf, thread, vec::Vec};

use alloc::string::String;
use rand::Rng;
Expand Down Expand Up @@ -1175,6 +1190,10 @@ mod tests {
)
}

fn set_check_limit_timeout(&mut self, timeout: Duration) {
self.handler.timer_creator.check_limit_timeout = timeout;
}

fn put_request(
&mut self,
put_request: &impl ReadablePutRequest,
Expand Down Expand Up @@ -1601,14 +1620,48 @@ mod tests {
let error = tb.put_request(&put_request);
assert!(error.is_err());
let error = error.unwrap_err();
if let PutRequestError::FileDoesNotExist = error {
} else {
if !matches!(error, PutRequestError::FileDoesNotExist) {
panic!("unexpected error type: {:?}", error);
}
}

#[test]
fn test_finished_pdu_check_timeout() {
// TODO: Implement.
let fault_handler = TestFaultHandler::default();
let test_sender = TestCfdpSender::default();
let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
tb.set_check_limit_timeout(Duration::from_millis(45));
let filesize = 0;
let put_request = PutRequestOwned::new_regular_request(
REMOTE_ID.into(),
&tb.srcfile,
&tb.destfile,
Some(TransmissionMode::Unacknowledged),
Some(true),
)
.expect("creating put request failed");
let mut cfdp_user = tb.create_user(0, filesize);
let (closure_requested, _pdu_header) =
tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
tb.common_eof_pdu_check(
&mut cfdp_user,
closure_requested,
filesize,
CRC_32.digest().finalize(),
);
// After 50 ms delay, we run into a timeout, which leads to a check limit error
// declaration -> leads to a notice of cancellation -> leads to an EOF PDU with the
// appropriate error code.
thread::sleep(Duration::from_millis(50));
assert_eq!(
tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap(),
0
);
let next_pdu = tb.get_next_sent_pdu().unwrap();
let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached);
assert_eq!(eof_pdu.file_size(), 0);
assert_eq!(eof_pdu.file_checksum(), 0);
}
}

0 comments on commit a0b863f

Please sign in to comment.