Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rtu: Send request and receive response one after another #289

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions examples/rtu-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,45 @@

//! Asynchronous RTU client example

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use tokio_serial::SerialStream;
use tokio_modbus::{prelude::*, Address, Quantity, Slave};
use tokio_serial::SerialStream;

const SERIAL_PATH: &str = "/dev/ttyUSB0";

use tokio_modbus::prelude::*;
const BAUD_RATE: u32 = 19_200;

let tty_path = "/dev/ttyUSB0";
let slave = Slave(0x17);
const SERVER: Slave = Slave(0x17);

let builder = tokio_serial::new(tty_path, 19200);
let port = SerialStream::open(&builder).unwrap();
const SENSOR_ADDRESS: Address = 0x082B;

let mut ctx = rtu::attach_slave(port, slave);
println!("Reading a sensor value");
let rsp = ctx.read_holding_registers(0x082B, 2).await??;
println!("Sensor value is: {rsp:?}");
const SENSOR_QUANTITY: Quantity = 2;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let builder = tokio_serial::new(SERIAL_PATH, BAUD_RATE);
let transport = SerialStream::open(&builder).unwrap();

let mut client = rtu::Client::new(transport);

println!("Reading sensor values (request/response using the low-level API");
let request = Request::ReadHoldingRegisters(SENSOR_ADDRESS, SENSOR_QUANTITY);
let request_context = client.send_request(SERVER, request).await?;
let Response::ReadHoldingRegisters(values) = client.recv_response(request_context).await??
else {
// The response variant will always match its corresponding request variant if successful.
unreachable!();
};
println!("Sensor responded with: {values:?}");

println!("Reading sensor values (call) using the high-level API");
let mut client_context = client::Context::from(rtu::ClientContext::new(client, SERVER).boxed());
let values = client_context
.read_holding_registers(SENSOR_ADDRESS, SENSOR_QUANTITY)
.await??;
println!("Sensor responded with: {values:?}");

println!("Disconnecting");
ctx.disconnect().await?;
client_context.disconnect().await?;

Ok(())
}
22 changes: 22 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,28 @@ impl Writer for Context {
}
}

#[cfg(any(feature = "rtu", feature = "tcp"))]
pub(crate) async fn disconnect_framed<T, C>(
framed: tokio_util::codec::Framed<T, C>,
) -> std::io::Result<()>
where
T: tokio::io::AsyncWrite + Unpin,
{
use tokio::io::AsyncWriteExt as _;

framed
.into_inner()
.shutdown()
.await
.or_else(|err| match err.kind() {
std::io::ErrorKind::NotConnected | std::io::ErrorKind::BrokenPipe => {
// Already disconnected.
Ok(())
}
_ => Err(err),
})
}

#[cfg(test)]
mod tests {
use crate::{Error, Result};
Expand Down
255 changes: 248 additions & 7 deletions src/client/rtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,267 @@

//! RTU client connections

use std::{fmt, io};

use futures_util::{SinkExt as _, StreamExt as _};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;

use crate::{
codec::rtu::ClientCodec,
frame::{
rtu::{Header, RequestAdu},
RequestPdu,
},
slave::SlaveContext,
FunctionCode, Request, Response, Result, Slave,
};

use super::*;
use super::{disconnect_framed, Context};

/// Connect to no particular Modbus slave device for sending
/// Connect to no particular _Modbus_ slave device for sending
/// broadcast messages.
pub fn attach<T>(transport: T) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static,
{
attach_slave(transport, Slave::broadcast())
}

/// Connect to any kind of Modbus slave device.
/// Connect to any kind of _Modbus_ slave device.
pub fn attach_slave<T>(transport: T, slave: Slave) -> Context
where
T: AsyncRead + AsyncWrite + Debug + Unpin + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + Unpin + Send + 'static,
{
let client = crate::service::rtu::Client::new(transport, slave);
let client = Client::new(transport);
let context = ClientContext::new(client, slave);
Context {
client: Box::new(client),
client: Box::new(context),
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RequestContext {
pub(crate) function_code: FunctionCode,
pub(crate) header: Header,
}

impl RequestContext {
#[must_use]
pub const fn function_code(&self) -> FunctionCode {
self.function_code
}
}

/// _Modbus_ RTU client.
#[derive(Debug)]
pub struct Client<T> {
framed: Framed<T, ClientCodec>,
}

impl<T> Client<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub fn new(transport: T) -> Self {
let framed = Framed::new(transport, ClientCodec::default());
Self { framed }
}

pub async fn disconnect(self) -> io::Result<()> {
let Self { framed } = self;
disconnect_framed(framed).await
}

pub async fn call<'a>(&mut self, server: Slave, request: Request<'a>) -> Result<Response> {
let request_context = self.send_request(server, request).await?;
self.recv_response(request_context).await
}

pub async fn send_request<'a>(
&mut self,
server: Slave,
request: Request<'a>,
) -> io::Result<RequestContext> {
let request_adu = request_adu(server, request);
self.send_request_adu(request_adu).await
}

async fn send_request_adu<'a>(
&mut self,
request_adu: RequestAdu<'a>,
) -> io::Result<RequestContext> {
let request_context = request_adu.context();

self.framed.read_buffer_mut().clear();
self.framed.send(request_adu).await?;

Ok(request_context)
}

pub async fn recv_response(&mut self, request_context: RequestContext) -> Result<Response> {
let response_adu = self
.framed
.next()
.await
.unwrap_or_else(|| Err(io::Error::from(io::ErrorKind::BrokenPipe)))?;

response_adu.try_into_response(request_context)
}
}

/// _Modbus_ RTU client with (server) context and connection state.
///
/// Client that invokes methods (request/response) on a single or many (broadcast) server(s).
///
/// The server can be switched between method calls.
#[derive(Debug)]
pub struct ClientContext<T> {
client: Option<Client<T>>,
server: Slave,
}

impl<T> ClientContext<T> {
pub fn new(client: Client<T>, server: Slave) -> Self {
Self {
client: Some(client),
server,
}
}

#[must_use]
pub const fn is_connected(&self) -> bool {
self.client.is_some()
}

#[must_use]
pub const fn server(&self) -> Slave {
self.server
}

pub fn set_server(&mut self, server: Slave) {
self.server = server;
}
}

impl<T> ClientContext<T>
where
T: AsyncWrite + Unpin,
{
pub async fn disconnect(&mut self) -> io::Result<()> {
let Some(client) = self.client.take() else {
// Already disconnected.
return Ok(());
};
disconnect_framed(client.framed).await
}
}

impl<T> ClientContext<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub async fn call(&mut self, request: Request<'_>) -> Result<Response> {
log::debug!("Call {:?}", request);

let Some(client) = &mut self.client else {
return Err(io::Error::new(io::ErrorKind::NotConnected, "disconnected").into());
};

client.call(self.server, request).await
}
}

impl<T> ClientContext<T>
where
T: AsyncRead + AsyncWrite + Unpin + fmt::Debug + Send + 'static,
{
#[must_use]
pub fn boxed(self) -> Box<dyn crate::client::Client> {
Box::new(self)
}
}

impl<T> SlaveContext for ClientContext<T> {
fn set_slave(&mut self, slave: Slave) {
self.set_server(slave);
}
}

#[async_trait::async_trait]
impl<T> crate::client::Client for ClientContext<T>
where
T: fmt::Debug + AsyncRead + AsyncWrite + Send + Unpin,
{
async fn call(&mut self, req: Request<'_>) -> Result<Response> {
self.call(req).await
}

async fn disconnect(&mut self) -> io::Result<()> {
self.disconnect().await
}
}

fn request_adu<'a, R>(server: Slave, request_pdu: R) -> RequestAdu<'a>
where
R: Into<RequestPdu<'a>>,
{
let hdr = Header { slave: server };
let pdu = request_pdu.into();
RequestAdu { hdr, pdu }
}

#[cfg(test)]
mod tests {
use core::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf, Result};

use crate::Error;

use super::*;

#[derive(Debug)]
struct MockTransport;

impl Unpin for MockTransport {}

impl AsyncRead for MockTransport {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsyncWrite for MockTransport {
fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, _: &[u8]) -> Poll<Result<usize>> {
Poll::Ready(Ok(2))
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
unimplemented!()
}
}

#[tokio::test]
async fn handle_broken_pipe() {
let transport = MockTransport;
let client = Client::new(transport);
let mut context = ClientContext::new(client, Slave::broadcast());
let res = context.call(Request::ReadCoils(0x00, 5)).await;
assert!(res.is_err());
let err = res.err().unwrap();
assert!(
matches!(err, Error::Transport(err) if err.kind() == std::io::ErrorKind::BrokenPipe)
);
}
}
Loading