Skip to content

Commit

Permalink
Fix timeout handling, allow more customizable timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
raffber committed Sep 26, 2024
1 parent 6fb161c commit e643a4b
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 65 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 18 additions & 6 deletions comsrv/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,17 +124,20 @@ impl App {
instrument: ScpiInstrument::Visa(instr),
request,
lock,
} => self.handle_visa(instr, request, lock).await,
timeout,
} => self.handle_visa(instr, request, lock, timeout).await,
Request::Scpi {
instrument: ScpiInstrument::Vxi(instr),
request,
lock,
} => self.handle_vxi(instr, request, lock).await,
timeout,
} => self.handle_vxi(instr, request, lock, timeout).await,
Request::Prologix {
instrument,
request,
lock,
} => self.handle_prologix(instrument, request, lock).await,
timeout,
} => self.handle_prologix(instrument, request, lock, timeout).await,
Request::Sigrok { instrument, request } => {
sigrok::read(&instrument.address, request).await.map(Response::Sigrok)
}
Expand Down Expand Up @@ -279,22 +282,29 @@ impl App {
instr: VisaInstrument,
req: ScpiRequest,
lock: Option<Uuid>,
timeout: Option<comsrv_protocol::Duration>,
) -> crate::Result<Response> {
self.inventories
.visa
.wait_connect(&self.server, &instr.address, lock.as_ref())
.await?
.request(req)
.request(req, timeout.map(|x| x.into()))
.await
.map(Response::Scpi)
}

async fn handle_vxi(&self, instr: VxiInstrument, req: ScpiRequest, lock: Option<Uuid>) -> crate::Result<Response> {
async fn handle_vxi(
&self,
instr: VxiInstrument,
req: ScpiRequest,
lock: Option<Uuid>,
timeout: Option<comsrv_protocol::Duration>,
) -> crate::Result<Response> {
self.inventories
.vxi
.wait_connect(&self.server, &instr.host, lock.as_ref())
.await?
.request(req)
.request(req, timeout.map(|x| x.into()))
.await
.map(Response::Scpi)
}
Expand All @@ -304,6 +314,7 @@ impl App {
instr: PrologixInstrument,
req: PrologixRequest,
lock: Option<Uuid>,
timeout: Option<comsrv_protocol::Duration>,
) -> crate::Result<Response> {
let ret = self
.inventories
Expand All @@ -313,6 +324,7 @@ impl App {
.request(serial::Request::Prologix {
gpib_addr: req.addr,
req: req.scpi,
timeout: timeout.map(|x| x.into()),
})
.await?;
match ret {
Expand Down
14 changes: 8 additions & 6 deletions comsrv/src/protocol/prologix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use anyhow::anyhow;
use comsrv_protocol::{ScpiRequest, ScpiResponse};
use std::time::{Duration, Instant};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::time::timeout;
use tokio::time;

const PROLOGIX_TIMEOUT: f32 = 1.0;

Expand All @@ -27,6 +27,7 @@ pub async fn handle_prologix_request<T: AsyncRead + AsyncWrite + Unpin>(
serial: &mut T,
addr: u8,
req: ScpiRequest,
timeout: Option<Duration>,
) -> crate::Result<ScpiResponse> {
log::debug!("handling prologix request for address {}", addr);
let _ = read_all(serial).await.map_err(crate::Error::transport)?;
Expand All @@ -40,7 +41,8 @@ pub async fn handle_prologix_request<T: AsyncRead + AsyncWrite + Unpin>(
ScpiRequest::QueryString(x) => {
write_prologix(serial, x).await?;
write(serial, "++read eoi\n").await?;
let reply = read_prologix(serial).await?;
let timeout = timeout.unwrap_or_else(|| Duration::from_secs_f32(PROLOGIX_TIMEOUT));
let reply = read_prologix(serial, timeout).await?;
Ok(ScpiResponse::String(reply))
}
ScpiRequest::QueryBinary(_) => {
Expand All @@ -67,12 +69,12 @@ async fn write_prologix<T: AsyncWrite + Unpin>(serial: &mut T, mut msg: String)
serial.write(msg.as_bytes()).await.map(|_| ()).map_err(Error::transport)
}

async fn read_prologix<T: AsyncRead + Unpin>(serial: &mut T) -> crate::Result<String> {
async fn read_prologix<T: AsyncRead + Unpin>(serial: &mut T, timeout: Duration) -> crate::Result<String> {
let start = Instant::now();
let mut ret = Vec::new();
loop {
let mut x = [0; 1];
match timeout(Duration::from_secs_f32(PROLOGIX_TIMEOUT), serial.read_exact(&mut x)).await {
match time::timeout(timeout, serial.read_exact(&mut x)).await {
Ok(Ok(_)) => {
let x = x[0];
if x == b'\n' {
Expand All @@ -89,8 +91,8 @@ async fn read_prologix<T: AsyncRead + Unpin>(serial: &mut T) -> crate::Result<St
return Err(Error::protocol_timeout());
}
};
let delta = start.elapsed().as_secs_f32();
if delta > PROLOGIX_TIMEOUT {
let delta = start.elapsed();
if delta > timeout {
return Err(Error::protocol_timeout());
}
}
Expand Down
11 changes: 9 additions & 2 deletions comsrv/src/transport/serial/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum Request {
Prologix {
gpib_addr: u8,
req: ScpiRequest,
timeout: Option<Duration>,
},
Bytes {
params: SerialParams,
Expand Down Expand Up @@ -172,12 +173,18 @@ impl Handler {

async fn handle_request(&mut self, req: Request, serial: &mut SerialStream) -> crate::Result<Response> {
match req {
Request::Prologix { gpib_addr, req } => {
Request::Prologix {
gpib_addr,
req,
timeout,
} => {
if !self.prologix_initialized {
init_prologix(serial).await?;
self.prologix_initialized = true;
}
handle_prologix_request(serial, gpib_addr, req).await.map(Response::Scpi)
handle_prologix_request(serial, gpib_addr, req, timeout)
.await
.map(Response::Scpi)
}
Request::Bytes { params: _, req } => {
self.prologix_initialized = false;
Expand Down
52 changes: 35 additions & 17 deletions comsrv/src/transport/visa/asynced.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::oneshot;
Expand All @@ -18,6 +19,7 @@ enum Msg {
Scpi {
request: ScpiRequest,
reply: oneshot::Sender<crate::Result<ScpiResponse>>,
timeout: Option<Duration>,
},
Drop,
}
Expand All @@ -29,26 +31,18 @@ impl Instrument {
thread::spawn(move || {
let mut oinstr = None;
while let Ok(msg) = rx.recv() {
let (request, reply) = match msg {
Msg::Scpi { request, reply } => (request, reply),
match msg {
Msg::Scpi {
request,
reply,
timeout,
} => {
Self::run_request(&mut oinstr, &addr, request, reply, timeout);
}
Msg::Drop => {
break;
}
};
let instr = if let Some(instr) = oinstr.take() {
Ok(instr)
} else {
BlockingInstrument::open(&addr)
};
match instr {
Ok(instr) => {
let _ = reply.send(instr.handle_scpi(request));
oinstr.replace(instr);
}
Err(err) => {
let _ = reply.send(Err(err.into()));
}
}
}
});

Expand All @@ -57,11 +51,35 @@ impl Instrument {
}
}

pub async fn request(self, req: ScpiRequest) -> crate::Result<ScpiResponse> {
fn run_request(
oinstr: &mut Option<BlockingInstrument>,
addr: &str,
request: ScpiRequest,
reply: oneshot::Sender<crate::Result<ScpiResponse>>,
timeout: Option<Duration>,
) {
let instr = if let Some(instr) = oinstr.take() {
Ok(instr)
} else {
BlockingInstrument::open(&addr)
};
match instr {
Ok(instr) => {
let _ = reply.send(instr.handle_scpi(request, timeout));
oinstr.replace(instr);
}
Err(err) => {
let _ = reply.send(Err(err.into()));
}
}
}

pub async fn request(self, req: ScpiRequest, timeout: Option<Duration>) -> crate::Result<ScpiResponse> {
let (tx, rx) = oneshot::channel();
let thmsg = Msg::Scpi {
request: req,
reply: tx,
timeout: timeout,
};
self.tx
.lock()
Expand Down
5 changes: 4 additions & 1 deletion comsrv/src/transport/visa/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use serde::{Deserialize, Serialize};

use super::visa_sys::Instrument as VisaInstrument;
Expand Down Expand Up @@ -82,7 +84,8 @@ impl Instrument {
self.instr.addr()
}

pub fn handle_scpi(&self, req: ScpiRequest) -> crate::Result<ScpiResponse> {
pub fn handle_scpi(&self, req: ScpiRequest, _: Option<Duration>) -> crate::Result<ScpiResponse> {
// TODO: handle timeout
match req {
ScpiRequest::Write(x) => self
.write(x)
Expand Down
68 changes: 35 additions & 33 deletions comsrv/src/transport/vxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,13 @@ pub struct Instrument {

#[derive(Clone)]
enum Request {
Scpi(ScpiRequest),
Scpi {
scpi: ScpiRequest,
timeout: Option<Duration>,
},
DropCheck,
}

impl Request {
fn into_scpi(self) -> Option<ScpiRequest> {
match self {
Request::Scpi(x) => Some(x),
_ => None,
}
}
}

enum Response {
Scpi(ScpiResponse),
Done,
Expand All @@ -53,8 +47,8 @@ impl Instrument {
}
}

pub async fn request(&mut self, req: ScpiRequest) -> crate::Result<ScpiResponse> {
let req = Request::Scpi(req);
pub async fn request(&mut self, req: ScpiRequest, timeout: Option<Duration>) -> crate::Result<ScpiResponse> {
let req = Request::Scpi { scpi: req, timeout };
match self.inner.request(req).await? {
Response::Scpi(x) => Ok(x),
Response::Done => Err(crate::Error::internal(anyhow!("Invalid response for request."))),
Expand Down Expand Up @@ -113,9 +107,13 @@ impl Handler {
ret.map_err(map_error)
}

async fn handle_request_timeout(client: &mut CoreClient, req: ScpiRequest) -> crate::Result<ScpiResponse> {
async fn handle_request_timeout(
client: &mut CoreClient,
req: ScpiRequest,
timeout: Duration,
) -> crate::Result<ScpiResponse> {
let fut = Self::handle_request(client, req);
tokio::time::timeout(DEFAULT_CONNECTION_TIMEOUT, fut)
tokio::time::timeout(timeout, fut)
.await
.map_err(|_| crate::Error::protocol_timeout())?
}
Expand Down Expand Up @@ -181,30 +179,34 @@ impl IoHandler for Handler {
} else {
self.connect().await?
};
// save because drop check_check handled other
let req = req.into_scpi().unwrap();
let ret = Self::handle_request_timeout(&mut client, req.clone()).await;
match ret {
Ok(ret) => {
self.client.replace(client);
self.spawn_drop_check(ctx);
Ok(Response::Scpi(ret))
}
Err(err) => {
drop(client);
if err.should_retry() {
sleep(Duration::from_millis(100)).await;
let mut client = self.connect().await?;
let ret = Self::handle_request_timeout(&mut client, req).await;
if ret.is_ok() {
match req {
Request::Scpi { scpi: req, timeout } => {
let timeout = timeout.unwrap_or(DEFAULT_CONNECTION_TIMEOUT);
let ret = Self::handle_request_timeout(&mut client, req.clone(), timeout).await;
match ret {
Ok(ret) => {
self.client.replace(client);
self.spawn_drop_check(ctx);
Ok(Response::Scpi(ret))
}
Err(err) => {
drop(client);
if err.should_retry() {
sleep(Duration::from_millis(100)).await;
let mut client = self.connect().await?;
let ret = Self::handle_request_timeout(&mut client, req, timeout).await;
if ret.is_ok() {
self.client.replace(client);
self.spawn_drop_check(ctx);
}
Ok(Response::Scpi(ret?))
} else {
Err(err)
}
}
Ok(Response::Scpi(ret?))
} else {
Err(err)
}
}
Request::DropCheck => Ok(Response::Done),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
byteorder = "1"
thiserror = "1"
anyhow = { version = "1", features = ["backtrace"] }

[dev-dependencies]
serde_json = "1.0"
4 changes: 4 additions & 0 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,16 @@ pub enum Request {
request: ScpiRequest,
#[serde(skip_serializing_if = "Option::is_none", default)]
lock: Option<Uuid>,
#[serde(default = "Option::default")]
timeout: Option<Duration>,
},
Prologix {
instrument: PrologixInstrument,
request: PrologixRequest,
#[serde(skip_serializing_if = "Option::is_none", default)]
lock: Option<Uuid>,
#[serde(default = "Option::default")]
timeout: Option<Duration>,
},
Sigrok {
instrument: SigrokInstrument,
Expand Down

0 comments on commit e643a4b

Please sign in to comment.