Skip to content

Commit

Permalink
split out async channel code (#48023)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges authored Jun 24, 2024
1 parent 2ba17fe commit 2c3418c
Show file tree
Hide file tree
Showing 9 changed files with 813 additions and 812 deletions.
4 changes: 2 additions & 2 deletions benches/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use pushpin::connmgr::client::TestClient;
use pushpin::core::channel;
use pushpin::core::executor::Executor;
use pushpin::core::reactor::Reactor;
use pushpin::future::{AsyncReadExt, AsyncSender, AsyncTcpListener, AsyncTcpStream, AsyncWriteExt};
use pushpin::future::{AsyncReadExt, AsyncTcpListener, AsyncTcpStream, AsyncWriteExt};
use std::net::SocketAddr;
use std::rc::Rc;
use std::str;
Expand All @@ -46,7 +46,7 @@ where

executor
.spawn(async move {
let s = AsyncSender::new(s);
let s = channel::AsyncSender::new(s);
let listener = AsyncTcpListener::new(listener);

for _ in 0..REQS_PER_ITER {
Expand Down
5 changes: 2 additions & 3 deletions src/connmgr/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::connmgr::zhttppacket;
use crate::connmgr::zhttpsocket::{self, SessionKey, FROM_MAX, REQ_ID_MAX};
use crate::core::arena;
use crate::core::buffer::TmpBuffer;
use crate::core::channel;
use crate::core::channel::{self, AsyncLocalReceiver, AsyncLocalSender, AsyncReceiver};
use crate::core::event;
use crate::core::executor::{Executor, Spawner};
use crate::core::list;
Expand All @@ -33,8 +33,7 @@ use crate::core::tnetstring;
use crate::core::zmq::{MultipartHeader, SpecInfo};
use crate::future::{
event_wait, select_2, select_5, select_6, select_option, yield_to_local_events,
AsyncLocalReceiver, AsyncLocalSender, AsyncReceiver, CancellationSender, CancellationToken,
Select2, Select5, Select6, Timeout,
CancellationSender, CancellationToken, Select2, Select5, Select6, Timeout,
};
use arrayvec::ArrayVec;
use ipnet::IpNet;
Expand Down
8 changes: 4 additions & 4 deletions src/connmgr/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::core::arena;
use crate::core::buffer::{
Buffer, ContiguousBuffer, LimitBufsMut, TmpBuffer, VecRingBuffer, VECTORED_MAX,
};
use crate::core::channel::{AsyncLocalReceiver, AsyncLocalSender};
use crate::core::defer::Defer;
use crate::core::http1::Error as CoreHttpError;
use crate::core::http1::{self, client, server, RecvStatus, SendStatus};
Expand All @@ -56,10 +57,9 @@ use crate::core::shuffle::random;
use crate::core::waker::RefWakerData;
use crate::core::zmq::MultipartHeader;
use crate::future::{
io_split, poll_async, select_2, select_3, select_4, select_option, AsyncLocalReceiver,
AsyncLocalSender, AsyncRead, AsyncReadExt, AsyncResolver, AsyncTcpStream, AsyncTlsStream,
AsyncWrite, AsyncWriteExt, CancellationToken, ReadHalf, Select2, Select3, Select4,
StdWriteWrapper, Timeout, TlsWaker, WriteHalf,
io_split, poll_async, select_2, select_3, select_4, select_option, AsyncRead, AsyncReadExt,
AsyncResolver, AsyncTcpStream, AsyncTlsStream, AsyncWrite, AsyncWriteExt, CancellationToken,
ReadHalf, Select2, Select3, Select4, StdWriteWrapper, Timeout, TlsWaker, WriteHalf,
};
use arrayvec::{ArrayString, ArrayVec};
use ipnet::IpNet;
Expand Down
13 changes: 5 additions & 8 deletions src/connmgr/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ use crate::core::channel;
use crate::core::executor::Executor;
use crate::core::net::{NetListener, NetStream, SocketAddr};
use crate::core::reactor::Reactor;
use crate::future::{
select_2, select_slice, AsyncNetListener, AsyncReceiver, AsyncSender, NetAcceptFuture, Select2,
WaitWritableFuture,
};
use crate::future::{select_2, select_slice, AsyncNetListener, NetAcceptFuture, Select2};
use log::{debug, error};
use std::cmp;
use std::sync::mpsc;
Expand Down Expand Up @@ -67,18 +64,18 @@ impl Listener {
listeners: Vec<NetListener>,
senders: Vec<channel::Sender<(usize, NetStream, SocketAddr)>>,
) {
let stop = AsyncReceiver::new(stop);
let stop = channel::AsyncReceiver::new(stop);

let mut listeners: Vec<AsyncNetListener> =
listeners.into_iter().map(AsyncNetListener::new).collect();

let mut senders: Vec<AsyncSender<(usize, NetStream, SocketAddr)>> =
senders.into_iter().map(AsyncSender::new).collect();
let mut senders: Vec<channel::AsyncSender<(usize, NetStream, SocketAddr)>> =
senders.into_iter().map(channel::AsyncSender::new).collect();

let mut listeners_pos = 0;
let mut senders_pos = 0;

let mut sender_tasks_mem: Vec<WaitWritableFuture<(usize, NetStream, SocketAddr)>> =
let mut sender_tasks_mem: Vec<channel::WaitWritableFuture<(usize, NetStream, SocketAddr)>> =
Vec::with_capacity(senders.len());

let mut listener_tasks_mem: Vec<NetAcceptFuture> = Vec::with_capacity(listeners.len());
Expand Down
7 changes: 3 additions & 4 deletions src/connmgr/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::connmgr::zhttpsocket;
use crate::connmgr::{ListenConfig, ListenSpec};
use crate::core::arena;
use crate::core::buffer::TmpBuffer;
use crate::core::channel;
use crate::core::channel::{self, AsyncLocalReceiver, AsyncLocalSender, AsyncReceiver};
use crate::core::event;
use crate::core::executor::{Executor, Spawner};
use crate::core::fs::{set_group, set_user};
Expand All @@ -38,9 +38,8 @@ use crate::core::waker::RefWakerData;
use crate::core::zmq::SpecInfo;
use crate::future::{
event_wait, select_2, select_3, select_6, select_8, select_option, yield_to_local_events,
AsyncLocalReceiver, AsyncLocalSender, AsyncReceiver, AsyncTcpStream, AsyncTlsStream,
AsyncUnixStream, CancellationSender, CancellationToken, Select2, Select3, Select6, Select8,
Timeout, TlsWaker,
AsyncTcpStream, AsyncTlsStream, AsyncUnixStream, CancellationSender, CancellationToken,
Select2, Select3, Select6, Select8, Timeout, TlsWaker,
};
use arrayvec::{ArrayString, ArrayVec};
use log::{debug, error, info, warn};
Expand Down
2 changes: 1 addition & 1 deletion src/connmgr/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

use crate::future::AsyncLocalReceiver;
use crate::core::channel::AsyncLocalReceiver;
use std::cell::Cell;
use std::future::Future;
use std::ops::Deref;
Expand Down
7 changes: 3 additions & 4 deletions src/connmgr/zhttpsocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
use crate::connmgr::zhttppacket::{parse_ids, Id, ParseScratch};
use crate::core::arena;
use crate::core::buffer::trim_for_display;
use crate::core::channel;
use crate::core::channel::{self, AsyncReceiver, AsyncSender, RecvFuture, WaitWritableFuture};
use crate::core::event;
use crate::core::executor::Executor;
use crate::core::list;
use crate::core::reactor::Reactor;
use crate::core::tnetstring;
use crate::core::zmq::{MultipartHeader, SpecInfo, ZmqSocket};
use crate::future::{
select_10, select_9, select_option, select_slice, AsyncReceiver, AsyncSender, AsyncZmqSocket,
RecvFuture, Select10, Select9, WaitWritableFuture, ZmqSendFuture, ZmqSendToFuture,
REGISTRATIONS_PER_CHANNEL, REGISTRATIONS_PER_ZMQSOCKET,
select_10, select_9, select_option, select_slice, AsyncZmqSocket, Select10, Select9,
ZmqSendFuture, ZmqSendToFuture, REGISTRATIONS_PER_CHANNEL, REGISTRATIONS_PER_ZMQSOCKET,
};
use arrayvec::{ArrayString, ArrayVec};
use log::{debug, error, log_enabled, trace, warn};
Expand Down
Loading

0 comments on commit 2c3418c

Please sign in to comment.