Skip to content

Commit

Permalink
Merge pull request #142 from SpaceTeam/dosi-socket
Browse files Browse the repository at this point in the history
Implement a socket listener for events
  • Loading branch information
florg-32 authored Jun 9, 2024
2 parents abab5aa + b85274a commit 0bdb79d
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 19 deletions.
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ baudrate = 921600
heartbeat_pin = 34
update_pin = 35
heartbeat_freq = 10 # Hz
socket = "/tmp/scheduler_socket"
19 changes: 14 additions & 5 deletions examples/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{
error::Error, io::{Read, Write}, path::Path, process::{Child, ChildStdin, ChildStdout, Stdio}, time::Duration
error::Error,
io::{Read, Write},
path::Path,
process::{Child, ChildStdin, ChildStdout, Stdio},
time::Duration,
};

use STS1_EDU_Scheduler::communication::{CEPPacket, CommunicationHandle};
Expand Down Expand Up @@ -71,7 +75,10 @@ fn write_scheduler_config(path: &str) {
const COMMANDS: &[&str] =
&["StoreArchive", "ExecuteProgram", "StopProgram", "GetStatus", "ReturnResult", "UpdateTime"];

fn inquire_and_send_command(edu: &mut impl CommunicationHandle, path: &str) -> Result<(), Box<dyn Error>> {
fn inquire_and_send_command(
edu: &mut impl CommunicationHandle,
path: &str,
) -> Result<(), Box<dyn Error>> {
let mut select = inquire::Select::new("Select command", COMMANDS.to_vec());
if Path::new(&format!("{path}/updatepin")).exists() {
select.help_message = Some("Update Pin is high");
Expand Down Expand Up @@ -122,18 +129,20 @@ fn inquire_and_send_command(edu: &mut impl CommunicationHandle, path: &str) -> R
n => println!("Unknown event {n}"),
}
}
},
}
"ReturnResult" => {
let program_id = inquire::Text::new("Program id:").prompt()?.parse()?;
let timestamp = inquire::Text::new("Timestamp:").prompt()?.parse()?;
let result_path = inquire::Text::new("File path for returned result:").with_default("./result.tar").prompt()?;
let result_path = inquire::Text::new("File path for returned result:")
.with_default("./result.tar")
.prompt()?;
edu.send_packet(&CEPPacket::Data(return_result(program_id, timestamp)))?;
match edu.receive_multi_packet() {
Ok(data) => {
std::fs::write(result_path, data)?;
edu.send_packet(&CEPPacket::Ack)?;
println!("Wrote result to file");
},
}
Err(e) => println!("Received {:?}", e),
}
}
Expand Down
33 changes: 27 additions & 6 deletions src/command/execution_context.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
str::FromStr,
sync::{Arc, Mutex},
thread,
};
Expand Down Expand Up @@ -100,30 +101,32 @@ impl UpdatePin {
}

/// Struct used for storing information about a finished student program
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug)]
pub struct ProgramStatus {
pub program_id: u16,
pub timestamp: u32,
pub exit_code: u8,
}

/// Struct used for storing information of a result, waiting to be sent
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[derive(Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug)]
pub struct ResultId {
pub program_id: u16,
pub timestamp: u32,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq, Debug)]
pub enum Event {
Status(ProgramStatus),
Result(ResultId),
EnableDosimeter,
DisableDosimeter,
}

impl Event {
pub fn to_bytes(self) -> Vec<u8> {
impl From<Event> for Vec<u8> {
fn from(value: Event) -> Self {
let mut v = Vec::new();
match self {
match value {
Event::Status(s) => {
v.push(1);
v.extend(s.program_id.to_le_bytes());
Expand All @@ -135,7 +138,25 @@ impl Event {
v.extend(r.program_id.to_le_bytes());
v.extend(r.timestamp.to_le_bytes());
}
Event::EnableDosimeter => {
v.push(3);
}
Event::DisableDosimeter => {
v.push(4);
}
}
v
}
}

impl FromStr for Event {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"dosimeter/on" => Ok(Event::EnableDosimeter),
"dosimeter/off" => Ok(Event::DisableDosimeter),
_ => Err(()),
}
}
}
8 changes: 4 additions & 4 deletions src/command/get_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ pub fn get_status(
l_exec.event_vec.as_ref().iter().position(|x| matches!(x, Event::Status(_)))
{
let event = l_exec.event_vec[index];
com.send_packet(&CEPPacket::Data(event.to_bytes()))?;
com.send_packet(&CEPPacket::Data(event.into()))?;
l_exec.event_vec.remove(index)?;
} else {
let event = *l_exec.event_vec.as_ref().last().unwrap(); // Safe, because we know it is not empty
com.send_packet(&CEPPacket::Data(event.to_bytes()))?;
let event = *l_exec.event_vec.as_ref().first().unwrap(); // Safe, because we know it is not empty
com.send_packet(&CEPPacket::Data(event.into()))?;

if !matches!(event, Event::Result(_)) {
// Results are removed when deleted
l_exec.event_vec.pop()?;
l_exec.event_vec.remove(0)?;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/communication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod cep;
pub use cep::CEPPacket;

pub mod socket;

use std::{
io::{Read, Write},
time::Duration,
Expand Down
158 changes: 158 additions & 0 deletions src/communication/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::{
io::{BufRead, BufReader, Write},
os::unix::net::{UnixListener, UnixStream},
path::Path,
str::FromStr,
};

pub struct UnixSocketParser {
listener: UnixListener,
connection: Option<BufReader<UnixStream>>,
}

impl UnixSocketParser {
pub fn new(path: &str) -> std::io::Result<Self> {
let _ = std::fs::remove_file(path);
Ok(Self { listener: UnixListener::bind(path)?, connection: None })
}

pub fn read_object<T: FromStr>(&mut self) -> std::io::Result<T> {
if self.connection.is_none() {
let (stream, _) = self.listener.accept()?;
self.connection = Some(BufReader::new(stream));
}

let con = self.connection.as_mut().unwrap();
let mut line = String::new();
con.read_line(&mut line)?;

if !line.ends_with('\n') || line.is_empty() {
self.connection.take();
return Err(std::io::ErrorKind::ConnectionAborted.into());
}

if line == Self::SHUTDOWN_STRING {
return Err(std::io::ErrorKind::Other.into());
}

T::from_str(line.trim_end()).map_err(|_| std::io::ErrorKind::InvalidData.into())
}

const SHUTDOWN_STRING: &'static str = "shutdown\n";
pub fn _shutdown(path: impl AsRef<Path>) -> std::io::Result<()> {
let mut stream = UnixStream::connect(path)?;
stream.write_all(Self::SHUTDOWN_STRING.as_bytes())?;

Ok(())
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};

use super::*;

fn get_unique_tmp_path() -> String {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
let value = COUNTER.fetch_add(1, Ordering::Relaxed);
let path = format!("/tmp/STS1_socket_test_{value}");
let _ = std::fs::remove_file(&path);
path
}

#[test]
fn can_shutdown() {
let path = get_unique_tmp_path();
let mut rx = UnixSocketParser::new(&path).unwrap();

UnixSocketParser::_shutdown(&path).unwrap();

assert_eq!(std::io::ErrorKind::Other, rx.read_object::<i32>().unwrap_err().kind());
}

#[test]
fn can_parse_single_value() {
let path = get_unique_tmp_path();
let mut rx = UnixSocketParser::new(&path).unwrap();

let mut stream = UnixStream::connect(&path).unwrap();
writeln!(stream, "1234").unwrap();

assert_eq!(1234, rx.read_object::<i32>().unwrap());

UnixSocketParser::_shutdown(path).unwrap();
}

#[test]
fn can_parse_multiple_values() {
let path = get_unique_tmp_path();
let mut rx = UnixSocketParser::new(&path).unwrap();

let mut stream = UnixStream::connect(&path).unwrap();

const REPS: usize = 100;
for i in 0..REPS {
writeln!(stream, "{i}").unwrap();
}

for i in 0..REPS {
assert_eq!(i, rx.read_object::<usize>().unwrap());
}

UnixSocketParser::_shutdown(path).unwrap();
}

#[test]
fn can_reconnect_multiple_times() {
let path = get_unique_tmp_path();
let mut rx = UnixSocketParser::new(&path).unwrap();

for i in 0..10 {
{
let mut stream = UnixStream::connect(&path).unwrap();
writeln!(stream, "{i}").unwrap();
}

assert_eq!(i, rx.read_object::<u8>().unwrap());
assert_eq!(
rx.read_object::<u8>().unwrap_err().kind(),
std::io::ErrorKind::ConnectionAborted
);
}

UnixSocketParser::_shutdown(path).unwrap();
}

#[test]
fn can_deal_with_invalid_data() {
let path = get_unique_tmp_path();
let mut rx = UnixSocketParser::new(&path).unwrap();

let mut stream = UnixStream::connect(&path).unwrap();
writeln!(stream, "invalid").unwrap();
assert_eq!(std::io::ErrorKind::InvalidData, rx.read_object::<u64>().unwrap_err().kind());

writeln!(stream, "123").unwrap();
assert_eq!(123, rx.read_object::<u64>().unwrap());

UnixSocketParser::_shutdown(path).unwrap();
}

#[test]
fn can_reconnect_after_midline_abort() {
let path = get_unique_tmp_path();
let mut rx = UnixSocketParser::new(&path).unwrap();

{
let mut stream = UnixStream::connect(&path).unwrap();
write!(stream, "1234").unwrap();
}

let mut stream = UnixStream::connect(&path).unwrap();
writeln!(stream, "5647").unwrap();

rx.read_object::<u32>().unwrap_err();
assert_eq!(5647, rx.read_object::<u32>().unwrap());
}
}
31 changes: 30 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
#![allow(non_snake_case)]
use command::ExecutionContext;
use communication::socket::UnixSocketParser;
use core::time;
use rppal::gpio::Gpio;
use serialport::SerialPort;
use std::thread;
use std::{
io::ErrorKind,
sync::{Arc, Mutex},
thread,
};
use STS1_EDU_Scheduler::communication::CommunicationHandle;

use simplelog as sl;

use crate::command::Event;

mod command;
mod communication;

Expand All @@ -17,6 +25,7 @@ struct Configuration {
heartbeat_pin: u8,
update_pin: u8,
heartbeat_freq: u64,
socket: String,
}

fn main() -> ! {
Expand Down Expand Up @@ -45,6 +54,10 @@ fn main() -> ! {
// construct a wrapper for resources that are shared between different commands
let mut exec = command::ExecutionContext::new("events".to_string(), config.update_pin).unwrap();

let socket_rx = communication::socket::UnixSocketParser::new(&config.socket).unwrap();
let socket_context = exec.clone();
std::thread::spawn(move || event_socket_loop(socket_context, socket_rx));

// start a thread that will update the heartbeat pin
thread::spawn(move || heartbeat_loop(config.heartbeat_pin, config.heartbeat_freq));

Expand Down Expand Up @@ -72,6 +85,22 @@ fn heartbeat_loop(heartbeat_pin: u8, freq: u64) -> ! {
}
}

fn event_socket_loop(context: Arc<Mutex<ExecutionContext>>, mut socket: UnixSocketParser) {
loop {
let s = socket.read_object::<Event>();
let event = match s {
Ok(e) => e,
Err(ref e) if e.kind() == ErrorKind::Other => break,
Err(_) => continue,
};

log::info!("Received on socket: {event:?}");
let mut context = context.lock().unwrap();
context.event_vec.push(event).unwrap();
context.check_update_pin();
}
}

/// Tries to create a directory, but only returns an error if the path does not already exists
fn create_directory_if_not_exists(path: impl AsRef<std::path::Path>) -> std::io::Result<()> {
match std::fs::create_dir(path) {
Expand Down
Loading

0 comments on commit 0bdb79d

Please sign in to comment.