From 352e15507bdca148a114bcd348bd66705b45d8ce Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 13 Oct 2024 13:48:40 +0200 Subject: [PATCH 1/9] rtu: Send request and receive response one after another --- examples/rtu-client.rs | 13 ++- src/frame/mod.rs | 20 ++++- src/frame/rtu.rs | 96 ++++++++++++++++++++++- src/prelude.rs | 2 + src/service/mod.rs | 15 ---- src/service/rtu.rs | 174 +++++++++++++++++++---------------------- src/service/tcp.rs | 3 +- src/slave.rs | 1 + 8 files changed, 207 insertions(+), 117 deletions(-) diff --git a/examples/rtu-client.rs b/examples/rtu-client.rs index bf896542..0e6c5c23 100644 --- a/examples/rtu-client.rs +++ b/examples/rtu-client.rs @@ -15,13 +15,18 @@ async fn main() -> Result<(), Box> { let builder = tokio_serial::new(tty_path, 19200); let port = SerialStream::open(&builder).unwrap(); - let mut ctx = rtu::attach_slave(port, slave); + let mut conn = rtu::ClientConnection::new(port); println!("Reading a sensor value"); - let rsp = ctx.read_holding_registers(0x082B, 2).await??; - println!("Sensor value is: {rsp:?}"); + let request = Request::ReadHoldingRegisters(0x082B, 2); + let request_context = conn.send_request(request, slave).await?; + let Response::ReadHoldingRegisters(value) = conn.recv_response(request_context).await?? else { + // The response variant will always match its corresponding request variant if successful. + unreachable!(); + }; + println!("Sensor value is: {value:?}"); println!("Disconnecting"); - ctx.disconnect().await?; + conn.disconnect().await?; Ok(()) } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 336c436f..26ce98ed 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -18,7 +18,7 @@ use crate::bytes::Bytes; /// A Modbus function code. /// /// All function codes as defined by the protocol specification V1.1b3. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum FunctionCode { /// 01 (0x01) Read Coils. ReadCoils, @@ -586,6 +586,24 @@ impl error::Error for ExceptionResponse { } } +/// Check that `req_hdr` is the same `Header` as `rsp_hdr`. +/// +/// # Errors +/// +/// If the 2 headers are different, an error message with the details will be returned. +#[cfg(any(feature = "rtu", feature = "tcp"))] +pub(crate) fn verify_response_header( + req_hdr: &H, + rsp_hdr: &H, +) -> Result<(), String> { + if req_hdr != rsp_hdr { + return Err(format!( + "expected/request = {req_hdr:?}, actual/response = {rsp_hdr:?}" + )); + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/frame/rtu.rs b/src/frame/rtu.rs index b483fd92..0576928e 100644 --- a/src/frame/rtu.rs +++ b/src/frame/rtu.rs @@ -3,9 +3,22 @@ use super::*; -use crate::slave::SlaveId; +use crate::{ProtocolError, Result, SlaveId}; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct RequestContext { + function_code: FunctionCode, + header: Header, +} + +impl RequestContext { + #[must_use] + pub const fn function_code(&self) -> FunctionCode { + self.function_code + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(crate) struct Header { pub(crate) slave_id: SlaveId, } @@ -16,12 +29,60 @@ pub struct RequestAdu<'a> { pub(crate) pdu: RequestPdu<'a>, } +impl RequestAdu<'_> { + pub(crate) fn context(&self) -> RequestContext { + RequestContext { + function_code: self.pdu.0.function_code(), + header: self.hdr, + } + } +} + #[derive(Debug, Clone)] pub(crate) struct ResponseAdu { pub(crate) hdr: Header, pub(crate) pdu: ResponsePdu, } +impl ResponseAdu { + pub(crate) fn try_into_response(self, request_context: RequestContext) -> Result { + let RequestContext { + function_code: req_function_code, + header: req_hdr, + } = request_context; + + let ResponseAdu { + hdr: rsp_hdr, + pdu: rsp_pdu, + } = self; + let ResponsePdu(result) = rsp_pdu; + + if let Err(message) = verify_response_header(&req_hdr, &rsp_hdr) { + return Err(ProtocolError::HeaderMismatch { message, result }.into()); + } + + // Match function codes of request and response. + let rsp_function_code = match &result { + Ok(response) => response.function_code(), + Err(ExceptionResponse { function, .. }) => *function, + }; + if req_function_code != rsp_function_code { + return Err(ProtocolError::FunctionCodeMismatch { + request: req_function_code, + result, + } + .into()); + } + + Ok(result.map_err( + |ExceptionResponse { + function: _, + exception, + }| exception, + )) + } +} + impl<'a> From> for Request<'a> { fn from(from: RequestAdu<'a>) -> Self { from.pdu.into() @@ -37,3 +98,34 @@ impl<'a> From> for SlaveRequest<'a> { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn validate_same_headers() { + // Given + let req_hdr = Header { slave_id: 0 }; + let rsp_hdr = Header { slave_id: 0 }; + + // When + let result = verify_response_header(&req_hdr, &rsp_hdr); + + // Then + assert!(result.is_ok()); + } + + #[test] + fn invalid_validate_not_same_slave_id() { + // Given + let req_hdr = Header { slave_id: 0 }; + let rsp_hdr = Header { slave_id: 5 }; + + // When + let result = verify_response_header(&req_hdr, &rsp_hdr); + + // Then + assert!(result.is_err()); + } +} diff --git a/src/prelude.rs b/src/prelude.rs index 33c694e1..f7695096 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -12,6 +12,8 @@ pub use crate::client; #[cfg(feature = "rtu")] pub mod rtu { pub use crate::client::rtu::*; + pub use crate::frame::rtu::RequestContext; + pub use crate::service::rtu::ClientConnection; } #[allow(missing_docs)] diff --git a/src/service/mod.rs b/src/service/mod.rs index b3ba92e0..212f224b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -26,18 +26,3 @@ where _ => Err(err), }) } - -/// Check that `req_hdr` is the same `Header` as `rsp_hdr`. -/// -/// # Errors -/// -/// If the 2 headers are different, an error message with the details will be returned. -#[cfg(any(feature = "rtu", feature = "tcp"))] -fn verify_response_header(req_hdr: &H, rsp_hdr: &H) -> Result<(), String> { - if req_hdr != rsp_hdr { - return Err(format!( - "expected/request = {req_hdr:?}, actual/response = {rsp_hdr:?}" - )); - } - Ok(()) -} diff --git a/src/service/rtu.rs b/src/service/rtu.rs index 0f4be17f..eb00beb7 100644 --- a/src/service/rtu.rs +++ b/src/service/rtu.rs @@ -11,102 +11,119 @@ use crate::{ codec, frame::{rtu::*, *}, slave::*, - ProtocolError, Result, + Result, }; -use super::{disconnect, verify_response_header}; +use super::disconnect; /// Modbus RTU client #[derive(Debug)] -pub(crate) struct Client { - framed: Option>, - slave_id: SlaveId, +pub struct ClientConnection { + framed: Framed, } -impl Client +impl ClientConnection where T: AsyncRead + AsyncWrite + Unpin, { - pub(crate) fn new(transport: T, slave: Slave) -> Self { + pub fn new(transport: T) -> Self { let framed = Framed::new(transport, codec::rtu::ClientCodec::default()); - let slave_id = slave.into(); - Self { - slave_id, - framed: Some(framed), - } + Self { framed } } - fn framed(&mut self) -> io::Result<&mut Framed> { - let Some(framed) = &mut self.framed else { - return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected")); - }; - Ok(framed) + pub async fn disconnect(self) -> io::Result<()> { + let Self { framed } = self; + disconnect(framed).await + } + + pub async fn send_request<'a>( + &mut self, + request: Request<'a>, + server: Slave, + ) -> io::Result { + self.send_request_pdu(request, server).await } - fn next_request_adu<'a, R>(&self, req: R) -> RequestAdu<'a> + async fn send_request_pdu<'a, R>( + &mut self, + request: R, + server: Slave, + ) -> io::Result where R: Into>, { - let slave_id = self.slave_id; - let hdr = Header { slave_id }; - let pdu = req.into(); - RequestAdu { hdr, pdu } - } - - async fn call(&mut self, req: Request<'_>) -> Result { - log::debug!("Call {:?}", req); + let request_adu = request_adu(request, server); + let context = request_adu.context(); - let req_function_code = req.function_code(); - let req_adu = self.next_request_adu(req); - let req_hdr = req_adu.hdr; - - let framed = self.framed()?; + let Self { framed } = self; framed.read_buffer_mut().clear(); - framed.send(req_adu).await?; + framed.send(request_adu).await?; + + Ok(context) + } - let res_adu = framed + pub async fn recv_response(&mut self, request_context: RequestContext) -> Result { + let res_adu = self + .framed .next() .await .unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?; - let ResponseAdu { - hdr: res_hdr, - pdu: res_pdu, - } = res_adu; - let ResponsePdu(result) = res_pdu; - - // Match headers of request and response. - if let Err(message) = verify_response_header(&req_hdr, &res_hdr) { - return Err(ProtocolError::HeaderMismatch { message, result }.into()); - } - // Match function codes of request and response. - let rsp_function_code = match &result { - Ok(response) => response.function_code(), - Err(ExceptionResponse { function, .. }) => *function, - }; - if req_function_code != rsp_function_code { - return Err(ProtocolError::FunctionCodeMismatch { - request: req_function_code, - result, - } - .into()); - } + res_adu.try_into_response(request_context) + } +} + +fn request_adu<'a, R>(req: R, server: Slave) -> RequestAdu<'a> +where + R: Into>, +{ + let hdr = Header { + slave_id: server.into(), + }; + let pdu = req.into(); + RequestAdu { hdr, pdu } +} + +/// Modbus RTU client +#[derive(Debug)] +pub(crate) struct Client { + connection: Option>, + slave_id: SlaveId, +} - Ok(result.map_err( - |ExceptionResponse { - function: _, - exception, - }| exception, - )) +impl Client +where + T: AsyncRead + AsyncWrite + Unpin, +{ + pub(crate) fn new(transport: T, slave: Slave) -> Self { + let connection = ClientConnection::new(transport); + let slave_id = slave.into(); + Self { + connection: Some(connection), + slave_id, + } } async fn disconnect(&mut self) -> io::Result<()> { - let Some(framed) = self.framed.take() else { + let Some(connection) = self.connection.take() else { // Already disconnected. return Ok(()); }; - disconnect(framed).await + disconnect(connection.framed).await + } + + async fn call(&mut self, request: Request<'_>) -> Result { + log::debug!("Call {:?}", request); + + let Some(connection) = &mut self.connection else { + return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into()); + }; + + let request_context = connection + .send_request(request, Slave(self.slave_id)) + .await?; + connection.recv_response(request_context).await } } @@ -139,36 +156,7 @@ mod tests { }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result}; - use crate::{ - service::{rtu::Header, verify_response_header}, - Error, - }; - - #[test] - fn validate_same_headers() { - // Given - let req_hdr = Header { slave_id: 0 }; - let rsp_hdr = Header { slave_id: 0 }; - - // When - let result = verify_response_header(&req_hdr, &rsp_hdr); - - // Then - assert!(result.is_ok()); - } - - #[test] - fn invalid_validate_not_same_slave_id() { - // Given - let req_hdr = Header { slave_id: 0 }; - let rsp_hdr = Header { slave_id: 5 }; - - // When - let result = verify_response_header(&req_hdr, &rsp_hdr); - - // Then - assert!(result.is_err()); - } + use crate::Error; #[derive(Debug)] struct MockTransport; diff --git a/src/service/tcp.rs b/src/service/tcp.rs index 0aea3092..38bb6a60 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -11,9 +11,8 @@ use crate::{ codec, frame::{ tcp::{Header, RequestAdu, ResponseAdu, TransactionId, UnitId}, - RequestPdu, ResponsePdu, + verify_response_header, RequestPdu, ResponsePdu, }, - service::verify_response_header, slave::*, ExceptionResponse, ProtocolError, Request, Response, Result, }; diff --git a/src/slave.rs b/src/slave.rs index 0fa5b945..9dc77651 100644 --- a/src/slave.rs +++ b/src/slave.rs @@ -10,6 +10,7 @@ pub type SlaveId = u8; /// A single byte for addressing Modbus slave devices. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[repr(transparent)] pub struct Slave(pub SlaveId); impl Slave { From 2f1747d8e66513b30155725fa4d9dbdbf4e0b6b1 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sat, 12 Oct 2024 12:44:24 +0200 Subject: [PATCH 2/9] Extend example --- examples/rtu-client.rs | 46 ++++++++++++++++++++++++++++-------------- src/client/rtu.rs | 13 +++++++++++- src/service/rtu.rs | 9 +++++---- 3 files changed, 48 insertions(+), 20 deletions(-) diff --git a/examples/rtu-client.rs b/examples/rtu-client.rs index 0e6c5c23..550012b3 100644 --- a/examples/rtu-client.rs +++ b/examples/rtu-client.rs @@ -3,30 +3,46 @@ //! Asynchronous RTU client example -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - use tokio_serial::SerialStream; +use tokio_modbus::{prelude::*, Address, Quantity, Slave}; +use tokio_serial::SerialStream; + +const SERIAL_PATH: &str = "/dev/ttyUSB0"; - use tokio_modbus::prelude::*; +const BAUD_RATE: u32 = 19_200; - let tty_path = "/dev/ttyUSB0"; - let slave = Slave(0x17); +const SERVER: Slave = Slave(0x17); - let builder = tokio_serial::new(tty_path, 19200); - let port = SerialStream::open(&builder).unwrap(); +const SENSOR_ADDRESS: Address = 0x082B; - let mut conn = rtu::ClientConnection::new(port); - println!("Reading a sensor value"); - let request = Request::ReadHoldingRegisters(0x082B, 2); - let request_context = conn.send_request(request, slave).await?; - let Response::ReadHoldingRegisters(value) = conn.recv_response(request_context).await?? else { +const SENSOR_QUANTITY: Quantity = 2; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + let builder = tokio_serial::new(SERIAL_PATH, BAUD_RATE); + let transport = SerialStream::open(&builder).unwrap(); + + let mut connection = rtu::ClientConnection::new(transport); + + println!("Reading sensor values (request/response"); + let request = Request::ReadHoldingRegisters(SENSOR_ADDRESS, SENSOR_QUANTITY); + let request_context = connection.send_request(request, SERVER).await?; + let Response::ReadHoldingRegisters(values) = + connection.recv_response(request_context).await?? + else { // The response variant will always match its corresponding request variant if successful. unreachable!(); }; - println!("Sensor value is: {value:?}"); + println!("Sensor responded with: {values:?}"); + + println!("Reading sensor values (call"); + let mut context = rtu::client_context(connection, SERVER); + let values = context + .read_holding_registers(SENSOR_ADDRESS, SENSOR_QUANTITY) + .await??; + println!("Sensor responded with: {values:?}"); println!("Disconnecting"); - conn.disconnect().await?; + context.disconnect().await?; Ok(()) } diff --git a/src/client/rtu.rs b/src/client/rtu.rs index 03f3479c..0e9e0789 100644 --- a/src/client/rtu.rs +++ b/src/client/rtu.rs @@ -5,6 +5,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; +use crate::prelude::rtu::ClientConnection; + use super::*; /// Connect to no particular Modbus slave device for sending @@ -21,7 +23,16 @@ pub fn attach_slave(transport: T, slave: Slave) -> Context where T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static, { - let client = crate::service::rtu::Client::new(transport, slave); + let connection = ClientConnection::new(transport); + client_context(connection, slave) +} + +/// Creates a client/server connection. +pub fn client_context(connection: ClientConnection, server: Slave) -> Context +where + T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static, +{ + let client = crate::service::rtu::Client::new(connection, server); Context { client: Box::new(client), } diff --git a/src/service/rtu.rs b/src/service/rtu.rs index eb00beb7..cae22884 100644 --- a/src/service/rtu.rs +++ b/src/service/rtu.rs @@ -96,8 +96,7 @@ impl Client where T: AsyncRead + AsyncWrite + Unpin, { - pub(crate) fn new(transport: T, slave: Slave) -> Self { - let connection = ClientConnection::new(transport); + pub(crate) fn new(connection: ClientConnection, slave: Slave) -> Self { let slave_id = slave.into(); Self { connection: Some(connection), @@ -149,7 +148,6 @@ where #[cfg(test)] mod tests { - use core::{ pin::Pin, task::{Context, Poll}, @@ -158,6 +156,8 @@ mod tests { use crate::Error; + use super::*; + #[derive(Debug)] struct MockTransport; @@ -190,8 +190,9 @@ mod tests { #[tokio::test] async fn handle_broken_pipe() { let transport = MockTransport; + let connection = ClientConnection::new(transport); let mut client = - crate::service::rtu::Client::new(transport, crate::service::rtu::Slave::broadcast()); + crate::service::rtu::Client::new(connection, crate::service::rtu::Slave::broadcast()); let res = client .call(crate::service::rtu::Request::ReadCoils(0x00, 5)) .await; From 614083cd5e342fb4ea15a5d20b3a6b29d556ed12 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 13 Oct 2024 14:22:25 +0200 Subject: [PATCH 3/9] Rename types and functions --- examples/rtu-client.rs | 17 +++--- src/client/rtu.rs | 16 ++---- src/prelude.rs | 2 +- src/service/rtu.rs | 119 ++++++++++++++++++++++++++--------------- 4 files changed, 89 insertions(+), 65 deletions(-) diff --git a/examples/rtu-client.rs b/examples/rtu-client.rs index 550012b3..98cad144 100644 --- a/examples/rtu-client.rs +++ b/examples/rtu-client.rs @@ -21,28 +21,27 @@ async fn main() -> Result<(), Box> { let builder = tokio_serial::new(SERIAL_PATH, BAUD_RATE); let transport = SerialStream::open(&builder).unwrap(); - let mut connection = rtu::ClientConnection::new(transport); + let mut client = rtu::Client::new(transport); - println!("Reading sensor values (request/response"); + println!("Reading sensor values (request/response using the low-level API"); let request = Request::ReadHoldingRegisters(SENSOR_ADDRESS, SENSOR_QUANTITY); - let request_context = connection.send_request(request, SERVER).await?; - let Response::ReadHoldingRegisters(values) = - connection.recv_response(request_context).await?? + let request_context = client.send_request(request, SERVER).await?; + let Response::ReadHoldingRegisters(values) = client.recv_response(request_context).await?? else { // The response variant will always match its corresponding request variant if successful. unreachable!(); }; println!("Sensor responded with: {values:?}"); - println!("Reading sensor values (call"); - let mut context = rtu::client_context(connection, SERVER); - let values = context + println!("Reading sensor values (call) using the high-level API"); + let mut client_context = client::Context::from(rtu::ClientContext::new(client, SERVER).boxed()); + let values = client_context .read_holding_registers(SENSOR_ADDRESS, SENSOR_QUANTITY) .await??; println!("Sensor responded with: {values:?}"); println!("Disconnecting"); - context.disconnect().await?; + client_context.disconnect().await?; Ok(()) } diff --git a/src/client/rtu.rs b/src/client/rtu.rs index 0e9e0789..5d04396f 100644 --- a/src/client/rtu.rs +++ b/src/client/rtu.rs @@ -5,7 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; -use crate::prelude::rtu::ClientConnection; +use crate::service::rtu::{Client, ClientContext}; use super::*; @@ -23,17 +23,9 @@ pub fn attach_slave(transport: T, slave: Slave) -> Context where T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static, { - let connection = ClientConnection::new(transport); - client_context(connection, slave) -} - -/// Creates a client/server connection. -pub fn client_context(connection: ClientConnection, server: Slave) -> Context -where - T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static, -{ - let client = crate::service::rtu::Client::new(connection, server); + let client = Client::new(transport); + let context = ClientContext::new(client, slave); Context { - client: Box::new(client), + client: Box::new(context), } } diff --git a/src/prelude.rs b/src/prelude.rs index f7695096..268196d2 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -13,7 +13,7 @@ pub use crate::client; pub mod rtu { pub use crate::client::rtu::*; pub use crate::frame::rtu::RequestContext; - pub use crate::service::rtu::ClientConnection; + pub use crate::service::rtu::{Client, ClientContext}; } #[allow(missing_docs)] diff --git a/src/service/rtu.rs b/src/service/rtu.rs index cae22884..0993400d 100644 --- a/src/service/rtu.rs +++ b/src/service/rtu.rs @@ -16,13 +16,13 @@ use crate::{ use super::disconnect; -/// Modbus RTU client +/// _Modbus_ RTU client. #[derive(Debug)] -pub struct ClientConnection { +pub struct Client { framed: Framed, } -impl ClientConnection +impl Client where T: AsyncRead + AsyncWrite + Unpin, { @@ -72,68 +72,93 @@ where res_adu.try_into_response(request_context) } -} -fn request_adu<'a, R>(req: R, server: Slave) -> RequestAdu<'a> -where - R: Into>, -{ - let hdr = Header { - slave_id: server.into(), - }; - let pdu = req.into(); - RequestAdu { hdr, pdu } + pub async fn call<'a>(&mut self, request: Request<'a>, server: Slave) -> Result { + let request_context = self.send_request(request, server).await?; + self.recv_response(request_context).await + } } -/// Modbus RTU client +/// _Modbus_ RTU client with (server) context and connection state. +/// +/// Client that invokes methods (request/response) on a single or many (broadcast) server(s). +/// +/// The server can be switched between method calls. #[derive(Debug)] -pub(crate) struct Client { - connection: Option>, - slave_id: SlaveId, +pub struct ClientContext { + client: Option>, + server: Slave, } -impl Client -where - T: AsyncRead + AsyncWrite + Unpin, -{ - pub(crate) fn new(connection: ClientConnection, slave: Slave) -> Self { - let slave_id = slave.into(); +impl ClientContext { + pub fn new(client: Client, server: Slave) -> Self { Self { - connection: Some(connection), - slave_id, + client: Some(client), + server, } } - async fn disconnect(&mut self) -> io::Result<()> { - let Some(connection) = self.connection.take() else { + #[must_use] + pub const fn is_connected(&self) -> bool { + self.client.is_some() + } + + #[must_use] + pub const fn server(&self) -> Slave { + self.server + } + + pub fn set_server(&mut self, server: Slave) { + self.server = server; + } +} + +impl ClientContext +where + T: AsyncWrite + Unpin, +{ + pub async fn disconnect(&mut self) -> io::Result<()> { + let Some(client) = self.client.take() else { // Already disconnected. return Ok(()); }; - disconnect(connection.framed).await + disconnect(client.framed).await } +} - async fn call(&mut self, request: Request<'_>) -> Result { +impl ClientContext +where + T: AsyncRead + AsyncWrite + Unpin, +{ + pub async fn call(&mut self, request: Request<'_>) -> Result { log::debug!("Call {:?}", request); - let Some(connection) = &mut self.connection else { + let Some(client) = &mut self.client else { return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into()); }; - let request_context = connection - .send_request(request, Slave(self.slave_id)) - .await?; - connection.recv_response(request_context).await + client.call(request, self.server).await + } +} + +impl ClientContext +where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + 'static, +{ + #[must_use] + pub fn boxed(self) -> Box { + Box::new(self) } } -impl SlaveContext for Client { +impl SlaveContext for ClientContext { fn set_slave(&mut self, slave: Slave) { - self.slave_id = slave.into(); + self.set_server(slave); } } #[async_trait::async_trait] -impl crate::client::Client for Client +impl crate::client::Client for ClientContext where T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin, { @@ -146,6 +171,17 @@ where } } +fn request_adu<'a, R>(req: R, server: Slave) -> RequestAdu<'a> +where + R: Into>, +{ + let hdr = Header { + slave_id: server.into(), + }; + let pdu = req.into(); + RequestAdu { hdr, pdu } +} + #[cfg(test)] mod tests { use core::{ @@ -190,12 +226,9 @@ mod tests { #[tokio::test] async fn handle_broken_pipe() { let transport = MockTransport; - let connection = ClientConnection::new(transport); - let mut client = - crate::service::rtu::Client::new(connection, crate::service::rtu::Slave::broadcast()); - let res = client - .call(crate::service::rtu::Request::ReadCoils(0x00, 5)) - .await; + let client = Client::new(transport); + let mut context = ClientContext::new(client, Slave::broadcast()); + let res = context.call(Request::ReadCoils(0x00, 5)).await; assert!(res.is_err()); let err = res.err().unwrap(); assert!( From d48c2ff6bd7ec88a188c5bfa5b405d2623f10001 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Sun, 13 Oct 2024 18:52:52 +0200 Subject: [PATCH 4/9] Reorder arguments --- examples/rtu-client.rs | 2 +- src/service/rtu.rs | 43 +++++++++++++++++++++++------------------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/examples/rtu-client.rs b/examples/rtu-client.rs index 98cad144..f74493e1 100644 --- a/examples/rtu-client.rs +++ b/examples/rtu-client.rs @@ -25,7 +25,7 @@ async fn main() -> Result<(), Box> { println!("Reading sensor values (request/response using the low-level API"); let request = Request::ReadHoldingRegisters(SENSOR_ADDRESS, SENSOR_QUANTITY); - let request_context = client.send_request(request, SERVER).await?; + let request_context = client.send_request(SERVER, request).await?; let Response::ReadHoldingRegisters(values) = client.recv_response(request_context).await?? else { // The response variant will always match its corresponding request variant if successful. diff --git a/src/service/rtu.rs b/src/service/rtu.rs index 0993400d..f98645e0 100644 --- a/src/service/rtu.rs +++ b/src/service/rtu.rs @@ -36,46 +36,51 @@ where disconnect(framed).await } + pub async fn call<'a>(&mut self, server: Slave, request: Request<'a>) -> Result { + let request_context = self.send_request(server, request).await?; + self.recv_response(request_context).await + } + pub async fn send_request<'a>( &mut self, - request: Request<'a>, server: Slave, + request: Request<'a>, ) -> io::Result { - self.send_request_pdu(request, server).await + self.send_request_pdu(server, request).await } async fn send_request_pdu<'a, R>( &mut self, - request: R, server: Slave, + request_pdu: R, ) -> io::Result where R: Into>, { - let request_adu = request_adu(request, server); - let context = request_adu.context(); + let request_adu = request_adu(server, request_pdu); + self.send_request_adu(request_adu).await + } - let Self { framed } = self; + async fn send_request_adu<'a>( + &mut self, + request_adu: RequestAdu<'a>, + ) -> io::Result { + let request_context = request_adu.context(); - framed.read_buffer_mut().clear(); - framed.send(request_adu).await?; + self.framed.read_buffer_mut().clear(); + self.framed.send(request_adu).await?; - Ok(context) + Ok(request_context) } pub async fn recv_response(&mut self, request_context: RequestContext) -> Result { - let res_adu = self + let response_adu = self .framed .next() .await .unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?; - res_adu.try_into_response(request_context) - } - - pub async fn call<'a>(&mut self, request: Request<'a>, server: Slave) -> Result { - let request_context = self.send_request(request, server).await?; - self.recv_response(request_context).await + response_adu.try_into_response(request_context) } } @@ -137,7 +142,7 @@ where return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into()); }; - client.call(request, self.server).await + client.call(self.server, request).await } } @@ -171,14 +176,14 @@ where } } -fn request_adu<'a, R>(req: R, server: Slave) -> RequestAdu<'a> +fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a> where R: Into>, { let hdr = Header { slave_id: server.into(), }; - let pdu = req.into(); + let pdu = request_pdu.into(); RequestAdu { hdr, pdu } } From 21c2aad099d3b87e441cf59ee6585806abcc749b Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 17 Oct 2024 14:45:41 +0200 Subject: [PATCH 5/9] Use newtype in RTU header --- src/codec/rtu.rs | 26 +++++++++++++++++--------- src/frame/mod.rs | 2 +- src/frame/rtu.rs | 17 +++++++++-------- src/service/rtu.rs | 4 +--- src/slave.rs | 12 ++++++------ 5 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/codec/rtu.rs b/src/codec/rtu.rs index 67a2af25..d16e6533 100644 --- a/src/codec/rtu.rs +++ b/src/codec/rtu.rs @@ -10,7 +10,7 @@ use tokio_util::codec::{Decoder, Encoder}; use crate::{ bytes::{Buf, BufMut, Bytes, BytesMut}, frame::rtu::*, - slave::SlaveId, + Slave, SlaveId, }; use super::{encode_request_pdu, request_pdu_size, RequestPdu}; @@ -284,7 +284,9 @@ impl Decoder for ClientCodec { return Ok(None); }; - let hdr = Header { slave_id }; + let hdr = Header { + slave: Slave(slave_id), + }; // Decoding of the PDU is unlikely to fail due // to transmission errors, because the frame's bytes @@ -309,7 +311,9 @@ impl Decoder for ServerCodec { return Ok(None); }; - let hdr = Header { slave_id }; + let hdr = Header { + slave: Slave(slave_id), + }; // Decoding of the PDU is unlikely to fail due // to transmission errors, because the frame's bytes @@ -335,7 +339,7 @@ impl<'a> Encoder> for ClientCodec { let buf_offset = buf.len(); let request_pdu_size = request_pdu_size(&request)?; buf.reserve((buf.capacity() - buf_offset) + request_pdu_size + 3); - buf.put_u8(hdr.slave_id); + buf.put_u8(hdr.slave.into()); encode_request_pdu(buf, &request); let crc = calc_crc(&buf[buf_offset..]); buf.put_u16(crc); @@ -355,7 +359,7 @@ impl Encoder for ServerCodec { let buf_offset = buf.len(); let response_result_pdu_size = super::response_result_pdu_size(&pdu_res)?; buf.reserve(response_result_pdu_size + 3); - buf.put_u8(hdr.slave_id); + buf.put_u8(hdr.slave.into()); super::encode_response_result_pdu(buf, &pdu_res); let crc = calc_crc(&buf[buf_offset..]); buf.put_u16(crc); @@ -661,7 +665,7 @@ mod tests { ); let ResponseAdu { hdr, pdu } = codec.decode(&mut buf).unwrap().unwrap(); assert_eq!(buf.len(), 1); - assert_eq!(hdr.slave_id, 0x01); + assert_eq!(hdr.slave, Slave(0x01)); if let Ok(Response::ReadHoldingRegisters(data)) = pdu.into() { assert_eq!(data.len(), 2); assert_eq!(data, vec![0x8902, 0x42C7]); @@ -692,7 +696,7 @@ mod tests { ); let ResponseAdu { hdr, pdu } = codec.decode(&mut buf).unwrap().unwrap(); assert_eq!(buf.len(), 1); - assert_eq!(hdr.slave_id, 0x01); + assert_eq!(hdr.slave, Slave(0x01)); if let Ok(Response::ReadHoldingRegisters(data)) = pdu.into() { assert_eq!(data.len(), 2); assert_eq!(data, vec![0x8902, 0x42C7]); @@ -730,7 +734,9 @@ mod tests { let req = Request::ReadHoldingRegisters(0x082b, 2); let pdu = req.into(); let slave_id = 0x01; - let hdr = Header { slave_id }; + let hdr = Header { + slave: Slave(slave_id), + }; let adu = RequestAdu { hdr, pdu }; codec.encode(adu, &mut buf).unwrap(); @@ -746,7 +752,9 @@ mod tests { let req = Request::ReadHoldingRegisters(0x082b, 2); let pdu = req.into(); let slave_id = 0x01; - let hdr = Header { slave_id }; + let hdr = Header { + slave: Slave(slave_id), + }; let adu = RequestAdu { hdr, pdu }; let mut buf = BytesMut::with_capacity(40); #[allow(unsafe_code)] diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 26ce98ed..f2f4063a 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -294,7 +294,7 @@ impl<'a> Request<'a> { #[derive(Debug, Clone, PartialEq, Eq)] pub struct SlaveRequest<'a> { /// Slave Id from the request - pub slave: crate::slave::SlaveId, + pub slave: crate::SlaveId, /// A `Request` enum pub request: Request<'a>, } diff --git a/src/frame/rtu.rs b/src/frame/rtu.rs index 0576928e..18233716 100644 --- a/src/frame/rtu.rs +++ b/src/frame/rtu.rs @@ -3,7 +3,7 @@ use super::*; -use crate::{ProtocolError, Result, SlaveId}; +use crate::{ProtocolError, Result, Slave}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct RequestContext { @@ -20,7 +20,7 @@ impl RequestContext { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(crate) struct Header { - pub(crate) slave_id: SlaveId, + pub(crate) slave: Slave, } #[derive(Debug, Clone)] @@ -92,9 +92,10 @@ impl<'a> From> for Request<'a> { #[cfg(feature = "server")] impl<'a> From> for SlaveRequest<'a> { fn from(from: RequestAdu<'a>) -> Self { + let RequestAdu { hdr, pdu } = from; Self { - slave: from.hdr.slave_id, - request: from.pdu.into(), + slave: hdr.slave.into(), + request: pdu.into(), } } } @@ -106,8 +107,8 @@ mod tests { #[test] fn validate_same_headers() { // Given - let req_hdr = Header { slave_id: 0 }; - let rsp_hdr = Header { slave_id: 0 }; + let req_hdr = Header { slave: Slave(0) }; + let rsp_hdr = Header { slave: Slave(0) }; // When let result = verify_response_header(&req_hdr, &rsp_hdr); @@ -119,8 +120,8 @@ mod tests { #[test] fn invalid_validate_not_same_slave_id() { // Given - let req_hdr = Header { slave_id: 0 }; - let rsp_hdr = Header { slave_id: 5 }; + let req_hdr = Header { slave: Slave(0) }; + let rsp_hdr = Header { slave: Slave(5) }; // When let result = verify_response_header(&req_hdr, &rsp_hdr); diff --git a/src/service/rtu.rs b/src/service/rtu.rs index f98645e0..292c8a21 100644 --- a/src/service/rtu.rs +++ b/src/service/rtu.rs @@ -180,9 +180,7 @@ fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a> where R: Into>, { - let hdr = Header { - slave_id: server.into(), - }; + let hdr = Header { slave: server }; let pdu = request_pdu.into(); RequestAdu { hdr, pdu } } diff --git a/src/slave.rs b/src/slave.rs index 9dc77651..6aedc844 100644 --- a/src/slave.rs +++ b/src/slave.rs @@ -9,7 +9,7 @@ use std::{fmt, num::ParseIntError, str::FromStr}; pub type SlaveId = u8; /// A single byte for addressing Modbus slave devices. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(transparent)] pub struct Slave(pub SlaveId); @@ -101,7 +101,7 @@ impl FromStr for Slave { impl fmt::Display for Slave { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} (0x{:0>2X})", self.0, self.0) + write!(f, "{} (0x{:02X})", self.0, self.0) } } @@ -148,9 +148,9 @@ mod tests { } #[test] - fn format() { - assert!(format!("{}", Slave(123)).contains("123")); - assert!(format!("{}", Slave(0x7B)).contains("0x7B")); - assert!(!format!("{}", Slave(0x7B)).contains("0x7b")); + fn display() { + assert_eq!("0 (0x00)", Slave(0).to_string()); + assert_eq!("123 (0x7B)", Slave(123).to_string()); + assert_eq!("123 (0x7B)", Slave(0x7B).to_string()); } } From f619baf23bd91c047cfd84647c4d25a0df763bc5 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 17 Oct 2024 14:46:49 +0200 Subject: [PATCH 6/9] Retire the `service` modules --- src/client/rtu.rs | 2 +- src/client/tcp.rs | 2 +- src/codec/mod.rs | 20 ++++++++++++++++++++ src/frame/rtu.rs | 15 +-------------- src/lib.rs | 8 ++++++-- src/prelude.rs | 3 +-- src/{service => }/rtu.rs | 15 +++++++++++++-- src/service/mod.rs | 28 ---------------------------- src/{service => }/tcp.rs | 4 +--- 9 files changed, 44 insertions(+), 53 deletions(-) rename src/{service => }/rtu.rs (94%) delete mode 100644 src/service/mod.rs rename src/{service => }/tcp.rs (99%) diff --git a/src/client/rtu.rs b/src/client/rtu.rs index 5d04396f..606ffad3 100644 --- a/src/client/rtu.rs +++ b/src/client/rtu.rs @@ -5,7 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; -use crate::service::rtu::{Client, ClientContext}; +use crate::rtu::{Client, ClientContext}; use super::*; diff --git a/src/client/tcp.rs b/src/client/tcp.rs index acd0df56..da72ce4c 100644 --- a/src/client/tcp.rs +++ b/src/client/tcp.rs @@ -43,7 +43,7 @@ pub fn attach_slave(transport: T, slave: Slave) -> Context where T: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug + 'static, { - let client = crate::service::tcp::Client::new(transport, slave); + let client = crate::tcp::Client::new(transport, slave); Context { client: Box::new(client), } diff --git a/src/codec/mod.rs b/src/codec/mod.rs index ee97bb5f..62c336c5 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -26,6 +26,26 @@ pub(crate) mod tcp; const MAX_PDU_SIZE: usize = 253; #[cfg(any(test, feature = "rtu", feature = "tcp"))] +#[cfg(any(feature = "rtu", feature = "tcp"))] +pub(crate) async fn disconnect(framed: tokio_util::codec::Framed) -> std::io::Result<()> +where + T: tokio::io::AsyncWrite + Unpin, +{ + use tokio::io::AsyncWriteExt as _; + + framed + .into_inner() + .shutdown() + .await + .or_else(|err| match err.kind() { + std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => { + // Already disconnected. + Ok(()) + } + _ => Err(err), + }) +} + #[allow(clippy::cast_possible_truncation)] fn u16_len(len: usize) -> u16 { // This type conversion should always be safe, because either diff --git a/src/frame/rtu.rs b/src/frame/rtu.rs index 18233716..d7da0189 100644 --- a/src/frame/rtu.rs +++ b/src/frame/rtu.rs @@ -3,20 +3,7 @@ use super::*; -use crate::{ProtocolError, Result, Slave}; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct RequestContext { - function_code: FunctionCode, - header: Header, -} - -impl RequestContext { - #[must_use] - pub const fn function_code(&self) -> FunctionCode { - self.function_code - } -} +use crate::{rtu::RequestContext, ProtocolError, Result, Slave}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(crate) struct Header { diff --git a/src/lib.rs b/src/lib.rs index d2417f2f..6451263c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,12 @@ pub mod client; pub mod slave; pub use self::slave::{Slave, SlaveId}; +#[cfg(feature = "rtu")] +pub mod rtu; + +#[cfg(feature = "tcp")] +pub mod tcp; + mod codec; mod error; @@ -61,7 +67,5 @@ pub use self::frame::{ /// 2. [`ExceptionCode`]: An error occurred on the _Modbus_ server. pub type Result = std::result::Result, Error>; -mod service; - #[cfg(feature = "server")] pub mod server; diff --git a/src/prelude.rs b/src/prelude.rs index 268196d2..35dcd3f3 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -12,8 +12,7 @@ pub use crate::client; #[cfg(feature = "rtu")] pub mod rtu { pub use crate::client::rtu::*; - pub use crate::frame::rtu::RequestContext; - pub use crate::service::rtu::{Client, ClientContext}; + pub use crate::rtu::{Client, ClientContext, RequestContext}; } #[allow(missing_docs)] diff --git a/src/service/rtu.rs b/src/rtu.rs similarity index 94% rename from src/service/rtu.rs rename to src/rtu.rs index 292c8a21..666019e1 100644 --- a/src/service/rtu.rs +++ b/src/rtu.rs @@ -8,13 +8,24 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; use crate::{ - codec, + codec::{self, disconnect}, frame::{rtu::*, *}, slave::*, Result, }; -use super::disconnect; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct RequestContext { + pub(crate) function_code: FunctionCode, + pub(crate) header: Header, +} + +impl RequestContext { + #[must_use] + pub const fn function_code(&self) -> FunctionCode { + self.function_code + } +} /// _Modbus_ RTU client. #[derive(Debug)] diff --git a/src/service/mod.rs b/src/service/mod.rs deleted file mode 100644 index 212f224b..00000000 --- a/src/service/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2017-2024 slowtec GmbH -// SPDX-License-Identifier: MIT OR Apache-2.0 - -#[cfg(feature = "rtu")] -pub(crate) mod rtu; - -#[cfg(feature = "tcp")] -pub(crate) mod tcp; - -#[cfg(any(feature = "rtu", feature = "tcp"))] -async fn disconnect(framed: tokio_util::codec::Framed) -> std::io::Result<()> -where - T: tokio::io::AsyncWrite + Unpin, -{ - use tokio::io::AsyncWriteExt as _; - - framed - .into_inner() - .shutdown() - .await - .or_else(|err| match err.kind() { - std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => { - // Already disconnected. - Ok(()) - } - _ => Err(err), - }) -} diff --git a/src/service/tcp.rs b/src/tcp.rs similarity index 99% rename from src/service/tcp.rs rename to src/tcp.rs index 38bb6a60..f6914da1 100644 --- a/src/service/tcp.rs +++ b/src/tcp.rs @@ -8,7 +8,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; use crate::{ - codec, + codec::{self, disconnect}, frame::{ tcp::{Header, RequestAdu, ResponseAdu, TransactionId, UnitId}, verify_response_header, RequestPdu, ResponsePdu, @@ -17,8 +17,6 @@ use crate::{ ExceptionResponse, ProtocolError, Request, Response, Result, }; -use super::disconnect; - const INITIAL_TRANSACTION_ID: TransactionId = 0; #[derive(Debug)] From ba8362815189f1ed3dd2df240424a01cff966339 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 17 Oct 2024 14:47:58 +0200 Subject: [PATCH 7/9] Regroup modules --- src/client/mod.rs | 22 ++++ src/client/rtu.rs | 261 ++++++++++++++++++++++++++++++++++++++++++++-- src/client/tcp.rs | 221 ++++++++++++++++++++++++++++++++++++++- src/codec/mod.rs | 21 ---- src/frame/rtu.rs | 2 +- src/lib.rs | 6 -- src/prelude.rs | 1 - src/rtu.rs | 252 -------------------------------------------- src/tcp.rs | 223 --------------------------------------- 9 files changed, 497 insertions(+), 512 deletions(-) delete mode 100644 src/rtu.rs delete mode 100644 src/tcp.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index 651f6125..92b9f2ad 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -316,6 +316,28 @@ impl Writer for Context { } } +#[cfg(any(feature = "rtu", feature = "tcp"))] +pub(crate) async fn disconnect_framed( + framed: tokio_util::codec::Framed, +) -> std::io::Result<()> +where + T: tokio::io::AsyncWrite + Unpin, +{ + use tokio::io::AsyncWriteExt as _; + + framed + .into_inner() + .shutdown() + .await + .or_else(|err| match err.kind() { + std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => { + // Already disconnected. + Ok(()) + } + _ => Err(err), + }) +} + #[cfg(test)] mod tests { use crate::{Error, Result}; diff --git a/src/client/rtu.rs b/src/client/rtu.rs index 606ffad3..1d2eb0d3 100644 --- a/src/client/rtu.rs +++ b/src/client/rtu.rs @@ -3,25 +3,37 @@ //! RTU client connections +use std::{fmt, io}; + +use futures_util::{SinkExt as _, StreamExt as _}; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::codec::Framed; -use crate::rtu::{Client, ClientContext}; +use crate::{ + codec::rtu::ClientCodec, + frame::{ + rtu::{Header, RequestAdu}, + RequestPdu, + }, + slave::SlaveContext, + FunctionCode, Request, Response, Result, Slave, +}; -use super::*; +use super::{disconnect_framed, Context}; -/// Connect to no particular Modbus slave device for sending +/// Connect to no particular _Modbus_ slave device for sending /// broadcast messages. pub fn attach(transport: T) -> Context where - T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static, { attach_slave(transport, Slave::broadcast()) } -/// Connect to any kind of Modbus slave device. +/// Connect to any kind of _Modbus_ slave device. pub fn attach_slave(transport: T, slave: Slave) -> Context where - T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static, + T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static, { let client = Client::new(transport); let context = ClientContext::new(client, slave); @@ -29,3 +41,240 @@ where client: Box::new(context), } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct RequestContext { + pub(crate) function_code: FunctionCode, + pub(crate) header: Header, +} + +impl RequestContext { + #[must_use] + pub const fn function_code(&self) -> FunctionCode { + self.function_code + } +} + +/// _Modbus_ RTU client. +#[derive(Debug)] +pub struct Client { + framed: Framed, +} + +impl Client +where + T: AsyncRead + AsyncWrite + Unpin, +{ + pub fn new(transport: T) -> Self { + let framed = Framed::new(transport, ClientCodec::default()); + Self { framed } + } + + pub async fn disconnect(self) -> io::Result<()> { + let Self { framed } = self; + disconnect_framed(framed).await + } + + pub async fn call<'a>(&mut self, server: Slave, request: Request<'a>) -> Result { + let request_context = self.send_request(server, request).await?; + self.recv_response(request_context).await + } + + pub async fn send_request<'a>( + &mut self, + server: Slave, + request: Request<'a>, + ) -> io::Result { + self.send_request_pdu(server, request).await + } + + async fn send_request_pdu<'a, R>( + &mut self, + server: Slave, + request_pdu: R, + ) -> io::Result + where + R: Into>, + { + let request_adu = request_adu(server, request_pdu); + self.send_request_adu(request_adu).await + } + + async fn send_request_adu<'a>( + &mut self, + request_adu: RequestAdu<'a>, + ) -> io::Result { + let request_context = request_adu.context(); + + self.framed.read_buffer_mut().clear(); + self.framed.send(request_adu).await?; + + Ok(request_context) + } + + pub async fn recv_response(&mut self, request_context: RequestContext) -> Result { + let response_adu = self + .framed + .next() + .await + .unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?; + + response_adu.try_into_response(request_context) + } +} + +/// _Modbus_ RTU client with (server) context and connection state. +/// +/// Client that invokes methods (request/response) on a single or many (broadcast) server(s). +/// +/// The server can be switched between method calls. +#[derive(Debug)] +pub struct ClientContext { + client: Option>, + server: Slave, +} + +impl ClientContext { + pub fn new(client: Client, server: Slave) -> Self { + Self { + client: Some(client), + server, + } + } + + #[must_use] + pub const fn is_connected(&self) -> bool { + self.client.is_some() + } + + #[must_use] + pub const fn server(&self) -> Slave { + self.server + } + + pub fn set_server(&mut self, server: Slave) { + self.server = server; + } +} + +impl ClientContext +where + T: AsyncWrite + Unpin, +{ + pub async fn disconnect(&mut self) -> io::Result<()> { + let Some(client) = self.client.take() else { + // Already disconnected. + return Ok(()); + }; + disconnect_framed(client.framed).await + } +} + +impl ClientContext +where + T: AsyncRead + AsyncWrite + Unpin, +{ + pub async fn call(&mut self, request: Request<'_>) -> Result { + log::debug!("Call {:?}", request); + + let Some(client) = &mut self.client else { + return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into()); + }; + + client.call(self.server, request).await + } +} + +impl ClientContext +where + T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + 'static, +{ + #[must_use] + pub fn boxed(self) -> Box { + Box::new(self) + } +} + +impl SlaveContext for ClientContext { + fn set_slave(&mut self, slave: Slave) { + self.set_server(slave); + } +} + +#[async_trait::async_trait] +impl crate::client::Client for ClientContext +where + T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin, +{ + async fn call(&mut self, req: Request<'_>) -> Result { + self.call(req).await + } + + async fn disconnect(&mut self) -> io::Result<()> { + self.disconnect().await + } +} + +fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a> +where + R: Into>, +{ + let hdr = Header { slave: server }; + let pdu = request_pdu.into(); + RequestAdu { hdr, pdu } +} + +#[cfg(test)] +mod tests { + use core::{ + pin::Pin, + task::{Context, Poll}, + }; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result}; + + use crate::Error; + + use super::*; + + #[derive(Debug)] + struct MockTransport; + + impl Unpin for MockTransport {} + + impl AsyncRead for MockTransport { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut ReadBuf<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + } + + impl AsyncWrite for MockTransport { + fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll> { + Poll::Ready(Ok(2)) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + unimplemented!() + } + } + + #[tokio::test] + async fn handle_broken_pipe() { + let transport = MockTransport; + let client = Client::new(transport); + let mut context = ClientContext::new(client, Slave::broadcast()); + let res = context.call(Request::ReadCoils(0x00, 5)).await; + assert!(res.is_err()); + let err = res.err().unwrap(); + assert!( + matches!(err, Error::Transport(err) if err.kind() == std::io::ErrorKind::BrokenPipe) + ); + } +} diff --git a/src/client/tcp.rs b/src/client/tcp.rs index da72ce4c..6c0381ea 100644 --- a/src/client/tcp.rs +++ b/src/client/tcp.rs @@ -5,12 +5,24 @@ use std::{fmt, io, net::SocketAddr}; +use futures_util::{SinkExt as _, StreamExt as _}; use tokio::{ io::{AsyncRead, AsyncWrite}, net::TcpStream, }; +use tokio_util::codec::Framed; -use super::*; +use crate::{ + codec::tcp::ClientCodec, + frame::{ + tcp::{Header, RequestAdu, ResponseAdu, TransactionId, UnitId}, + verify_response_header, RequestPdu, ResponsePdu, + }, + slave::SlaveContext, + ExceptionResponse, ProtocolError, Request, Response, Result, Slave, +}; + +use super::{disconnect_framed, Context}; /// Establish a direct connection to a Modbus TCP coupler. pub async fn connect(socket_addr: SocketAddr) -> io::Result { @@ -43,8 +55,213 @@ pub fn attach_slave(transport: T, slave: Slave) -> Context where T: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug + 'static, { - let client = crate::tcp::Client::new(transport, slave); + let client = Client::new(transport, slave); Context { client: Box::new(client), } } + +const INITIAL_TRANSACTION_ID: TransactionId = 0; + +#[derive(Debug)] +struct TransactionIdGenerator { + next_transaction_id: TransactionId, +} + +impl TransactionIdGenerator { + const fn new() -> Self { + Self { + next_transaction_id: INITIAL_TRANSACTION_ID, + } + } + + fn next(&mut self) -> TransactionId { + let next_transaction_id = self.next_transaction_id; + self.next_transaction_id = next_transaction_id.wrapping_add(1); + next_transaction_id + } +} + +/// Modbus TCP client +#[derive(Debug)] +pub(crate) struct Client { + framed: Option>, + transaction_id_generator: TransactionIdGenerator, + unit_id: UnitId, +} + +impl Client +where + T: AsyncRead + AsyncWrite + Unpin, +{ + pub(crate) fn new(transport: T, slave: Slave) -> Self { + let framed = Framed::new(transport, ClientCodec::new()); + let transaction_id_generator = TransactionIdGenerator::new(); + let unit_id: UnitId = slave.into(); + Self { + framed: Some(framed), + transaction_id_generator, + unit_id, + } + } + + fn next_request_hdr(&mut self, unit_id: UnitId) -> Header { + let transaction_id = self.transaction_id_generator.next(); + Header { + transaction_id, + unit_id, + } + } + + fn next_request_adu<'a, R>(&mut self, req: R) -> RequestAdu<'a> + where + R: Into>, + { + RequestAdu { + hdr: self.next_request_hdr(self.unit_id), + pdu: req.into(), + } + } + + fn framed(&mut self) -> io::Result<&mut Framed> { + let Some(framed) = &mut self.framed else { + return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected")); + }; + Ok(framed) + } + + pub(crate) async fn call(&mut self, req: Request<'_>) -> Result { + log::debug!("Call {:?}", req); + + let req_function_code = req.function_code(); + let req_adu = self.next_request_adu(req); + let req_hdr = req_adu.hdr; + + let framed = self.framed()?; + + framed.read_buffer_mut().clear(); + framed.send(req_adu).await?; + + let res_adu = framed.next().await.ok_or_else(io::Error::last_os_error)??; + let ResponseAdu { + hdr: res_hdr, + pdu: res_pdu, + } = res_adu; + let ResponsePdu(result) = res_pdu; + + // Match headers of request and response. + if let Err(message) = verify_response_header(&req_hdr, &res_hdr) { + return Err(ProtocolError::HeaderMismatch { message, result }.into()); + } + + // Match function codes of request and response. + let rsp_function_code = match &result { + Ok(response) => response.function_code(), + Err(ExceptionResponse { function, .. }) => *function, + }; + if req_function_code != rsp_function_code { + return Err(ProtocolError::FunctionCodeMismatch { + request: req_function_code, + result, + } + .into()); + } + + Ok(result.map_err( + |ExceptionResponse { + function: _, + exception, + }| exception, + )) + } + + async fn disconnect(&mut self) -> io::Result<()> { + let Some(framed) = self.framed.take() else { + // Already disconnected. + return Ok(()); + }; + disconnect_framed(framed).await + } +} + +impl SlaveContext for Client { + fn set_slave(&mut self, slave: Slave) { + self.unit_id = slave.into(); + } +} + +#[async_trait::async_trait] +impl crate::client::Client for Client +where + T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin, +{ + async fn call(&mut self, req: Request<'_>) -> Result { + self.call(req).await + } + + async fn disconnect(&mut self) -> io::Result<()> { + self.disconnect().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn validate_same_headers() { + // Given + let req_hdr = Header { + unit_id: 0, + transaction_id: 42, + }; + let rsp_hdr = Header { + unit_id: 0, + transaction_id: 42, + }; + + // When + let result = verify_response_header(&req_hdr, &rsp_hdr); + + // Then + assert!(result.is_ok()); + } + + #[test] + fn invalid_validate_not_same_unit_id() { + // Given + let req_hdr = Header { + unit_id: 0, + transaction_id: 42, + }; + let rsp_hdr = Header { + unit_id: 5, + transaction_id: 42, + }; + + // When + let result = verify_response_header(&req_hdr, &rsp_hdr); + + // Then + assert!(result.is_err()); + } + + #[test] + fn invalid_validate_not_same_transaction_id() { + // Given + let req_hdr = Header { + unit_id: 0, + transaction_id: 42, + }; + let rsp_hdr = Header { + unit_id: 0, + transaction_id: 86, + }; + + // When + let result = verify_response_header(&req_hdr, &rsp_hdr); + + // Then + assert!(result.is_err()); + } +} diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 62c336c5..dc27a18b 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -25,27 +25,6 @@ pub(crate) mod tcp; /// As defined by the spec for both RTU and TCP. const MAX_PDU_SIZE: usize = 253; -#[cfg(any(test, feature = "rtu", feature = "tcp"))] -#[cfg(any(feature = "rtu", feature = "tcp"))] -pub(crate) async fn disconnect(framed: tokio_util::codec::Framed) -> std::io::Result<()> -where - T: tokio::io::AsyncWrite + Unpin, -{ - use tokio::io::AsyncWriteExt as _; - - framed - .into_inner() - .shutdown() - .await - .or_else(|err| match err.kind() { - std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => { - // Already disconnected. - Ok(()) - } - _ => Err(err), - }) -} - #[allow(clippy::cast_possible_truncation)] fn u16_len(len: usize) -> u16 { // This type conversion should always be safe, because either diff --git a/src/frame/rtu.rs b/src/frame/rtu.rs index d7da0189..01992988 100644 --- a/src/frame/rtu.rs +++ b/src/frame/rtu.rs @@ -3,7 +3,7 @@ use super::*; -use crate::{rtu::RequestContext, ProtocolError, Result, Slave}; +use crate::{client::rtu::RequestContext, ProtocolError, Result, Slave}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(crate) struct Header { diff --git a/src/lib.rs b/src/lib.rs index 6451263c..7ea11da0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,12 +39,6 @@ pub mod client; pub mod slave; pub use self::slave::{Slave, SlaveId}; -#[cfg(feature = "rtu")] -pub mod rtu; - -#[cfg(feature = "tcp")] -pub mod tcp; - mod codec; mod error; diff --git a/src/prelude.rs b/src/prelude.rs index 35dcd3f3..33c694e1 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -12,7 +12,6 @@ pub use crate::client; #[cfg(feature = "rtu")] pub mod rtu { pub use crate::client::rtu::*; - pub use crate::rtu::{Client, ClientContext, RequestContext}; } #[allow(missing_docs)] diff --git a/src/rtu.rs b/src/rtu.rs deleted file mode 100644 index 666019e1..00000000 --- a/src/rtu.rs +++ /dev/null @@ -1,252 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2017-2024 slowtec GmbH -// SPDX-License-Identifier: MIT OR Apache-2.0 - -use std::{fmt, io}; - -use futures_util::{SinkExt as _, StreamExt as _}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::Framed; - -use crate::{ - codec::{self, disconnect}, - frame::{rtu::*, *}, - slave::*, - Result, -}; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct RequestContext { - pub(crate) function_code: FunctionCode, - pub(crate) header: Header, -} - -impl RequestContext { - #[must_use] - pub const fn function_code(&self) -> FunctionCode { - self.function_code - } -} - -/// _Modbus_ RTU client. -#[derive(Debug)] -pub struct Client { - framed: Framed, -} - -impl Client -where - T: AsyncRead + AsyncWrite + Unpin, -{ - pub fn new(transport: T) -> Self { - let framed = Framed::new(transport, codec::rtu::ClientCodec::default()); - Self { framed } - } - - pub async fn disconnect(self) -> io::Result<()> { - let Self { framed } = self; - disconnect(framed).await - } - - pub async fn call<'a>(&mut self, server: Slave, request: Request<'a>) -> Result { - let request_context = self.send_request(server, request).await?; - self.recv_response(request_context).await - } - - pub async fn send_request<'a>( - &mut self, - server: Slave, - request: Request<'a>, - ) -> io::Result { - self.send_request_pdu(server, request).await - } - - async fn send_request_pdu<'a, R>( - &mut self, - server: Slave, - request_pdu: R, - ) -> io::Result - where - R: Into>, - { - let request_adu = request_adu(server, request_pdu); - self.send_request_adu(request_adu).await - } - - async fn send_request_adu<'a>( - &mut self, - request_adu: RequestAdu<'a>, - ) -> io::Result { - let request_context = request_adu.context(); - - self.framed.read_buffer_mut().clear(); - self.framed.send(request_adu).await?; - - Ok(request_context) - } - - pub async fn recv_response(&mut self, request_context: RequestContext) -> Result { - let response_adu = self - .framed - .next() - .await - .unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?; - - response_adu.try_into_response(request_context) - } -} - -/// _Modbus_ RTU client with (server) context and connection state. -/// -/// Client that invokes methods (request/response) on a single or many (broadcast) server(s). -/// -/// The server can be switched between method calls. -#[derive(Debug)] -pub struct ClientContext { - client: Option>, - server: Slave, -} - -impl ClientContext { - pub fn new(client: Client, server: Slave) -> Self { - Self { - client: Some(client), - server, - } - } - - #[must_use] - pub const fn is_connected(&self) -> bool { - self.client.is_some() - } - - #[must_use] - pub const fn server(&self) -> Slave { - self.server - } - - pub fn set_server(&mut self, server: Slave) { - self.server = server; - } -} - -impl ClientContext -where - T: AsyncWrite + Unpin, -{ - pub async fn disconnect(&mut self) -> io::Result<()> { - let Some(client) = self.client.take() else { - // Already disconnected. - return Ok(()); - }; - disconnect(client.framed).await - } -} - -impl ClientContext -where - T: AsyncRead + AsyncWrite + Unpin, -{ - pub async fn call(&mut self, request: Request<'_>) -> Result { - log::debug!("Call {:?}", request); - - let Some(client) = &mut self.client else { - return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into()); - }; - - client.call(self.server, request).await - } -} - -impl ClientContext -where - T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + 'static, -{ - #[must_use] - pub fn boxed(self) -> Box { - Box::new(self) - } -} - -impl SlaveContext for ClientContext { - fn set_slave(&mut self, slave: Slave) { - self.set_server(slave); - } -} - -#[async_trait::async_trait] -impl crate::client::Client for ClientContext -where - T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin, -{ - async fn call(&mut self, req: Request<'_>) -> Result { - self.call(req).await - } - - async fn disconnect(&mut self) -> io::Result<()> { - self.disconnect().await - } -} - -fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a> -where - R: Into>, -{ - let hdr = Header { slave: server }; - let pdu = request_pdu.into(); - RequestAdu { hdr, pdu } -} - -#[cfg(test)] -mod tests { - use core::{ - pin::Pin, - task::{Context, Poll}, - }; - use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result}; - - use crate::Error; - - use super::*; - - #[derive(Debug)] - struct MockTransport; - - impl Unpin for MockTransport {} - - impl AsyncRead for MockTransport { - fn poll_read( - self: Pin<&mut Self>, - _: &mut Context<'_>, - _: &mut ReadBuf<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - } - - impl AsyncWrite for MockTransport { - fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll> { - Poll::Ready(Ok(2)) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - unimplemented!() - } - } - - #[tokio::test] - async fn handle_broken_pipe() { - let transport = MockTransport; - let client = Client::new(transport); - let mut context = ClientContext::new(client, Slave::broadcast()); - let res = context.call(Request::ReadCoils(0x00, 5)).await; - assert!(res.is_err()); - let err = res.err().unwrap(); - assert!( - matches!(err, Error::Transport(err) if err.kind() == std::io::ErrorKind::BrokenPipe) - ); - } -} diff --git a/src/tcp.rs b/src/tcp.rs deleted file mode 100644 index f6914da1..00000000 --- a/src/tcp.rs +++ /dev/null @@ -1,223 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2017-2024 slowtec GmbH -// SPDX-License-Identifier: MIT OR Apache-2.0 - -use std::{fmt, io}; - -use futures_util::{SinkExt as _, StreamExt as _}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::Framed; - -use crate::{ - codec::{self, disconnect}, - frame::{ - tcp::{Header, RequestAdu, ResponseAdu, TransactionId, UnitId}, - verify_response_header, RequestPdu, ResponsePdu, - }, - slave::*, - ExceptionResponse, ProtocolError, Request, Response, Result, -}; - -const INITIAL_TRANSACTION_ID: TransactionId = 0; - -#[derive(Debug)] -struct TransactionIdGenerator { - next_transaction_id: TransactionId, -} - -impl TransactionIdGenerator { - const fn new() -> Self { - Self { - next_transaction_id: INITIAL_TRANSACTION_ID, - } - } - - fn next(&mut self) -> TransactionId { - let next_transaction_id = self.next_transaction_id; - self.next_transaction_id = next_transaction_id.wrapping_add(1); - next_transaction_id - } -} - -/// Modbus TCP client -#[derive(Debug)] -pub(crate) struct Client { - framed: Option>, - transaction_id_generator: TransactionIdGenerator, - unit_id: UnitId, -} - -impl Client -where - T: AsyncRead + AsyncWrite + Unpin, -{ - pub(crate) fn new(transport: T, slave: Slave) -> Self { - let framed = Framed::new(transport, codec::tcp::ClientCodec::new()); - let transaction_id_generator = TransactionIdGenerator::new(); - let unit_id: UnitId = slave.into(); - Self { - framed: Some(framed), - transaction_id_generator, - unit_id, - } - } - - fn next_request_hdr(&mut self, unit_id: UnitId) -> Header { - let transaction_id = self.transaction_id_generator.next(); - Header { - transaction_id, - unit_id, - } - } - - fn next_request_adu<'a, R>(&mut self, req: R) -> RequestAdu<'a> - where - R: Into>, - { - RequestAdu { - hdr: self.next_request_hdr(self.unit_id), - pdu: req.into(), - } - } - - fn framed(&mut self) -> io::Result<&mut Framed> { - let Some(framed) = &mut self.framed else { - return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected")); - }; - Ok(framed) - } - - pub(crate) async fn call(&mut self, req: Request<'_>) -> Result { - log::debug!("Call {:?}", req); - - let req_function_code = req.function_code(); - let req_adu = self.next_request_adu(req); - let req_hdr = req_adu.hdr; - - let framed = self.framed()?; - - framed.read_buffer_mut().clear(); - framed.send(req_adu).await?; - - let res_adu = framed.next().await.ok_or_else(io::Error::last_os_error)??; - let ResponseAdu { - hdr: res_hdr, - pdu: res_pdu, - } = res_adu; - let ResponsePdu(result) = res_pdu; - - // Match headers of request and response. - if let Err(message) = verify_response_header(&req_hdr, &res_hdr) { - return Err(ProtocolError::HeaderMismatch { message, result }.into()); - } - - // Match function codes of request and response. - let rsp_function_code = match &result { - Ok(response) => response.function_code(), - Err(ExceptionResponse { function, .. }) => *function, - }; - if req_function_code != rsp_function_code { - return Err(ProtocolError::FunctionCodeMismatch { - request: req_function_code, - result, - } - .into()); - } - - Ok(result.map_err( - |ExceptionResponse { - function: _, - exception, - }| exception, - )) - } - - async fn disconnect(&mut self) -> io::Result<()> { - let Some(framed) = self.framed.take() else { - // Already disconnected. - return Ok(()); - }; - disconnect(framed).await - } -} - -impl SlaveContext for Client { - fn set_slave(&mut self, slave: Slave) { - self.unit_id = slave.into(); - } -} - -#[async_trait::async_trait] -impl crate::client::Client for Client -where - T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin, -{ - async fn call(&mut self, req: Request<'_>) -> Result { - self.call(req).await - } - - async fn disconnect(&mut self) -> io::Result<()> { - self.disconnect().await - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn validate_same_headers() { - // Given - let req_hdr = Header { - unit_id: 0, - transaction_id: 42, - }; - let rsp_hdr = Header { - unit_id: 0, - transaction_id: 42, - }; - - // When - let result = verify_response_header(&req_hdr, &rsp_hdr); - - // Then - assert!(result.is_ok()); - } - - #[test] - fn invalid_validate_not_same_unit_id() { - // Given - let req_hdr = Header { - unit_id: 0, - transaction_id: 42, - }; - let rsp_hdr = Header { - unit_id: 5, - transaction_id: 42, - }; - - // When - let result = verify_response_header(&req_hdr, &rsp_hdr); - - // Then - assert!(result.is_err()); - } - - #[test] - fn invalid_validate_not_same_transaction_id() { - // Given - let req_hdr = Header { - unit_id: 0, - transaction_id: 42, - }; - let rsp_hdr = Header { - unit_id: 0, - transaction_id: 86, - }; - - // When - let result = verify_response_header(&req_hdr, &rsp_hdr); - - // Then - assert!(result.is_err()); - } -} From faeb1c28c4809757ea893bc3b64d8cb5ed49d08d Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Mon, 14 Oct 2024 02:58:56 +0200 Subject: [PATCH 8/9] Remove unused method --- src/client/rtu.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/client/rtu.rs b/src/client/rtu.rs index 1d2eb0d3..d0280da6 100644 --- a/src/client/rtu.rs +++ b/src/client/rtu.rs @@ -85,18 +85,7 @@ where server: Slave, request: Request<'a>, ) -> io::Result { - self.send_request_pdu(server, request).await - } - - async fn send_request_pdu<'a, R>( - &mut self, - server: Slave, - request_pdu: R, - ) -> io::Result - where - R: Into>, - { - let request_adu = request_adu(server, request_pdu); + let request_adu = request_adu(server, request); self.send_request_adu(request_adu).await } From c604d68251afdd2461de53b5f503cd30d4a6d209 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Thu, 17 Oct 2024 14:48:44 +0200 Subject: [PATCH 9/9] Fix pre-commit --- src/codec/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/codec/mod.rs b/src/codec/mod.rs index dc27a18b..ee97bb5f 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -25,6 +25,7 @@ pub(crate) mod tcp; /// As defined by the spec for both RTU and TCP. const MAX_PDU_SIZE: usize = 253; +#[cfg(any(test, feature = "rtu", feature = "tcp"))] #[allow(clippy::cast_possible_truncation)] fn u16_len(len: usize) -> u16 { // This type conversion should always be safe, because either