Skip to content

Commit

Permalink
feat: Add multi-threaded analysis on import
Browse files Browse the repository at this point in the history
  • Loading branch information
Holzhaus committed Oct 12, 2024
1 parent ec2cf62 commit e95b3a7
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 81 deletions.
66 changes: 66 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ rusty-chromaprint = "0.2.0"
base64 = "0.22.1"
float_eq = "1"
ebur128 = "0.1"
async-channel = "2.3.1"
num_cpus = "1.16.0"

[dev-dependencies]
paste = "1"
Expand Down
84 changes: 4 additions & 80 deletions src/cli/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,14 @@
//! Functions related to importing files.

use super::ui;
use crate::analyzer;
use crate::musicbrainz::{MusicBrainzClient, MusicBrainzRelease};
use crate::release::ReleaseLike;
use crate::release_candidate::{ReleaseCandidate, ReleaseCandidateCollection};
use crate::util::walk_dir;
use crate::scanner::Scanner;
use crate::Cache;
use crate::{Config, TaggedFile, TaggedFileCollection};
use crate::{Config, TaggedFileCollection};
use clap::Parser;
use futures::StreamExt;
use std::collections::HashSet;
use std::path::PathBuf;

/// Command line arguments for the `import` CLI command.
Expand All @@ -28,55 +26,6 @@ pub struct Args {
path: PathBuf,
}

/// Find track collections in the given path.
fn find_track_collections(
config: &Config,
input_path: PathBuf,
) -> impl Iterator<Item = TaggedFileCollection> + '_ {
let supported_extensions = HashSet::from(["mp3", "flac"]);
walk_dir(input_path)
.filter_map(Result::ok)
.filter_map(move |(path, _dirs, files)| {
let tagged_files: Vec<TaggedFile> = files
.iter()
.filter(|path| {
path.extension()
.map(std::ffi::OsStr::to_ascii_lowercase)
.and_then(|extension| {
extension
.to_str()
.map(|extension| supported_extensions.contains(extension))
})
.unwrap_or(false)
})
.filter_map(|path| match TaggedFile::read_from_path(path) {
Ok(file) => Some(
file.with_analysis_results(
analyzer::analyze(config, path)
.inspect_err(|err| {
log::warn!(
"Analysis of {path} failed: {err}",
path = path.display()
);
})
.ok(),
),
),
Err(err) => {
log::warn!("Failed to read {}: {}", path.display(), err);
None
}
})
.collect();
if tagged_files.is_empty() {
return None;
}

log::info!("Found {} tracks in {}", tagged_files.len(), path.display(),);
Some(TaggedFileCollection::new(tagged_files))
})
}

/// Result returned from the [`select_release()`] function.
enum SelectionResult {
/// A candidate was selected and should be assigned to the track collection.
Expand Down Expand Up @@ -175,32 +124,7 @@ async fn select_release<'a>(
/// If the underlying [`walk_dir`] function encounters any form of I/O or other error, an error
/// variant will be returned.
pub async fn run(config: &Config, cache: Option<&Cache>, args: Args) -> crate::Result<()> {
let input_path = args.path;

let (scanner_tx, mut scanner_rx) = tokio::sync::mpsc::channel(20);
let cloned_config = config.clone();
let cloned_cache = cache.cloned();
let _scanner_handle = tokio::task::spawn(async move {
let musicbrainz = MusicBrainzClient::new(&cloned_config, cloned_cache.as_ref());
for track_collection in find_track_collections(&cloned_config, input_path) {
let candidates = match musicbrainz
.find_releases_by_similarity(&track_collection)
.await
{
Ok(releases) => ReleaseCandidateCollection::new(releases),
Err(err) => {
log::error!("Receiver dropped: {err}");
continue;
}
};

let item = (track_collection, candidates);
if let Err(err) = scanner_tx.send(item).await {
log::error!("Receiver dropped: {err}");
continue;
}
}
});
let mut scanner = Scanner::scan(config.clone(), cache.cloned(), args.path);

let (importer_tx, mut importer_rx) = tokio::sync::mpsc::channel::<(
TaggedFileCollection,
Expand All @@ -216,7 +140,7 @@ pub async fn run(config: &Config, cache: Option<&Cache>, args: Args) -> crate::R
});

let musicbrainz = MusicBrainzClient::new(config, cache);
while let Some((track_collection, candidates)) = scanner_rx.recv().await {
while let Some((track_collection, candidates)) = scanner.recv().await {
match select_release(config, &musicbrainz, track_collection, candidates).await? {
SelectionResult::Selected(track_collection, selected_candidate) => {
if let Err(err) = importer_tx
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ pub struct UiConfig {
pub struct AnalyzerConfig {
/// Analyzers that are enabled and will be used.
pub enabled: Vec<AnalyzerType>,
/// Number of parallel analyzer jobs (use 0 for the number of CPUs)
pub num_parallel_jobs: usize,
}

/// Analyzer type.
Expand Down
1 change: 1 addition & 0 deletions src/default_config.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[analyzers]
num_parallel_jobs = 0
# The chromaprint_fingerprint analyzer is disabled by default.
enabled = ["track_length"]

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ mod media;
mod musicbrainz;
mod release;
mod release_candidate;
mod scanner;
mod tag;
mod taggedfile;
mod taggedfilecollection;
Expand Down
Loading

0 comments on commit e95b3a7

Please sign in to comment.