Skip to content

Commit

Permalink
worker/jobs: Inline enqueue_sync_to_index() fn
Browse files Browse the repository at this point in the history
  • Loading branch information
Turbo87 committed Oct 11, 2024
1 parent 4b46a71 commit b19bdfd
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 36 deletions.
7 changes: 5 additions & 2 deletions src/admin/delete_crate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
};

info!("{name}: Enqueuing index sync jobs…");
if let Err(error) = jobs::enqueue_sync_to_index(name, conn) {
warn!("{name}: Failed to enqueue index sync jobs: {error}");
if let Err(error) = jobs::SyncToGitIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToGitIndex job: {error}");
}
if let Err(error) = jobs::SyncToSparseIndex::new(name).enqueue(conn) {
warn!("{name}: Failed to enqueue SyncToSparseIndex job: {error}");

Check warning on line 104 in src/admin/delete_crate.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_crate.rs#L100-L104

Added lines #L100 - L104 were not covered by tests
}

info!("{name}: Enqueuing DeleteCrateFromStorage job…");
Expand Down
8 changes: 6 additions & 2 deletions src/admin/delete_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::tasks::spawn_blocking;
use crate::worker::jobs;
use crate::{admin::dialoguer, db, schema::versions};
use anyhow::Context;
use crates_io_worker::BackgroundJob;
use diesel::{Connection, ExpressionMethods, QueryDsl};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;

Expand Down Expand Up @@ -102,8 +103,11 @@ pub async fn run(opts: Opts) -> anyhow::Result<()> {
})?;

info!(%crate_name, "Enqueuing index sync jobs");
if let Err(error) = jobs::enqueue_sync_to_index(crate_name, conn) {
warn!(%crate_name, ?error, "Failed to enqueue index sync jobs");
if let Err(error) = jobs::SyncToGitIndex::new(crate_name).enqueue(conn) {
warn!(%crate_name, ?error, "Failed to enqueue SyncToGitIndex job");
}
if let Err(error) = jobs::SyncToSparseIndex::new(crate_name).enqueue(conn) {
warn!(%crate_name, ?error, "Failed to enqueue SyncToSparseIndex job");

Check warning on line 110 in src/admin/delete_version.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/delete_version.rs#L106-L110

Added lines #L106 - L110 were not covered by tests
}

Ok(opts)
Expand Down
7 changes: 3 additions & 4 deletions src/admin/yank_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use crate::db;
use crate::models::{Crate, Version};
use crate::schema::versions;
use crate::tasks::spawn_blocking;
use crate::worker::jobs;
use crate::worker::jobs::UpdateDefaultVersion;
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;

Expand Down Expand Up @@ -63,8 +62,8 @@ fn yank(opts: Opts, conn: &mut PgConnection) -> anyhow::Result<()> {
.set(versions::yanked.eq(true))
.execute(conn)?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;

SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;

Check warning on line 66 in src/admin/yank_version.rs

View check run for this annotation

Codecov / codecov/patch

src/admin/yank_version.rs#L65-L66

Added lines #L65 - L66 were not covered by tests
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion src/controllers/krate/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ pub async fn publish(app: AppState, req: BytesRequest) -> AppResult<Json<GoodCra
))
.map_err(|e| internal(format!("failed to upload crate: {e}")))?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;
jobs::SyncToGitIndex::new(&krate.name).enqueue(conn)?;
jobs::SyncToSparseIndex::new(&krate.name).enqueue(conn)?;

SendPublishNotificationsJob::new(version.id).enqueue(conn)?;

Expand Down
5 changes: 3 additions & 2 deletions src/controllers/version/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::tasks::spawn_blocking;
use crate::util::diesel::Conn;
use crate::util::errors::{bad_request, custom, version_not_found, AppResult};
use crate::views::{EncodableDependency, EncodableVersion};
use crate::worker::jobs::{self, UpdateDefaultVersion};
use crate::worker::jobs::{SyncToGitIndex, SyncToSparseIndex, UpdateDefaultVersion};

use super::version_and_crate;

Expand Down Expand Up @@ -233,7 +233,8 @@ pub fn perform_version_yank_update(
};
insert_version_owner_action(conn, version.id, user.id, api_token_id, action)?;

jobs::enqueue_sync_to_index(&krate.name, conn)?;
SyncToGitIndex::new(&krate.name).enqueue(conn)?;
SyncToSparseIndex::new(&krate.name).enqueue(conn)?;
UpdateDefaultVersion::new(krate.id).enqueue(conn)?;

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion src/tests/worker/git.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::models::Crate;
use crate::tests::builders::PublishBuilder;
use crate::tests::util::{RequestHelper, TestApp};
use crate::worker::jobs;
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use http::StatusCode;

Expand Down Expand Up @@ -51,7 +52,8 @@ async fn index_smoke_test() {
let krate: Crate = assert_ok!(Crate::by_name("serde").first(conn));
assert_ok!(diesel::delete(crates::table.find(krate.id)).execute(conn));

assert_ok!(jobs::enqueue_sync_to_index("serde", conn));
assert_ok!(jobs::SyncToGitIndex::new("serde").enqueue(conn));
assert_ok!(jobs::SyncToSparseIndex::new("serde").enqueue(conn));
});

app.run_pending_background_jobs().await;
Expand Down
24 changes: 0 additions & 24 deletions src/worker/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use crate::util::diesel::Conn;
use crates_io_worker::{BackgroundJob, EnqueueError};
use std::fmt::Display;

mod archive_version_downloads;
mod daily_db_maintenance;
mod delete_crate;
Expand Down Expand Up @@ -32,23 +28,3 @@ pub use self::send_publish_notifications::SendPublishNotificationsJob;
pub use self::sync_admins::SyncAdmins;
pub use self::typosquat::CheckTyposquat;
pub use self::update_default_version::UpdateDefaultVersion;

/// Enqueue both index sync jobs (git and sparse) for a crate, unless they
/// already exist in the background job queue.
///
/// Note that there are currently no explicit tests for this functionality,
/// since our test suite only allows us to use a single database connection
/// and the background worker queue locking only work when using multiple
/// connections.
#[instrument(name = "swirl.enqueue", skip_all, fields(message = "sync_to_index", krate = %krate))]
pub fn enqueue_sync_to_index<T: Display>(
krate: T,
conn: &mut impl Conn,
) -> Result<(), EnqueueError> {
let krate = krate.to_string();

SyncToGitIndex::new(krate.clone()).enqueue(conn)?;
SyncToSparseIndex::new(krate).enqueue(conn)?;

Ok(())
}

0 comments on commit b19bdfd

Please sign in to comment.