Skip to content

Commit

Permalink
Merge pull request #9629 from Turbo87/inline-sync-index
Browse files Browse the repository at this point in the history
worker/jobs: Simplify and inline `enqueue_sync_to_index()` fn
  • Loading branch information
Turbo87 authored Oct 11, 2024
2 parents 868a502 + b19bdfd commit 261f91e
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 91 deletions.
7 changes: 5 additions & 2 deletions src/admin/delete_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
};

info!("{name}: Enqueuing index sync jobs…");
if let Err(error) = jobs::enqueue_sync_to_index(name, conn) {
warn!("{name}: Failed to enqueue index sync jobs: {error}");
if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
}
if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");
}

info!("{name}: Enqueuing DeleteCrateFromStorage job…");
Expand Down
8 changes: 6 additions & 2 deletions src/admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::tasks::spawn_blocking;
use crate::worker::jobs;
use crate::{admin::dialoguer, db, schema::versions};
use anyhow::Context;
use crates_io_worker::BackgroundJob;
use diesel::{Connection, ExpressionMethods, QueryDsl};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;

Expand Down Expand Up @@ -102,8 +103,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
})?;

info!(%crate_name, "Enqueuing index sync jobs");
if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) {
warn!(%crate_name, ?error, "Failed to enqueue index sync jobs");
if let Err(error) = jobs::SyncToGitIndex::new(crate_name).enqueue(conn) {
warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job");
}
if let Err(error) = jobs::SyncToSparseIndex::new(crate_name).enqueue(conn) {
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");
}

Ok(opts)
Expand Down
7 changes: 3 additions & 4 deletions src/admin/yank_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use crate::db;
use crate::models::{Crate, Version};
use crate::schema::versions;
use crate::tasks::spawn_blocking;
use crate::worker::jobs;
use crate::worker::jobs::UpdateDefaultVersion;
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;

Expand Down Expand Up @@ -63,8 +62,8 @@ fn yank(opts: Opts, conn: &mut PgConnection) -> anyhow::Result<()> {
.set(versions::yanked.eq(true))
.execute(conn)?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;

SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
))
.map_err(|e| internal(format!("failed to upload crate: {e}")))?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;
jobs::SyncToGitIndex::new(&krate.name).enqueue(conn)?;
jobs::SyncToSparseIndex::new(&krate.name).enqueue(conn)?;

SendPublishNotificationsJob::new(version.id).enqueue(conn)?;

Expand Down
5 changes: 3 additions & 2 deletions src/controllers/version/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::util::errors::{bad_request, custom, version_not_found, AppResult};
use crate::views::{EncodableDependency, EncodableVersion};
use crate::worker::jobs::{self, UpdateDefaultVersion};
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};

use super::version_and_crate;

Expand Down Expand Up @@ -233,7 +233,8 @@ pub fn perform_version_yank_update(
};
insert_version_owner_action(conn, version.id, user.id, api_token_id, action)?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;
SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion src/tests/worker/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::models::Crate;
use crate::tests::builders::PublishBuilder;
use crate::tests::util::{RequestHelper, TestApp};
use crate::worker::jobs;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use http::StatusCode;

Expand Down Expand Up @@ -51,7 +52,8 @@ async fn index_smoke_test() {
let krate: Crate = assert_ok!(Crate::by_name("serde").first(conn));
assert_ok!(diesel::delete(crates::table.find(krate.id)).execute(conn));

assert_ok!(jobs::enqueue_sync_to_index("serde", conn));
assert_ok!(jobs::SyncToGitIndex::new("serde").enqueue(conn));
assert_ok!(jobs::SyncToSparseIndex::new("serde").enqueue(conn));
});

app.run_pending_background_jobs().await;
Expand Down
79 changes: 0 additions & 79 deletions src/worker/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
use crate::util::diesel::Conn;
use crates_io_worker::schema::background_jobs;
use crates_io_worker::{BackgroundJob, EnqueueError};
use diesel::dsl::{exists, not};
use diesel::prelude::*;
use diesel::sql_types::{Int2, Jsonb, Text};
use std::fmt::Display;

mod archive_version_downloads;
mod daily_db_maintenance;
mod delete_crate;
Expand Down Expand Up @@ -36,74 +28,3 @@ pub use self::send_publish_notifications::SendPublishNotificationsJob;
pub use self::sync_admins::SyncAdmins;
pub use self::typosquat::CheckTyposquat;
pub use self::update_default_version::UpdateDefaultVersion;

/// Enqueue both index sync jobs (git and sparse) for a crate, unless they
/// already exist in the background job queue.
///
/// Both jobs are written with a single `INSERT ... SELECT ... UNION ALL`
/// statement. Each half of the union yields its `(job_type, data, priority)`
/// row only when no unlocked row with the same three values is already
/// present in `background_jobs`.
///
/// `krate` is rendered via its `Display` impl to build the job payloads.
///
/// Note that there are currently no explicit tests for this functionality,
/// since our test suite only allows us to use a single database connection
/// and the background worker queue locking only works when using multiple
/// connections.
///
/// # Errors
///
/// Returns an [`EnqueueError`] if serializing either job payload to JSON
/// fails or if executing the insert statement fails.
#[instrument(name = "swirl.enqueue", skip_all, fields(message = "sync_to_index", krate = %krate))]
pub fn enqueue_sync_to_index<T: Display>(
krate: T,
conn: &mut impl Conn,
) -> Result<(), EnqueueError> {
// Returns jobs with matching `job_type`, `data` and `priority`,
// skipping ones that are already locked by the background worker.
let find_similar_jobs_query =
|job_type: &'static str, data: serde_json::Value, priority: i16| {
background_jobs::table
.select(background_jobs::id)
.filter(background_jobs::job_type.eq(job_type))
.filter(background_jobs::data.eq(data))
.filter(background_jobs::priority.eq(priority))
.for_update()
.skip_locked()
};

// Returns one `job_type, data, priority` row with values from the
// passed-in `job`, unless a similar row already exists.
let deduplicated_select_query =
|job_type: &'static str, data: serde_json::Value, priority: i16| {
diesel::select((
job_type.into_sql::<Text>(),
// `data` is needed twice: once as the selected value and once inside
// the `NOT EXISTS` subquery below, hence the clone.
data.clone().into_sql::<Jsonb>(),
priority.into_sql::<Int2>(),
))
.filter(not(exists(find_similar_jobs_query(
job_type, data, priority,
))))
};

// Candidate row for the git index sync job; the payload is the job struct
// serialized to a JSON value.
let to_git = deduplicated_select_query(
SyncToGitIndex::JOB_NAME,
serde_json::to_value(SyncToGitIndex::new(krate.to_string()))?,
SyncToGitIndex::PRIORITY,
);

// Candidate row for the sparse index sync job, built the same way.
let to_sparse = deduplicated_select_query(
SyncToSparseIndex::JOB_NAME,
serde_json::to_value(SyncToSparseIndex::new(krate.to_string()))?,
SyncToSparseIndex::PRIORITY,
);

// Insert index update background jobs, but only if they do not
// already exist.
let added_jobs_count = diesel::insert_into(background_jobs::table)
.values(to_git.union_all(to_sparse))
.into_columns((
background_jobs::job_type,
background_jobs::data,
background_jobs::priority,
))
.execute(conn)?;

// Print a log event if we skipped inserting a job due to deduplication.
// The union above yields at most two rows, so `2 - added_jobs_count` is
// expected not to underflow.
if added_jobs_count != 2 {
let skipped_jobs_count = 2 - added_jobs_count;
info!(%skipped_jobs_count, "Skipped adding duplicate jobs to the background worker queue");
}

Ok(())
}

0 comments on commit 261f91e

Please sign in to comment.