Skip to content

Commit

Permalink
Refine BlockwiseStream, fusing into gcoap::ReqInner::new_blockwise()
Browse files Browse the repository at this point in the history
  • Loading branch information
j-devel committed Jan 10, 2024
1 parent 54333a4 commit 6e29a04
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 19 deletions.
39 changes: 32 additions & 7 deletions examples/xbd-net/src/xbd/gcoap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,29 @@ impl ReqInner {
}
}

pub fn new_blockwise(method: CoapMethod, addr: &str, uri: &str, payload: Option<Vec<u8>>) -> Self {
pub fn new_blockwise(method: CoapMethod, addr: &str, uri: &str, payload: Option<Vec<u8>>) -> BlockwiseStream {
crate::println!("!!!! [gcoap.rs] ReqInner::new_blockwise(): ^^");

assert_eq!(method, COAP_METHOD_GET);
assert_eq!(method, COAP_METHOD_GET); // !!
assert_eq!(payload, None);

// TODO cleanup
ReqInner {
// TODO !!!! fuse
let _fut = ReqInner {
method,
blockwise: true,
addr: addr.to_string(),
uri: uri.to_string(),
payload,
out: Rc::new(RefCell::new(None)),
_waker: Some(AtomicWaker::new()),
}
};

let bs = BlockwiseStream::new();
add_blockwise_req(GcoapBlock::First); // !!!
add_blockwise_req(GcoapBlock::Second); // !!!
add_blockwise_req(GcoapBlock::Last); // !!!

bs
}
}

Expand Down Expand Up @@ -189,10 +196,28 @@ use super::stream::XbdStream;
pub static BLOCKWISE_QUEUE: OnceCell<ArrayQueue<GcoapBlock>> = OnceCell::uninit();
pub static BLOCKWISE_WAKER: AtomicWaker = AtomicWaker::new();

pub type BlockwiseStream = XbdStream<GcoapBlock>;

pub fn add_blockwise_req(req: GcoapBlock) {
XbdStream::add(&BLOCKWISE_QUEUE, &BLOCKWISE_WAKER, req);
}
//----
use futures_util::Stream;
pub struct BlockwiseStream(XbdStream<GcoapBlock>);
impl BlockwiseStream {
pub fn new() -> Self {
Self(XbdStream::new(&BLOCKWISE_QUEUE, &BLOCKWISE_WAKER))
}
}
impl Stream for BlockwiseStream {
type Item = GcoapBlock;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
unsafe {
match Pin::get_unchecked_mut(self) {
Self(inner) => Pin::new_unchecked(inner).poll_next(cx),
}
}
}
}
//----

//
14 changes: 2 additions & 12 deletions examples/xbd-net/src/xbd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,10 @@ impl Xbd {
pub fn async_gcoap_get(addr: &str, uri: &str) -> impl Future<Output = GcoapMemoState> + 'static {
gcoap::Req::new(gcoap::COAP_METHOD_GET, addr, uri, None)
}
//-------- !!!!
pub fn async_gcoap_get_blockwise(addr: &str, uri: &str) -> gcoap::BlockwiseStream {
use crate::xbd::{stream::XbdStream, gcoap::{BLOCKWISE_QUEUE, BLOCKWISE_WAKER, GcoapBlock, add_blockwise_req}};

let _fut = gcoap::ReqInner::new_blockwise(gcoap::COAP_METHOD_GET, "!!!", "!!!", None);
// ^^^^ vvvv TODO fuse
let bs = XbdStream::new(&BLOCKWISE_QUEUE, &BLOCKWISE_WAKER);
add_blockwise_req(GcoapBlock::First);
add_blockwise_req(GcoapBlock::Second);
add_blockwise_req(GcoapBlock::Last);

bs
pub fn async_gcoap_get_blockwise(addr: &str, uri: &str) -> gcoap::BlockwiseStream {
gcoap::ReqInner::new_blockwise(gcoap::COAP_METHOD_GET, addr, uri, None)
}
//-------- !!!!

pub fn async_gcoap_post(addr: &str, uri: &str, payload: &[u8]) -> impl Future<Output = GcoapMemoState> + 'static {
gcoap::Req::new(gcoap::COAP_METHOD_POST, addr, uri, Some(payload.to_vec()))
Expand Down
1 change: 1 addition & 0 deletions examples/xbd-net/src/xbd/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn process_gcoap_server_stream() {

//====
println!("@@ process_gcoap_server_stream(): calling _on_sock_udp_evt_minerva(sock, flags, arg)");
// TODO check comp ,log--get-blockwise-sync (flags)
unsafe { _on_sock_udp_evt_minerva(sock, flags, arg) };
//====
//let pdu_args = (); // !! (pdu, buf, len, ctx) = xx(evt_args);
Expand Down

0 comments on commit 6e29a04

Please sign in to comment.