Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tech debt #64

Merged
merged 22 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
//!
//! ```no_run
//! # tokio_test::block_on(async {
//! use faktory::WorkerBuilder;
//! use faktory::Worker;
//! use std::io;
//! let mut w = WorkerBuilder::default()
//! let mut w = Worker::builder()
//! .register_fn("foobar", |job| async move {
//! println!("{:?}", job);
//! Ok::<(), io::Error>(())
Expand All @@ -76,7 +76,8 @@ mod worker;

pub use crate::error::Error;
pub use crate::proto::{
Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId,
Client, Connection, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect,
ServerSnapshot, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

Expand Down
15 changes: 7 additions & 8 deletions src/proto/batch/handle.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
use crate::error::Error;
use crate::proto::{Batch, BatchId, Client, Job};
use tokio::io::{AsyncBufRead, AsyncWrite};

/// Represents a newly started or re-opened batch of jobs.
pub struct BatchHandle<'a, S: AsyncWrite + Unpin + Send> {
pub struct BatchHandle<'a> {
bid: BatchId,
c: &'a mut Client<S>,
c: &'a mut Client,
}

impl<'a, S: AsyncWrite + Unpin + Send> BatchHandle<'a, S> {
pub(crate) fn new(bid: BatchId, c: &mut Client<S>) -> BatchHandle<'_, S> {
impl<'a> BatchHandle<'a> {
pub(crate) fn new(bid: BatchId, c: &mut Client) -> BatchHandle<'_> {

Check warning on line 11 in src/proto/batch/handle.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/batch/handle.rs#L11

Added line #L11 was not covered by tests
BatchHandle { bid, c }
}
}

impl<'a, S: AsyncWrite + Unpin + Send> BatchHandle<'a, S> {
impl BatchHandle<'_> {
/// ID issued by the Faktory server to this batch.
pub fn id(&self) -> &BatchId {
&self.bid
}
}

impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> BatchHandle<'a, S> {
impl BatchHandle<'_> {
/// Add the given job to the batch.
///
/// Should the submitted job - for whatever reason - already have a `bid` key present in its custom hash,
Expand All @@ -33,7 +32,7 @@
}

/// Initiate a child batch of jobs.
pub async fn start_batch(&mut self, mut batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
pub async fn start_batch(&mut self, mut batch: Batch) -> Result<BatchHandle<'_>, Error> {

Check warning on line 35 in src/proto/batch/handle.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/batch/handle.rs#L35

Added line #L35 was not covered by tests
batch.parent_bid = Some(self.bid.clone());
self.c.start_batch(batch).await
}
Expand Down
6 changes: 1 addition & 5 deletions src/proto/batch/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use crate::error::Error;
use crate::proto::{BatchId, Client};
use chrono::{DateTime, Utc};
use tokio::io::{AsyncBufRead, AsyncWrite};

// Not documented, but existing de fakto and also mentioned in the official client
// https://github.com/contribsys/faktory/blob/main/client/batch.go#L17-L19
Expand Down Expand Up @@ -83,10 +82,7 @@
/// Open the batch for which this `BatchStatus` has been retrieved.
///
/// See [`open_batch`](Client::open_batch).
pub async fn open<S: AsyncBufRead + AsyncWrite + Unpin + Send>(
&self,
prod: &'a mut Client<S>,
) -> Result<Option<BatchHandle<'a, S>>, Error> {
pub async fn open(&self, prod: &'a mut Client) -> Result<Option<BatchHandle<'a>>, Error> {

Check warning on line 85 in src/proto/batch/status.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/batch/status.rs#L85

Added line #L85 was not covered by tests
prod.open_batch(&self.bid).await
}
}
10 changes: 10 additions & 0 deletions src/proto/client/conn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use tokio::io::{AsyncBufRead, AsyncWrite};

use crate::Reconnect;

/// A duplex buffered stream to the Faktory service.
pub trait Connection: AsyncWrite + AsyncBufRead + Unpin + Send + Reconnect {}

impl<T> Connection for T where T: AsyncWrite + AsyncBufRead + Unpin + Send + Reconnect {}

pub type BoxedConnection = Box<dyn Connection>;
41 changes: 22 additions & 19 deletions src/proto/client/ent.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use super::super::batch::{CommitBatch, GetBatchStatus, OpenBatch};
use super::super::{single, BatchStatus, JobId, Progress, ProgressUpdate, Track};
use super::super::{single, BatchStatus, JobId, Progress, ProgressUpdate};
use super::{Client, ReadToken};
use crate::ent::{Batch, BatchHandle, BatchId};
use crate::error::{self, Error};
use tokio::io::{AsyncBufRead, AsyncWrite};
use crate::proto::FetchProgress;

impl<S: AsyncBufRead + AsyncWrite + Unpin + Send> Client<S> {
impl Client {
/// Send information on a job's execution progress to Faktory.
pub async fn set_progress(&mut self, upd: ProgressUpdate) -> Result<(), Error> {
let cmd = Track::Set(upd);
self.issue(&cmd).await?.read_ok().await
pub async fn set_progress<P>(&mut self, upd: P) -> Result<(), Error>
where
P: AsRef<ProgressUpdate> + Sync,
{
self.issue(&upd).await?.read_ok().await

Check warning on line 14 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L10-L14

Added lines #L10 - L14 were not covered by tests
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
}

/// Fetch information on a job's execution progress from Faktory.
pub async fn get_progress(&mut self, jid: JobId) -> Result<Option<Progress>, Error> {
let cmd = Track::Get(jid);
pub async fn get_progress<J>(&mut self, jid: J) -> Result<Option<Progress>, Error>
where
J: AsRef<JobId> + Sync,
{
let cmd = FetchProgress::new(jid);

Check warning on line 22 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L18-L22

Added lines #L18 - L22 were not covered by tests
self.issue(&cmd).await?.read_json().await
}

Expand All @@ -28,7 +33,7 @@
}

/// Initiate a new batch of jobs.
pub async fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_, S>, Error> {
pub async fn start_batch(&mut self, batch: Batch) -> Result<BatchHandle<'_>, Error> {

Check warning on line 36 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L36

Added line #L36 was not covered by tests
let bid = self.issue(&batch).await?.read_bid().await?;
Ok(BatchHandle::new(bid, self))
}
Expand All @@ -37,7 +42,7 @@
///
/// This will not error if a batch with the provided `bid` does not exist,
/// rather `Ok(None)` will be returned.
pub async fn open_batch<B>(&mut self, bid: B) -> Result<Option<BatchHandle<'_, S>>, Error>
pub async fn open_batch<B>(&mut self, bid: B) -> Result<Option<BatchHandle<'_>>, Error>

Check warning on line 45 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L45

Added line #L45 was not covered by tests
where
B: AsRef<BatchId> + Sync,
{
Expand All @@ -53,23 +58,21 @@
}
}

impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> ReadToken<'a, S> {
impl ReadToken<'_> {
pub(crate) async fn read_bid(self) -> Result<BatchId, Error> {
single::read_bid(&mut self.0.stream).await
}

pub(crate) async fn maybe_bid(self) -> Result<Option<BatchId>, Error> {
match single::read_bid(&mut self.0.stream).await {
Ok(bid) => Ok(Some(bid)),
Err(err) => match err {
Error::Protocol(error::Protocol::Internal { msg }) => {
if msg.starts_with("No such batch") {
return Ok(None);
}
Err(error::Protocol::Internal { msg }.into())
Err(Error::Protocol(error::Protocol::Internal { msg })) => {
if msg.starts_with("No such batch") {
return Ok(None);

Check warning on line 71 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L69-L71

Added lines #L69 - L71 were not covered by tests
}
another => Err(another),
},
Err(error::Protocol::Internal { msg }.into())

Check warning on line 73 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L73

Added line #L73 was not covered by tests
}
Err(another) => Err(another),

Check warning on line 75 in src/proto/client/ent.rs

View check run for this annotation

Codecov / codecov/patch

src/proto/client/ent.rs#L75

Added line #L75 was not covered by tests
}
}
}
68 changes: 29 additions & 39 deletions src/proto/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@ mod ent;
#[cfg(doc)]
use crate::proto::{BatchStatus, Progress, ProgressUpdate};

use super::{single, Info, Push, QueueAction, QueueControl, Reconnect};
use super::{single, Info, Push, QueueAction, QueueControl};
use super::{utils, PushBulk};
use crate::error::{self, Error};
use crate::{Job, WorkerId};
use crate::{Job, Reconnect, WorkerId};
use std::collections::HashMap;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream};
use tokio::io::{AsyncRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

mod options;
pub(crate) use options::ClientOptions;

mod conn;
pub(crate) use conn::BoxedConnection;
pub use conn::Connection;

pub(crate) const EXPECTED_PROTOCOL_VERSION: usize = 2;

fn check_protocols_match(ver: usize) -> Result<(), Error> {
Expand Down Expand Up @@ -120,14 +124,14 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
///
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::{Client, JobId, ent::ProgressUpdateBuilder};
/// use faktory::{Client, JobId, ent::ProgressUpdate};
/// let jid = JobId::new("W8qyVle9vXzUWQOf");
/// let mut cl = Client::connect(None).await?;
/// let progress = ProgressUpdateBuilder::new(jid)
/// let progress = ProgressUpdate::builder(jid)
/// .desc("Almost done...".to_owned())
/// .percent(99)
/// .build();
/// cl.set_progress(progress).await?;
/// cl.set_progress(&progress).await?;
/// # Ok::<(), faktory::Error>(())
/// });
///````
Expand All @@ -145,15 +149,11 @@ fn check_protocols_match(ver: usize) -> Result<(), Error> {
/// # Ok::<(), faktory::Error>(())
/// });
/// ```
pub struct Client<S: AsyncWrite + Unpin + Send> {
stream: S,
pub struct Client {
stream: BoxedConnection,
opts: ClientOptions,
}

impl<S> Client<S>
where
S: AsyncBufRead + AsyncWrite + Unpin + Send + Reconnect,
{
impl Client {
pub(crate) async fn connect_again(&mut self) -> Result<Self, Error> {
let s = self.stream.reconnect().await?;
Client::new(s, self.opts.clone()).await
Expand All @@ -165,10 +165,7 @@ where
}
}

impl<S> Drop for Client<S>
where
S: AsyncWrite + Unpin + Send,
{
impl Drop for Client {
fn drop(&mut self) {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
Expand All @@ -186,22 +183,23 @@ pub(crate) enum HeartbeatStatus {
Quiet,
}

impl<S: AsyncRead + AsyncWrite + Send + Unpin> Client<BufStream<S>> {
impl Client {
/// Create new [`Client`] and connect to a Faktory server with a non-standard stream.
pub async fn connect_with(
stream: S,
pwd: Option<String>,
) -> Result<Client<BufStream<S>>, Error> {
pub async fn connect_with<S>(stream: S, pwd: Option<String>) -> Result<Client, Error>
where
S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
BufStream<S>: Reconnect,
{
let buffered = BufStream::new(stream);
let opts = ClientOptions {
password: pwd,
..Default::default()
};
Client::new(buffered, opts).await
Client::new(Box::new(buffered), opts).await
}
}

impl Client<BufStream<TokioStream>> {
impl Client {
/// Create new [`Client`] and connect to a Faktory server.
///
/// If `url` is not given, will use the standard Faktory environment variables. Specifically,
Expand All @@ -213,17 +211,14 @@ impl Client<BufStream<TokioStream>> {
/// ```text
/// tcp://localhost:7419
/// ```
pub async fn connect(url: Option<&str>) -> Result<Client<BufStream<TokioStream>>, Error> {
pub async fn connect(url: Option<&str>) -> Result<Client, Error> {
let url = utils::parse_provided_or_from_env(url)?;
let stream = TokioStream::connect(utils::host_from_url(&url)).await?;
Self::connect_with(stream, url.password().map(|p| p.to_string())).await
}
}

impl<S> Client<S>
where
S: AsyncBufRead + AsyncWrite + Unpin + Send,
{
impl Client {
async fn init(&mut self) -> Result<(), Error> {
let hi = single::read_hi(&mut self.stream).await?;
check_protocols_match(hi.version)?;
Expand Down Expand Up @@ -264,7 +259,7 @@ where
Ok(())
}

pub(crate) async fn new(stream: S, opts: ClientOptions) -> Result<Client<S>, Error> {
pub(crate) async fn new(stream: BoxedConnection, opts: ClientOptions) -> Result<Client, Error> {
let mut c = Client { stream, opts };
c.init().await?;
Ok(c)
Expand All @@ -273,7 +268,7 @@ where
pub(crate) async fn issue<FC: single::FaktoryCommand>(
&mut self,
c: &FC,
) -> Result<ReadToken<'_, S>, Error> {
) -> Result<ReadToken<'_>, Error> {
single::write_command(&mut self.stream, c).await?;
Ok(ReadToken(self))
}
Expand Down Expand Up @@ -328,10 +323,7 @@ where
}
}

impl<S> Client<S>
where
S: AsyncBufRead + AsyncWrite + Unpin + Send,
{
impl Client {
/// Enqueue the given job on the Faktory server.
///
/// Returns `Ok` if the job was successfully queued by the Faktory server.
Expand Down Expand Up @@ -435,11 +427,9 @@ where
}
}

pub struct ReadToken<'a, S>(pub(crate) &'a mut Client<S>)
where
S: AsyncWrite + Unpin + Send;
pub struct ReadToken<'a>(pub(crate) &'a mut Client);

impl<'a, S: AsyncBufRead + AsyncWrite + Unpin + Send> ReadToken<'a, S> {
impl ReadToken<'_> {
pub(crate) async fn read_ok(self) -> Result<(), Error> {
single::read_ok(&mut self.0.stream).await
}
Expand Down
Loading
Loading