-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tech debt #64
Tech debt #64
Conversation
As for the boxed dyn Connection thread, please see this comment |
For the |
That actually did the trick. We've been able to get rid of the EDIT: the flaky test should be fixed by #71 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm loving this set of changes!
src/worker/builder.rs
Outdated
) -> Result<Worker<E>, Error> | ||
where | ||
S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, | ||
BufStream<S>: Reconnect, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if we have a blanket implementation of Reconnect
for BufStream<S> where S: Reconnect
it should be sufficient to add Reconnect
to the bounds on S
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto for Client::connect_with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the thing with Reconnect
is that Box<dyn Connection>
returned from async fn reconnect()
should already be AsyncBufRead
. So we are accepting any S
that is AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static'
because we then buffer it making it AsyncBufRead
which only then qualifies it as Reconnect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, hmm, I think we might need two traits here then:
trait Reconnect {
fn reconnect(&mut self) -> io::Result<BoxedConnection>;
}
trait UnbufferedConnection: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static + ReconnectUnbuffered {}
impl<T> UnbufferedConnection for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static {}
trait ReconnectUnbuffered {
fn reconnect(&mut self) -> io::Result<Box<dyn UnbufferedConnection>>;
}
with
impl<S> Reconnect for BufStream<S> where S: UnbufferedConnection {}
it's not super pretty, but it would allow folks to bring their own buffered or unbuffered connection types, and use the former directly or use the latter wrapped in a BufStream
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am afraid I am not following here.
For both Client::connect_with
and WorkerBuilder::connect_with
the bound is now:
S: AsyncBufRead + AsyncWrite + Reconnect + Send + Sync + Unpin + 'static,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think maybe I've led you astray. I read over the code again this morning, and I think I understand what you've been trying to tell me 😅 I think the only thing left that's confusing to me is how we can have
impl<S> Reconnect for BufStream<S>
and also
impl Reconnect for BufStream<TlsStream<tokio::net::TcpStream>>
and
impl Reconnect for BufStream<TlsStream<proto::BoxedConnection>>
It feels like that first impl
should overlap with the other two. I'm guessing it doesn't because we define Reconnect
ourselves and we don't have
impl Reconnect for TlsStream<tokio::net::TcpStream>
or
impl Reconnect for TlsStream<proto::BoxedConnection>
That makes me wonder though — would it be sufficient to instead have
impl Reconnect for TlsStream<S>
(where I don't think we even need to require AsyncBufRead
for that S
)?
It also feels weird that we even have
impl Reconnect for BufStream<TlsStream<proto::BoxedConnection>>
Because that suggests we may end up double-buffering (since BoxedConnection
is already AsyncBufRead
).
@jonhoo Sorry for thrashing the loop 😅 Please have another look at this PR. I think I've addressed almost all the threads, and will need your advice in a couple of them. Something I've noticed, but would like to discuss with you first:
|
.await | ||
.unwrap() | ||
.enqueue(Job::builder("panic").queue(local).build()) | ||
let mut c = Client::connect(None).await.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The more I look at connect(None)
, the less I like it. I wonder if we should have two methods:
fn connect() { .. }
fn connect_to(addr: &str) { .. }
what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the suggested changes will make the bindings more ergonomic, because now even though they are already providing FAKTORY_URL into the environment, we still make them pass None
to the the Client::connect
.
What do you make of batching these changes with my earlier suggestions in a follow-up "polishing" PR? Also what do you make of those suggestions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I'm fine doing that change in a separate PR — this one is already growing quite large and unwieldy! I should have used reviewable from the start for it :p
src/worker/builder.rs
Outdated
) -> Result<Worker<E>, Error> | ||
where | ||
S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, | ||
BufStream<S>: Reconnect, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, hmm, I think we might need two traits here then:
trait Reconnect {
fn reconnect(&mut self) -> io::Result<BoxedConnection>;
}
trait UnbufferedConnection: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static + ReconnectUnbuffered {}
impl<T> UnbufferedConnection for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static {}
trait ReconnectUnbuffered {
fn reconnect(&mut self) -> io::Result<Box<dyn UnbufferedConnection>>;
}
with
impl<S> Reconnect for BufStream<S> where S: UnbufferedConnection {}
it's not super pretty, but it would allow folks to bring their own buffered or unbuffered connection types, and use the former directly or use the latter wrapped in a BufStream
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to approve and land this just so we can hone the discussion (in a separate PR) specifically onto the one remaining question around the Reconnect
impls. I still think we need to nail it down before an eventual release.
Threads that needed to be addressed after the recent "async" PR merge:
This change is