Skip to content

Commit

Permalink
worker: Extract enqueue_simple/deduplicated() fns (#9630)
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 authored Oct 11, 2024
1 parent 261f91e commit 6fcac60
Showing 1 changed file with 58 additions and 39 deletions.
97 changes: 58 additions & 39 deletions crates/crates_io_worker/src/background_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use diesel::prelude::*;
use diesel::sql_types::{Int2, Jsonb, Text};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
use std::future::Future;
use tracing::instrument;

Expand Down Expand Up @@ -49,49 +50,67 @@ pub trait BackgroundJob: Serialize + DeserializeOwned + Send + Sync + 'static {
fn enqueue_with_priority(
&self,
conn: &mut impl LoadConnection<Backend = Pg>,
job_priority: i16,
priority: i16,
) -> Result<Option<i64>, EnqueueError> {
let job_data = serde_json::to_value(self)?;
let data = serde_json::to_value(self)?;

if Self::DEDUPLICATED {
let similar_jobs = background_jobs::table
.select(background_jobs::id)
.filter(background_jobs::job_type.eq(Self::JOB_NAME))
.filter(background_jobs::data.eq(&job_data))
.filter(background_jobs::priority.eq(job_priority))
.for_update()
.skip_locked();

let deduplicated_select = diesel::select((
Self::JOB_NAME.into_sql::<Text>(),
(&job_data).into_sql::<Jsonb>(),
job_priority.into_sql::<Int2>(),
))
.filter(not(exists(similar_jobs)));

let id = diesel::insert_into(background_jobs::table)
.values(deduplicated_select)
.into_columns((
background_jobs::job_type,
background_jobs::data,
background_jobs::priority,
))
.returning(background_jobs::id)
.get_result::<i64>(conn)
.optional()?;

Ok(id)
Ok(enqueue_deduplicated(conn, Self::JOB_NAME, &data, priority)?)
} else {
let id = diesel::insert_into(background_jobs::table)
.values((
background_jobs::job_type.eq(Self::JOB_NAME),
background_jobs::data.eq(job_data),
background_jobs::priority.eq(job_priority),
))
.returning(background_jobs::id)
.get_result(conn)?;

Ok(Some(id))
Ok(Some(enqueue_simple(conn, Self::JOB_NAME, &data, priority)?))
}
}
}

/// Insert a background job row only if no identical job is already queued.
///
/// The duplicate check matches on `job_type`, `data`, and `priority`.
/// Candidate rows are probed with `FOR UPDATE SKIP LOCKED`, so rows that
/// are currently locked (e.g. presumably being worked on) do not count as
/// duplicates.
///
/// Returns `Some(id)` for a freshly inserted row, or `None` when the
/// insert was skipped because a duplicate already exists.
fn enqueue_deduplicated(
    conn: &mut impl LoadConnection<Backend = Pg>,
    job_type: &str,
    data: &Value,
    priority: i16,
) -> Result<Option<i64>, EnqueueError> {
    // Sub-query used inside `NOT EXISTS (…)`: ids of queued jobs identical
    // to the one we are about to insert.
    let duplicates = background_jobs::table
        .select(background_jobs::id)
        .filter(background_jobs::job_type.eq(job_type))
        .filter(background_jobs::data.eq(data))
        .filter(background_jobs::priority.eq(priority))
        .for_update()
        .skip_locked();

    // `INSERT … SELECT … WHERE NOT EXISTS (…)`: the check and the insert
    // are issued as a single SQL statement.
    let insert_source = diesel::select((
        job_type.into_sql::<Text>(),
        data.into_sql::<Jsonb>(),
        priority.into_sql::<Int2>(),
    ))
    .filter(not(exists(duplicates)));

    Ok(diesel::insert_into(background_jobs::table)
        .values(insert_source)
        .into_columns((
            background_jobs::job_type,
            background_jobs::data,
            background_jobs::priority,
        ))
        .returning(background_jobs::id)
        .get_result::<i64>(conn)
        .optional()?)
}

/// Unconditionally insert a background job row and return its id.
///
/// No deduplication is performed; every call creates a new row with the
/// given `job_type`, `data`, and `priority`.
///
/// # Errors
///
/// Returns an [`EnqueueError`] when the insert fails.
fn enqueue_simple(
    conn: &mut impl LoadConnection<Backend = Pg>,
    job_type: &str,
    data: &Value,
    priority: i16,
) -> Result<i64, EnqueueError> {
    let new_row = (
        background_jobs::job_type.eq(job_type),
        background_jobs::data.eq(data),
        background_jobs::priority.eq(priority),
    );

    Ok(diesel::insert_into(background_jobs::table)
        .values(new_row)
        .returning(background_jobs::id)
        .get_result(conn)?)
}

0 comments on commit 6fcac60

Please sign in to comment.