From 4b46a71a59aa05c5acf07c1e214c3a9d96ccc7e2 Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 11 Oct 2024 14:56:24 +0200 Subject: [PATCH 1/2] worker/jobs: Simplify `enqueue_sync_to_index()` fn This requires two queries instead of one, but it significantly simplifies the code, now that our worker system supports automatic deduplication. Once the enqueuing code uses `diesel-async` the performance hit can be mitigated by using pipelining via `try_join!()`. --- src/worker/jobs/mod.rs | 61 +++--------------------------------------- 1 file changed, 3 insertions(+), 58 deletions(-) diff --git a/src/worker/jobs/mod.rs b/src/worker/jobs/mod.rs index 2ba1a34f81..05510068ad 100644 --- a/src/worker/jobs/mod.rs +++ b/src/worker/jobs/mod.rs @@ -1,9 +1,5 @@ use crate::util::diesel::Conn; -use crates_io_worker::schema::background_jobs; use crates_io_worker::{BackgroundJob, EnqueueError}; -use diesel::dsl::{exists, not}; -use diesel::prelude::*; -use diesel::sql_types::{Int2, Jsonb, Text}; use std::fmt::Display; mod archive_version_downloads; @@ -49,61 +45,10 @@ pub fn enqueue_sync_to_index( krate: T, conn: &mut impl Conn, ) -> Result<(), EnqueueError> { - // Returns jobs with matching `job_type`, `data` and `priority`, - // skipping ones that are already locked by the background worker. - let find_similar_jobs_query = - |job_type: &'static str, data: serde_json::Value, priority: i16| { - background_jobs::table - .select(background_jobs::id) - .filter(background_jobs::job_type.eq(job_type)) - .filter(background_jobs::data.eq(data)) - .filter(background_jobs::priority.eq(priority)) - .for_update() - .skip_locked() - }; + let krate = krate.to_string(); - // Returns one `job_type, data, priority` row with values from the - // passed-in `job`, unless a similar row already exists. - let deduplicated_select_query = - |job_type: &'static str, data: serde_json::Value, priority: i16| { - diesel::select(( - job_type.into_sql::(), - data.clone().into_sql::(), - priority.into_sql::(), - )) - .filter(not(exists(find_similar_jobs_query( - job_type, data, priority, - )))) - }; - - let to_git = deduplicated_select_query( - SyncToGitIndex::JOB_NAME, - serde_json::to_value(SyncToGitIndex::new(krate.to_string()))?, - SyncToGitIndex::PRIORITY, - ); - - let to_sparse = deduplicated_select_query( - SyncToSparseIndex::JOB_NAME, - serde_json::to_value(SyncToSparseIndex::new(krate.to_string()))?, - SyncToSparseIndex::PRIORITY, - ); - - // Insert index update background jobs, but only if they do not - // already exist. - let added_jobs_count = diesel::insert_into(background_jobs::table) - .values(to_git.union_all(to_sparse)) - .into_columns(( - background_jobs::job_type, - background_jobs::data, - background_jobs::priority, - )) - .execute(conn)?; - - // Print a log event if we skipped inserting a job due to deduplication. - if added_jobs_count != 2 { - let skipped_jobs_count = 2 - added_jobs_count; - info!(%skipped_jobs_count, "Skipped adding duplicate jobs to the background worker queue"); - } + SyncToGitIndex::new(krate.clone()).enqueue(conn)?; + SyncToSparseIndex::new(krate).enqueue(conn)?; Ok(()) } From b19bdfdd1360efcfbf22aaba8d364d0ce6bef2ec Mon Sep 17 00:00:00 2001 From: Tobias Bieniek Date: Fri, 11 Oct 2024 15:04:29 +0200 Subject: [PATCH 2/2] worker/jobs: Inline `enqueue_sync_to_index()` fn --- src/admin/delete_crate.rs | 7 +++++-- src/admin/delete_version.rs | 8 ++++++-- src/admin/yank_version.rs | 7 +++---- src/controllers/krate/publish.rs | 3 ++- src/controllers/version/metadata.rs | 5 +++-- src/tests/worker/git.rs | 4 +++- src/worker/jobs/mod.rs | 24 ------------------------ 7 files changed, 22 insertions(+), 36 deletions(-) diff --git a/src/admin/delete_crate.rs b/src/admin/delete_crate.rs index db86e90890..9d6c6b1ba0 100644 --- a/src/admin/delete_crate.rs +++ b/src/admin/delete_crate.rs @@ -97,8 +97,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { }; info!("{name}: Enqueuing index sync jobs…"); - if let Err(error) = jobs::enqueue_sync_to_index(name, conn) { - warn!("{name}: Failed to enqueue index sync jobs: {error}"); + if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) { + warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}"); + } + if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) { + warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}"); } info!("{name}: Enqueuing DeleteCrateFromStorage job…"); diff --git a/src/admin/delete_version.rs b/src/admin/delete_version.rs index 16665d4f93..e2c23cfe25 100644 --- a/src/admin/delete_version.rs +++ b/src/admin/delete_version.rs @@ -5,6 +5,7 @@ use crate::tasks::spawn_blocking; use crate::worker::jobs; use crate::{admin::dialoguer, db, schema::versions}; use anyhow::Context; +use crates_io_worker::BackgroundJob; use diesel::{Connection, ExpressionMethods, QueryDsl}; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; @@ -102,8 +103,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> { })?; info!(%crate_name, "Enqueuing index sync jobs"); - if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) { - warn!(%crate_name, ?error, "Failed to enqueue index sync jobs"); + if let Err(error) = jobs::SyncToGitIndex::new(crate_name).enqueue(conn) { + warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job"); + } + if let Err(error) = jobs::SyncToSparseIndex::new(crate_name).enqueue(conn) { + warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job"); } Ok(opts) diff --git a/src/admin/yank_version.rs b/src/admin/yank_version.rs index 0d7b66075a..1f4efb0e99 100644 --- a/src/admin/yank_version.rs +++ b/src/admin/yank_version.rs @@ -3,8 +3,7 @@ use crate::db; use crate::models::{Crate, Version}; use crate::schema::versions; use crate::tasks::spawn_blocking; -use crate::worker::jobs; -use crate::worker::jobs::UpdateDefaultVersion; +use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion}; use crates_io_worker::BackgroundJob; use diesel::prelude::*; @@ -63,8 +62,8 @@ fn yank(opts: Opts, conn: &mut PgConnection) -> anyhow::Result<()> { .set(versions::yanked.eq(true)) .execute(conn)?; - jobs::enqueue_sync_to_index(&krate.name, conn)?; - + SyncToGitIndex::new(&krate.name).enqueue(conn)?; + SyncToSparseIndex::new(&krate.name).enqueue(conn)?; UpdateDefaultVersion::new(krate.id).enqueue(conn)?; Ok(()) diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index 8715b95235..0c1f5da9f6 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -442,7 +442,8 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult( - krate: T, - conn: &mut impl Conn, -) -> Result<(), EnqueueError> { - let krate = krate.to_string(); - - SyncToGitIndex::new(krate.clone()).enqueue(conn)?; - SyncToSparseIndex::new(krate).enqueue(conn)?; - - Ok(()) -}