Skip to content

Commit

Permalink
Merge pull request #139 from SpaceTeam/timeout-handling
Browse files Browse the repository at this point in the history
Timeout handling
  • Loading branch information
florg-32 authored Jan 21, 2024
2 parents 8ff32a5 + b31a134 commit e2a6fcc
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 173 deletions.
182 changes: 58 additions & 124 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ coverage: build_with_cov
firefox ./target/release/coverage/index.html&

sw_test:
cargo build --release && RUST_LOG=info cargo test --release --features mock
cargo build --release -Fmock && RUST_LOG=info cargo test --release -Fmock

packs:
cargo test build_pack --features rpi
Expand Down
10 changes: 5 additions & 5 deletions src/command/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ impl From<subprocess::PopenError> for CommandError {
impl From<CommunicationError> for CommandError {
fn from(e: CommunicationError) -> Self {
match e {
CommunicationError::PacketInvalidError => CommandError::External(Box::new(e)),
CommunicationError::CepParsing(_) => CommandError::ProtocolViolation(Box::new(e)),
CommunicationError::Io(_) => CommandError::NonRecoverable(Box::new(e)),
CommunicationError::NotAcknowledged => CommandError::ProtocolViolation(Box::new(e)),
CommunicationError::TimedOut => todo!("Timeout not yet specified"),
CommunicationError::PacketInvalidError => CommandError::External(e.into()),
CommunicationError::CepParsing(_) => CommandError::ProtocolViolation(e.into()),
CommunicationError::Io(_) => CommandError::NonRecoverable(e.into()),
CommunicationError::NotAcknowledged => CommandError::ProtocolViolation(e.into()),
CommunicationError::TimedOut => CommandError::ProtocolViolation(e.into()),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/command/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::communication::{CEPPacket, CommunicationHandle};

mod common;
Expand All @@ -21,6 +23,8 @@ use stop_program::stop_program;
use store_archive::store_archive;
use update_time::update_time;

const COMMAND_TIMEOUT: Duration = Duration::from_secs(1);

type CommandResult = Result<(), CommandError>;

/// Main routine. Waits for a command to be received from the COBC, then parses and executes it.
Expand Down
6 changes: 2 additions & 4 deletions src/command/return_result.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::time::Duration;

use crate::{
command::{check_length, CommandError, Event, ResultId},
command::{check_length, CommandError, Event, ResultId, COMMAND_TIMEOUT},
communication::{CEPPacket, CommunicationHandle},
};

Expand Down Expand Up @@ -31,7 +29,7 @@ pub fn return_result(
log::info!("Returning result for {}:{}", program_id, timestamp);
com.send_multi_packet(&bytes)?;

com.await_ack(&Duration::from_secs(1))?;
com.await_ack(COMMAND_TIMEOUT)?;
let result_id = ResultId { program_id, timestamp };
delete_result(result_id)?;

Expand Down
97 changes: 66 additions & 31 deletions src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,30 @@ pub type ComResult<T> = Result<T, CommunicationError>;

pub trait CommunicationHandle: Read + Write {
const INTEGRITY_ACK_TIMEOUT: Duration;
const UNLIMITED_TIMEOUT: Duration = Duration::MAX;
const UNLIMITED_TIMEOUT: Duration;

const DATA_PACKET_RETRIES: usize = 4;

fn set_timeout(&mut self, timeout: &Duration);
fn set_timeout(&mut self, timeout: Duration);

fn send_packet(&mut self, packet: &CEPPacket) -> ComResult<()> {
let bytes = Vec::from(packet);
self.write_all(&bytes)?;

for _ in 0..Self::DATA_PACKET_RETRIES {
self.write_all(&bytes)?;

if matches!(packet, CEPPacket::Data(_)) {
let response = self.receive_packet()?;
match response {
CEPPacket::Ack => return Ok(()),
CEPPacket::Nack => log::warn!("Received NACK after data packet; Retrying"),
p => {
log::error!("Received {p:?} after data packet");
return Err(CommunicationError::PacketInvalidError);
if !(matches!(packet, CEPPacket::Data(_))) {
return Ok(());
}

for i in 1..=Self::DATA_PACKET_RETRIES {
match self.await_ack(Self::INTEGRITY_ACK_TIMEOUT) {
Ok(()) => return Ok(()),
Err(CommunicationError::NotAcknowledged) => {
log::warn!("Received NACK, retrying");
if i < Self::DATA_PACKET_RETRIES {
self.write_all(&bytes)?;
}
}
} else {
return Ok(());
},
Err(e) => return Err(e)
}
}

Expand All @@ -50,7 +50,7 @@ pub trait CommunicationHandle: Read + Write {
}

self.send_packet(&CEPPacket::Eof)?;
self.await_ack(&Self::INTEGRITY_ACK_TIMEOUT)?;
self.await_ack(Self::INTEGRITY_ACK_TIMEOUT)?;

Ok(())
}
Expand All @@ -64,7 +64,8 @@ pub trait CommunicationHandle: Read + Write {
}
Ok(p) => return Ok(p),
Err(CEPParseError::InvalidCRC) => {
log::warn!("Received data packet with invalid CRC; Retrying")
log::warn!("Received data packet with invalid CRC; Retrying");
self.send_packet(&CEPPacket::Nack)?;
}
Err(e) => {
log::error!("Failed to read packet: {e:?}");
Expand All @@ -73,7 +74,8 @@ pub trait CommunicationHandle: Read + Write {
}
}

todo!()
log::error!("Could not receive data packet after {} retries, giving up", Self::DATA_PACKET_RETRIES);
Err(CommunicationError::PacketInvalidError)
}

fn receive_multi_packet(&mut self) -> ComResult<Vec<u8>> {
Expand Down Expand Up @@ -107,22 +109,26 @@ pub trait CommunicationHandle: Read + Write {
Ok(buffer)
}

fn await_ack(&mut self, timeout: &Duration) -> ComResult<()> {
/// Try to receive an ACK packet with a given `timeout`. Resets the timeout to Duration::MAX afterwards
fn await_ack(&mut self, timeout: Duration) -> ComResult<()> {
self.set_timeout(timeout);
match self.receive_packet()? {
let ret = match self.receive_packet()? {
CEPPacket::Ack => Ok(()),
CEPPacket::Nack => Err(CommunicationError::NotAcknowledged),
_ => Err(CommunicationError::PacketInvalidError),
}
};
self.set_timeout(Self::UNLIMITED_TIMEOUT);
ret
}
}

impl CommunicationHandle for Box<dyn serialport::SerialPort> {
const INTEGRITY_ACK_TIMEOUT: Duration = Duration::from_millis(100);
const UNLIMITED_TIMEOUT: Duration = Duration::MAX;
const INTEGRITY_ACK_TIMEOUT: Duration = Duration::from_millis(1000);
/// Equivalent to 106 days, maximum allowed value due to library limitations (of all serialport libraries I found)
const UNLIMITED_TIMEOUT: Duration = Duration::from_millis(9223372035);

fn set_timeout(&mut self, timeout: &Duration) {
serialport::SerialPort::set_timeout(self.as_mut(), *timeout).unwrap()
fn set_timeout(&mut self, timeout: Duration) {
serialport::SerialPort::set_timeout(self.as_mut(), timeout).unwrap()
}
}

Expand Down Expand Up @@ -159,7 +165,7 @@ impl From<std::io::Error> for CommunicationError {
impl From<CEPParseError> for CommunicationError {
fn from(value: CEPParseError) -> Self {
match value {
CEPParseError::Io(e) => Self::Io(e),
CEPParseError::Io(e) => e.into(),
e => Self::CepParsing(e),
}
}
Expand All @@ -169,6 +175,8 @@ impl std::error::Error for CommunicationError {}

#[cfg(test)]
mod tests {
use self::cep::CEPPacketHeader;

use super::*;
use test_case::test_case;

Expand Down Expand Up @@ -197,7 +205,7 @@ mod tests {
impl CommunicationHandle for TestComHandle {
const INTEGRITY_ACK_TIMEOUT: Duration = Duration::from_millis(100);
const UNLIMITED_TIMEOUT: Duration = Duration::MAX;
fn set_timeout(&mut self, _timeout: &Duration) {}
fn set_timeout(&mut self, _timeout: Duration) {}
}

#[test_case(CEPPacket::Ack)]
Expand Down Expand Up @@ -242,7 +250,7 @@ mod tests {
}

#[test]
fn fail_after_retries() {
fn fail_after_retries_send_packet() {
let mut com = TestComHandle::default();
for _ in 0..TestComHandle::DATA_PACKET_RETRIES {
com.data_to_read.append(&mut CEPPacket::Nack.serialize());
Expand All @@ -252,13 +260,38 @@ mod tests {
com.send_packet(&CEPPacket::Data(vec![1, 2, 3])),
Err(CommunicationError::PacketInvalidError)
));
dbg!(&com.data_to_read);
assert!(com.data_to_read.is_empty());
assert_eq!(
com.written_data,
CEPPacket::Data(vec![1, 2, 3]).serialize().repeat(TestComHandle::DATA_PACKET_RETRIES)
);
}

#[test]
fn fail_after_retries_receive_packet() {
let mut com = TestComHandle::default();
com.data_to_read.extend([CEPPacketHeader::Data as u8, 1, 0, 2, 1, 1, 1, 1].repeat(TestComHandle::DATA_PACKET_RETRIES));

let err = com.receive_packet().expect_err("Invalid data packet should fail");
assert!(matches!(err, CommunicationError::PacketInvalidError));
assert!(com.data_to_read.is_empty(), "Not read: {:?}", com.data_to_read);
assert_eq!(com.written_data, CEPPacket::Nack.serialize().repeat(TestComHandle::DATA_PACKET_RETRIES));
}

#[test]
fn receive_packet_retries_correctly() {
let mut com = TestComHandle::default();
com.data_to_read.extend([CEPPacketHeader::Data as u8, 1, 0, 2, 1, 1, 1, 1].repeat(TestComHandle::DATA_PACKET_RETRIES-1));
com.data_to_read.append(&mut CEPPacket::Data(vec![2]).serialize());

assert_eq!(com.receive_packet().unwrap(), CEPPacket::Data(vec![2]));
assert!(com.data_to_read.is_empty());
let mut expected = CEPPacket::Nack.serialize().repeat(TestComHandle::DATA_PACKET_RETRIES-1);
expected.append(&mut CEPPacket::Ack.serialize());
assert_eq!(com.written_data, expected);
}

#[test]
fn multi_packet_is_sent_correctly() {
let mut com = TestComHandle::default();
Expand All @@ -271,7 +304,10 @@ mod tests {

assert!(com.data_to_read.is_empty());
for c in chunks {
assert_eq!(com.written_data.drain(0..c.len()+7).as_slice(), CEPPacket::Data(c.to_vec()).serialize());
assert_eq!(
com.written_data.drain(0..c.len() + 7).as_slice(),
CEPPacket::Data(c.to_vec()).serialize()
);
}
assert_eq!(com.written_data, CEPPacket::Eof.serialize());
}
Expand All @@ -291,5 +327,4 @@ mod tests {
assert!(com.data_to_read.is_empty());
assert_eq!(com.written_data, CEPPacket::Ack.serialize().repeat(chunks.len() + 1))
}

}
5 changes: 3 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#![allow(non_snake_case)]
use core::time;
use rppal::gpio::Gpio;
use std::{thread, time::Duration};
use serialport::SerialPort;
use std::thread;
use STS1_EDU_Scheduler::communication::CommunicationHandle;

use simplelog as sl;
Expand Down Expand Up @@ -36,7 +37,7 @@ fn main() -> ! {

// construct a wrapper for UART communication
let mut com = serialport::new(&config.uart, config.baudrate).open().expect("Could not open serial port");
com.set_timeout(&Duration::from_secs(60));
com.set_timeout(<Box<dyn SerialPort> as CommunicationHandle>::UNLIMITED_TIMEOUT);

// construct a wrapper for resources that are shared between different commands
let mut exec = command::ExecutionContext::new("events".to_string(), config.update_pin).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/simulation/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn logfile_is_cleared_after_sent() -> std::io::Result<()> {

simulate_test_store_archive(&mut com, 1).unwrap();
com.send_packet(&CEPPacket::Data(execute_program(1, 0, 3))).unwrap();
com.await_ack(&Duration::MAX).unwrap();
com.await_ack(Duration::MAX).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));

let _ = simulate_return_result(&mut com, 1, 0).unwrap();
Expand Down
7 changes: 4 additions & 3 deletions tests/simulation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod command_execution;
mod full_run;
mod logging;
mod timeout;

use std::{
io::{Read, Write},
Expand Down Expand Up @@ -58,7 +59,7 @@ impl<T: Read, U: Write> CommunicationHandle for SimulationComHandle<T, U> {
const INTEGRITY_ACK_TIMEOUT: Duration = Duration::MAX;
const UNLIMITED_TIMEOUT: Duration = Duration::MAX;

fn set_timeout(&mut self, _timeout: &std::time::Duration) {}
fn set_timeout(&mut self, _timeout: std::time::Duration) {}
}

fn get_config_str(unique: &str) -> String {
Expand Down Expand Up @@ -103,7 +104,7 @@ pub fn simulate_test_store_archive(
let archive = std::fs::read("tests/student_program.zip").unwrap();
com.send_packet(&CEPPacket::Data(store_archive(program_id)))?;
com.send_multi_packet(&archive)?;
com.await_ack(&Duration::MAX)?;
com.await_ack(Duration::MAX)?;

Ok(())
}
Expand All @@ -115,7 +116,7 @@ pub fn simulate_execute_program(
timeout: u16,
) -> Result<(), CommunicationError> {
com.send_packet(&CEPPacket::Data(execute_program(program_id, timestamp, timeout)))?;
com.await_ack(&Duration::MAX)?;
com.await_ack(Duration::MAX)?;

Ok(())
}
Expand Down
22 changes: 22 additions & 0 deletions tests/simulation/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::time::Duration;
use STS1_EDU_Scheduler::communication::{CommunicationHandle, CEPPacket};
use super::{SimulationComHandle, start_scheduler, get_status};

#[test]
fn integrity_ack_timeout_is_honored() {
let (mut cobc, _socat) = SimulationComHandle::with_socat_proc("integrity_timeout");
let _sched = start_scheduler("integrity_timeout").unwrap();

// Check that delayed ACK is allowed
cobc.send_packet(&CEPPacket::Data(get_status())).unwrap();
std::thread::sleep(Duration::from_millis(500));
assert_eq!(cobc.receive_packet().unwrap(), CEPPacket::Data(vec![0]));

cobc.send_packet(&CEPPacket::Data(get_status())).unwrap();
assert_eq!(CEPPacket::try_from_read(&mut cobc.cobc_in).unwrap(), CEPPacket::Data(vec![0])); // No ACK sent!
std::thread::sleep(Duration::from_millis(1010));

// Timeout passed, normal communication should be possible
cobc.send_packet(&CEPPacket::Data(get_status())).unwrap();
assert_eq!(cobc.receive_packet().unwrap(), CEPPacket::Data(vec![0]));
}
4 changes: 2 additions & 2 deletions tests/software_tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl CommunicationHandle for TestCom {
}

if matches!(packet, CEPPacket::Data(_)) {
self.await_ack(&Self::INTEGRITY_ACK_TIMEOUT)?;
self.await_ack(Self::INTEGRITY_ACK_TIMEOUT)?;
}

Ok(())
Expand All @@ -79,7 +79,7 @@ impl CommunicationHandle for TestCom {
const INTEGRITY_ACK_TIMEOUT: std::time::Duration = Duration::MAX;
const UNLIMITED_TIMEOUT: std::time::Duration = Duration::MAX;

fn set_timeout(&mut self, _timeout: &std::time::Duration) {}
fn set_timeout(&mut self, _timeout: std::time::Duration) {}
}

impl TestCom {
Expand Down

0 comments on commit e2a6fcc

Please sign in to comment.