diff --git a/docs/changes/newsfragments/254.feature b/docs/changes/newsfragments/254.feature new file mode 100644 index 000000000..cdc461cb6 --- /dev/null +++ b/docs/changes/newsfragments/254.feature @@ -0,0 +1 @@ +Introduce :class:`junifer.pipeline.WorkDirManager` singleton class to manage working and temporary directories across the pipeline by `Synchon Mandal`_ diff --git a/junifer/api/functions.py b/junifer/api/functions.py index 727d4581a..774f29c82 100644 --- a/junifer/api/functions.py +++ b/junifer/api/functions.py @@ -15,6 +15,7 @@ from ..datagrabber.base import BaseDataGrabber from ..markers.base import BaseMarker from ..markers.collection import MarkerCollection +from ..pipeline import WorkDirManager from ..pipeline.registry import build from ..preprocess.base import BasePreprocessor from ..storage.base import BaseFeatureStorage @@ -115,6 +116,8 @@ def run( # Convert str to Path if isinstance(workdir, str): workdir = Path(workdir) + # Initialize the working directory manager + WorkDirManager(workdir) if not isinstance(elements, list) and elements is not None: elements = [elements] diff --git a/junifer/markers/falff/falff_estimator.py b/junifer/markers/falff/falff_estimator.py index f6bf57cae..e6e5bcf56 100644 --- a/junifer/markers/falff/falff_estimator.py +++ b/junifer/markers/falff/falff_estimator.py @@ -4,12 +4,9 @@ # Federico Raimondo # License: AGPL -import shutil import subprocess -import tempfile import typing from functools import lru_cache -from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union import nibabel as nib @@ -17,8 +14,9 @@ from nilearn import image as nimg from scipy.fft import fft, fftfreq +from ...pipeline import WorkDirManager +from ...pipeline.singleton import singleton from ...utils import logger, raise_error -from ..utils import singleton if TYPE_CHECKING: @@ -41,19 +39,24 @@ class ALFFEstimator: use_afni : bool Whether to use afni for computation. If False, will use python. + Attributes + ---------- + temp_dir_path : pathlib.Path or None + Path to the temporary directory for asset storage. + """ def __init__(self) -> None: self._file_path = None # Create temporary directory for intermittent storage of assets during - # computation via afni's 3dReHo - self.temp_dir_path = Path(tempfile.mkdtemp()) + # computation via afni's 3dRSFC + self.temp_dir_path = None def __del__(self) -> None: """Cleanup.""" - print("Cleaning up temporary directory...") # Delete temporary directory and ignore errors for read-only files - shutil.rmtree(self.temp_dir_path, ignore_errors=True) + if self.temp_dir_path is not None: + WorkDirManager().delete_tempdir(self.temp_dir_path) @staticmethod def _run_afni_cmd(cmd: str) -> None: @@ -123,6 +126,8 @@ def _compute_alff_afni( If the AFNI commands fails due to some issues """ + # Note: self.temp_dir_path is guaranteed to exist before proceeding, so + # type checks are ignored further on. # Save niimg to nii.gz nifti_in_file_path = self.temp_dir_path / "input.nii" @@ -162,7 +167,7 @@ self._run_afni_cmd(convert_cmd) # Cleanup intermediate files - for fname in self.temp_dir_path.glob("temp_*"): + for fname in self.temp_dir_path.glob("temp_*"): # type: ignore fname.unlink() # Load niftis @@ -268,6 +273,8 @@ def _compute( fALFF map.
""" if use_afni: + # Create new temporary directory before using AFNI + self.temp_dir_path = WorkDirManager().get_tempdir(prefix="falff") output = self._compute_alff_afni( data=data, highpass=highpass, @@ -318,8 +325,8 @@ def fit_transform( # Clear the cache self._compute.cache_clear() # Clear temporary directory files - for file_ in self.temp_dir_path.iterdir(): - file_.unlink(missing_ok=True) + if self.temp_dir_path is not None: + WorkDirManager().delete_tempdir(self.temp_dir_path) # Set the new file path self._file_path = bold_path else: diff --git a/junifer/markers/falff/tests/test_falff_estimator.py b/junifer/markers/falff/tests/test_falff_estimator.py index d9e64b5b0..dc6b338dc 100644 --- a/junifer/markers/falff/tests/test_falff_estimator.py +++ b/junifer/markers/falff/tests/test_falff_estimator.py @@ -4,6 +4,7 @@ # License: AGPL import time +from pathlib import Path import pytest from nibabel import Nifti1Image @@ -11,23 +12,36 @@ from junifer.datareader import DefaultDataReader from junifer.markers.falff.falff_estimator import ALFFEstimator +from junifer.pipeline import WorkDirManager from junifer.pipeline.utils import _check_afni from junifer.testing.datagrabbers import PartlyCloudyTestingDataGrabber from junifer.utils import logger -def test_ALFFEstimator_cache_python() -> None: - """Test that the cache works properly when using python.""" - with PartlyCloudyTestingDataGrabber() as dg: - input = dg["sub-01"] +def test_ALFFEstimator_cache_python(tmp_path: Path) -> None: + """Test that the cache works properly when using python. - input = DefaultDataReader().fit_transform(input) + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + """ + # Get subject from datagrabber + with PartlyCloudyTestingDataGrabber() as dg: + subject = dg["sub-01"] + # Read data for subject + subject_data = DefaultDataReader().fit_transform(subject) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path + # Setup estimator estimator = ALFFEstimator() + + # Compute without cache start_time = time.time() alff, falff = estimator.fit_transform( use_afni=False, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -36,14 +50,12 @@ def test_ALFFEstimator_cache_python() -> None: logger.info(f"ALFF Estimator First time: {first_time}") assert isinstance(alff, Nifti1Image) assert isinstance(falff, Nifti1Image) - n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now fit again, should be faster + # Compute again with cache, should be faster start_time = time.time() alff, falff = estimator.fit_transform( use_afni=False, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -51,15 +63,12 @@ def test_ALFFEstimator_cache_python() -> None: second_time = time.time() - start_time logger.info(f"ALFF Estimator Second time: {second_time}") assert second_time < (first_time / 1000) - n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now change a parameter, should compute again, without clearing the - # cache + # Change a parameter and compute again without cache start_time = time.time() alff, falff = estimator.fit_transform( use_afni=False, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.11, tr=None, @@ -67,14 +76,12 @@ def test_ALFFEstimator_cache_python() -> None: third_time = time.time() - start_time logger.info(f"ALFF Estimator 
Third time: {third_time}") assert third_time > (first_time / 10) - n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now fit again with the previous params, should be fast + # Compute again with cache, should be faster start_time = time.time() alff, falff = estimator.fit_transform( use_afni=False, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -82,19 +89,17 @@ def test_ALFFEstimator_cache_python() -> None: fourth = time.time() - start_time logger.info(f"ALFF Estimator Fourth time: {fourth}") assert fourth < (first_time / 1000) - n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now change the data, it should clear the cache + # Change the data and it should clear the cache with PartlyCloudyTestingDataGrabber() as dg: - input = dg["sub-02"] - - input = DefaultDataReader().fit_transform(input) + subject = dg["sub-02"] + # Read data for new subject + subject_data = DefaultDataReader().fit_transform(subject) start_time = time.time() alff, falff = estimator.fit_transform( use_afni=False, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -102,25 +107,35 @@ def test_ALFFEstimator_cache_python() -> None: fifth = time.time() - start_time logger.info(f"ALFF Estimator Fifth time: {fifth}") assert fifth > (first_time / 10) - n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_ALFFEstimator_cache_afni() -> None: - """Test that the cache works properly when using afni.""" - with PartlyCloudyTestingDataGrabber() as dg: - input = dg["sub-01"] +def test_ALFFEstimator_cache_afni(tmp_path: Path) -> None: + """Test that the cache works properly when using afni. - input = DefaultDataReader().fit_transform(input) + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. 
+ """ + # Get subject from datagrabber + with PartlyCloudyTestingDataGrabber() as dg: + subject = dg["sub-01"] + # Read data for subject + subject_data = DefaultDataReader().fit_transform(subject) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path + # Setup estimator estimator = ALFFEstimator() + + # Compute with cache start_time = time.time() alff, falff = estimator.fit_transform( use_afni=True, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -132,11 +147,11 @@ def test_ALFFEstimator_cache_afni() -> None: n_files = len(list(estimator.temp_dir_path.glob("*"))) assert n_files == 3 # input + alff + falff - # Now fit again, should be faster + # Compute again with cache, should be faster start_time = time.time() alff, falff = estimator.fit_transform( use_afni=True, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -147,12 +162,11 @@ def test_ALFFEstimator_cache_afni() -> None: n_files = len(list(estimator.temp_dir_path.glob("*"))) assert n_files == 3 # input + alff + falff - # Now change a parameter, should compute again, without clearing the - # cache + # Change a parameter and compute again without cache start_time = time.time() alff, falff = estimator.fit_transform( use_afni=True, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.11, tr=None, @@ -161,13 +175,13 @@ def test_ALFFEstimator_cache_afni() -> None: logger.info(f"ALFF Estimator Third time: {third_time}") assert third_time > (first_time / 10) n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 5 # input + 2 * alff + 2 * falff + assert n_files == 3 # input + alff + falff - # Now fit again with the previous params, should be fast + # Compute with cache, should be faster start_time = time.time() alff, falff = estimator.fit_transform( use_afni=True, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -176,18 +190,18 @@ def test_ALFFEstimator_cache_afni() -> None: logger.info(f"ALFF Estimator Fourth time: {fourth}") assert fourth < (first_time / 1000) n_files = len(list(estimator.temp_dir_path.glob("*"))) - assert n_files == 5 # input + 2 * alff + 2 * falff + assert n_files == 3 # input + alff + falff - # Now change the data, it should clear the cache + # Change the data and it should clear the cache with PartlyCloudyTestingDataGrabber() as dg: - input = dg["sub-02"] - - input = DefaultDataReader().fit_transform(input) + subject = dg["sub-02"] + # Read data for new subject + subject_data = DefaultDataReader().fit_transform(subject) start_time = time.time() alff, falff = estimator.fit_transform( use_afni=True, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=None, @@ -202,18 +216,29 @@ def test_ALFFEstimator_cache_afni() -> None: @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_ALFFEstimator_afni_vs_python() -> None: - """Test that the cache works properly when using afni.""" - with PartlyCloudyTestingDataGrabber() as dg: - input = dg["sub-01"] +def test_ALFFEstimator_afni_vs_python(tmp_path: Path) -> None: + """Test that the cache works properly when using afni. - input = DefaultDataReader().fit_transform(input) + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. 
+ + """ + # Get subject from datagrabber + with PartlyCloudyTestingDataGrabber() as dg: + subject = dg["sub-01"] + # Read data for subject + subject_data = DefaultDataReader().fit_transform(subject) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path + # Setup estimator estimator = ALFFEstimator() # Use an arbitrary TR to test the AFNI vs Python implementation afni_alff, afni_falff = estimator.fit_transform( use_afni=True, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=2.5, @@ -221,7 +246,7 @@ def test_ALFFEstimator_afni_vs_python() -> None: python_alff, python_falff = estimator.fit_transform( use_afni=False, - input_data=input["BOLD"], + input_data=subject_data["BOLD"], highpass=0.01, lowpass=0.1, tr=2.5, diff --git a/junifer/markers/falff/tests/test_falff_parcels.py b/junifer/markers/falff/tests/test_falff_parcels.py index be5fac3be..b6ec7cdf1 100644 --- a/junifer/markers/falff/tests/test_falff_parcels.py +++ b/junifer/markers/falff/tests/test_falff_parcels.py @@ -12,6 +12,7 @@ from junifer.datareader import DefaultDataReader from junifer.markers.falff import ALFFParcels +from junifer.pipeline import WorkDirManager from junifer.pipeline.utils import _check_afni from junifer.storage import SQLiteFeatureStorage from junifer.testing.datagrabbers import PartlyCloudyTestingDataGrabber @@ -21,15 +22,20 @@ _PARCELLATION = "Schaefer100x7" -def test_ALFFParcels_python() -> None: - """Test ALFFParcels using python.""" - # Get the SPM auditory data: +def test_ALFFParcels_python(tmp_path: Path) -> None: + """Test ALFFParcels using python. + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ with PartlyCloudyTestingDataGrabber() as dg: input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + WorkDirManager().workdir = tmp_path marker = ALFFParcels( parcellation=_PARCELLATION, method="mean", @@ -46,14 +52,20 @@ def test_ALFFParcels_python() -> None: @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_ALFFParcels_afni() -> None: - """Test ALFFParcels using afni.""" - # Get the SPM auditory data: +def test_ALFFParcels_afni(tmp_path: Path) -> None: + """Test ALFFParcels using afni. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ with PartlyCloudyTestingDataGrabber() as dg: input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + WorkDirManager().workdir = tmp_path marker = ALFFParcels( parcellation=_PARCELLATION, method="mean", @@ -83,12 +95,15 @@ def test_ALFFParcels_afni() -> None: "fractional", [True, False], ids=["fractional", "non-fractional"] ) def test_ALFFParcels_python_vs_afni( + tmp_path: Path, fractional: bool, ) -> None: """Test ALFFParcels using python. Parameters ---------- + tmp_path : pathlib.Path + The path to the test directory. fractional : bool Whether to compute fractional ALFF or not. @@ -98,7 +113,7 @@ def test_ALFFParcels_python_vs_afni( input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + WorkDirManager().workdir = tmp_path marker_python = ALFFParcels( parcellation=_PARCELLATION, method="mean", @@ -137,12 +152,13 @@ def test_ALFFParcels_storage( ---------- tmp_path : pathlib.Path The path to the test directory. 
+ """ with PartlyCloudyTestingDataGrabber() as dg: # Use first subject input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + WorkDirManager().workdir = tmp_path marker = ALFFParcels( parcellation=_PARCELLATION, method="mean", diff --git a/junifer/markers/falff/tests/test_falff_spheres.py b/junifer/markers/falff/tests/test_falff_spheres.py index 556cd3844..825dc6a40 100644 --- a/junifer/markers/falff/tests/test_falff_spheres.py +++ b/junifer/markers/falff/tests/test_falff_spheres.py @@ -12,6 +12,7 @@ from junifer.datareader import DefaultDataReader from junifer.markers.falff import ALFFSpheres +from junifer.pipeline import WorkDirManager from junifer.pipeline.utils import _check_afni from junifer.storage import SQLiteFeatureStorage from junifer.testing.datagrabbers import PartlyCloudyTestingDataGrabber @@ -21,15 +22,21 @@ _COORDINATES = "DMNBuckner" -def test_ALFFSpheres_python() -> None: - """Test ALFFSpheres using python.""" - # Get the SPM auditory data: +def test_ALFFSpheres_python(tmp_path: Path) -> None: + """Test ALFFSpheres using python. + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ with PartlyCloudyTestingDataGrabber() as dg: input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + + WorkDirManager().workdir = tmp_path marker = ALFFSpheres( coords=_COORDINATES, radius=5, @@ -47,14 +54,21 @@ def test_ALFFSpheres_python() -> None: @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_ALFFSpheres_afni() -> None: - """Test ALFFSpheres using afni.""" - # Get the SPM auditory data: +def test_ALFFSpheres_afni(tmp_path: Path) -> None: + """Test ALFFSpheres using afni. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ with PartlyCloudyTestingDataGrabber() as dg: input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + + WorkDirManager().workdir = tmp_path marker = ALFFSpheres( coords=_COORDINATES, radius=5, @@ -88,20 +102,25 @@ def test_ALFFSpheres_afni() -> None: "fractional", [True, False], ids=["fractional", "non-fractional"] ) def test_ALFFSpheres_python_vs_afni( + tmp_path: Path, fractional: bool, ) -> None: """Test ALFFSpheres python vs afni results. Parameters ---------- + tmp_path : pathlib.Path + The path to the test directory. fractional : bool Whether to compute fractional ALFF or not. + """ with PartlyCloudyTestingDataGrabber() as dg: input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + + WorkDirManager().workdir = tmp_path marker_python = ALFFSpheres( coords=_COORDINATES, radius=5, @@ -142,12 +161,13 @@ def test_ALFFSpheres_storage( ---------- tmp_path : pathlib.Path The path to the test directory. 
+ """ with PartlyCloudyTestingDataGrabber() as dg: # Use first subject input = dg["sub-01"] input = DefaultDataReader().fit_transform(input) - # Create ParcelAggregation object + WorkDirManager().workdir = tmp_path marker = ALFFSpheres( coords=_COORDINATES, radius=5, diff --git a/junifer/markers/reho/reho_estimator.py b/junifer/markers/reho/reho_estimator.py index 211c5d44c..d8a6c7e81 100644 --- a/junifer/markers/reho/reho_estimator.py +++ b/junifer/markers/reho/reho_estimator.py @@ -6,12 +6,9 @@ import hashlib -import shutil import subprocess -import tempfile from functools import lru_cache from itertools import product -from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast import nibabel as nib @@ -20,8 +17,9 @@ from nilearn import masking as nmask from scipy.stats import rankdata +from ...pipeline import WorkDirManager +from ...pipeline.singleton import singleton from ...utils import logger, raise_error -from ..utils import singleton if TYPE_CHECKING: @@ -50,12 +48,13 @@ def __init__(self) -> None: self._file_path = None # Create temporary directory for intermittent storage of assets during # computation via afni's 3dReHo - self.temp_dir_path = Path(tempfile.mkdtemp()) + self.temp_dir_path = None def __del__(self) -> None: """Cleanup.""" # Delete temporary directory and ignore errors for read-only files - shutil.rmtree(self.temp_dir_path, ignore_errors=True) + if self.temp_dir_path is not None: + WorkDirManager().delete_tempdir(self.temp_dir_path) def _compute_reho_afni( self, @@ -137,12 +136,15 @@ def _compute_reho_afni( https://doi.org/10.1089/brain.2013.0154 """ + # Note: self.temp_dir_path is sure to exist before proceeding, so + # types checks are ignored further on. + # Save niimg to nii.gz - nifti_in_file_path = self.temp_dir_path / "input.nii" + nifti_in_file_path = self.temp_dir_path / "input.nii" # type: ignore nib.save(data, nifti_in_file_path) # Set 3dReHo command - reho_afni_out_path_prefix = self.temp_dir_path / "reho" + reho_afni_out_path_prefix = self.temp_dir_path / "reho" # type: ignore reho_cmd: List[str] = [ "3dReHo", f"-prefix {reho_afni_out_path_prefix.resolve()}", @@ -194,7 +196,8 @@ def _compute_reho_afni( sha256_params = hashlib.sha256(bytes(reho_cmd_str, "utf-8")) # Convert afni to nifti reho_afni_to_nifti_out_path = ( - self.temp_dir_path / f"output_{sha256_params.hexdigest()}.nii" + self.temp_dir_path + / f"output_{sha256_params.hexdigest()}.nii" # type: ignore ) convert_cmd: List[str] = [ "3dAFNItoNIFTI", @@ -225,7 +228,7 @@ def _compute_reho_afni( ) # Cleanup intermediate files - for fname in self.temp_dir_path.glob("reho*"): + for fname in self.temp_dir_path.glob("reho*"): # type: ignore fname.unlink() # Load nifti @@ -405,7 +408,7 @@ def _compute_reho_python( ) output = nimg.new_img_like(data, reho_map, copy_header=False) - return output + return output # type: ignore @lru_cache(maxsize=None, typed=True) def _compute( @@ -431,6 +434,8 @@ def _compute( """ if use_afni: + # Create new temporary directory before using AFNI + self.temp_dir_path = WorkDirManager().get_tempdir(prefix="reho") output = self._compute_reho_afni(data, **reho_params) else: output = self._compute_reho_python(data, **reho_params) @@ -466,8 +471,8 @@ def fit_transform( # Clear the cache self._compute.cache_clear() # Clear temporary directory files - for file_ in self.temp_dir_path.iterdir(): - file_.unlink(missing_ok=True) + if self.temp_dir_path is not None: + WorkDirManager().delete_tempdir(self.temp_dir_path) # Set the new file path 
self._file_path = bold_path else: diff --git a/junifer/markers/reho/tests/test_reho_estimator.py b/junifer/markers/reho/tests/test_reho_estimator.py index 39bdca76c..a08280749 100644 --- a/junifer/markers/reho/tests/test_reho_estimator.py +++ b/junifer/markers/reho/tests/test_reho_estimator.py @@ -4,6 +4,7 @@ # License: AGPL import time +from pathlib import Path import nibabel as nib import pytest @@ -11,21 +12,32 @@ from junifer.datareader.default import DefaultDataReader from junifer.markers.reho.reho_estimator import ReHoEstimator +from junifer.pipeline import WorkDirManager from junifer.pipeline.utils import _check_afni from junifer.testing.datagrabbers import PartlyCloudyTestingDataGrabber from junifer.utils.logging import logger -def test_reho_estimator_cache_python() -> None: - """Test that the cache works properly when using Python implementation.""" +def test_reho_estimator_cache_python(tmp_path: Path) -> None: + """Test that the cache works properly when using the Python implementation. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ # Get subject from datagrabber with PartlyCloudyTestingDataGrabber() as dg: subject = dg["sub-01"] # Read data for subject subject_data = DefaultDataReader().fit_transform(subject) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Setup estimator reho_estimator = ReHoEstimator() + # Compute without cache first_tic = time.time() reho_map_without_cache = reho_estimator.fit_transform( use_afni=False, @@ -37,11 +49,8 @@ def test_reho_estimator_cache_python() -> None: f"ReHo estimator in Python without cache: {first_toc - first_tic}" ) assert isinstance(reho_map_without_cache, nib.Nifti1Image) - # Count intermediate files - n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now fit again, should be faster + # Compute again with cache, should be faster second_tic = time.time() reho_map_with_cache = reho_estimator.fit_transform( use_afni=False, @@ -55,12 +64,8 @@ def test_reho_estimator_cache_python() -> None: assert isinstance(reho_map_with_cache, nib.Nifti1Image) # Check that cache is being used assert (second_toc - second_tic) < ((first_toc - first_tic) / 1000) - # Count intermediate files - n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now change a parameter, should compute again, without clearing the - # cache + # Change a parameter and compute again without cache third_tic = time.time() reho_map_with_partial_cache = reho_estimator.fit_transform( use_afni=False, @@ -74,11 +79,8 @@ def test_reho_estimator_cache_python() -> None: assert isinstance(reho_map_with_partial_cache, nib.Nifti1Image) # Should require more time assert (third_toc - third_tic) > ((first_toc - first_tic) / 10) - # Count intermediate files - n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python - # Now fit again with the previous params, should be fast + # Compute again with cache, should be faster fourth_tic = time.time() reho_map_with_new_cache = reho_estimator.fit_transform( use_afni=False, @@ -92,11 +94,8 @@ def test_reho_estimator_cache_python() -> None: assert isinstance(reho_map_with_new_cache, nib.Nifti1Image) # Should require less time assert (fourth_toc - fourth_tic) < ((first_toc - first_tic) / 1000) - # Count intermediate files - n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python
- # Now change the data, it should clear the cache + # Change the data and it should clear the cache with PartlyCloudyTestingDataGrabber() as dg: subject = dg["sub-02"] # Read data for new subject @@ -116,24 +115,31 @@ def test_reho_estimator_cache_python() -> None: assert isinstance(reho_map_with_different_cache, nib.Nifti1Image) # Should take less time assert (fifth_toc - fifth_tic) > ((first_toc - first_tic) / 10) - # Count intermediate files - n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 0 # no files in python @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_reho_estimator_cache_afni() -> None: - """Test that the cache works properly when using afni.""" +def test_reho_estimator_cache_afni(tmp_path: Path) -> None: + """Test that the cache works properly when using afni. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ # Get subject from datagrabber with PartlyCloudyTestingDataGrabber() as dg: subject = dg["sub-01"] # Read data for subject subject_data = DefaultDataReader().fit_transform(subject) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Setup estimator reho_estimator = ReHoEstimator() + # Compute without cache first_tic = time.time() reho_map_without_cache = reho_estimator.fit_transform( use_afni=True, @@ -149,7 +155,7 @@ def test_reho_estimator_cache_afni() -> None: n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) assert n_files == 2 # input + reho - # Now fit again, should be faster + # Compute again with cache, should be faster second_tic = time.time() reho_map_with_cache = reho_estimator.fit_transform( use_afni=True, @@ -166,8 +172,7 @@ def test_reho_estimator_cache_afni() -> None: n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) assert n_files == 2 # input + reho - # Now change a parameter, should compute again, without clearing the - # cache + # Change a parameter and compute again without cache third_tic = time.time() reho_map_with_partial_cache = reho_estimator.fit_transform( use_afni=True, @@ -183,9 +188,9 @@ def test_reho_estimator_cache_afni() -> None: assert (third_toc - third_tic) > ((first_toc - first_tic) / 10) # Count intermediate files n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 3 # input + 2 * reho + assert n_files == 2 # input + reho - # Now fit again with the previous params, should be fast + # Compute with cache, should be faster fourth_tic = time.time() reho_map_with_new_cache = reho_estimator.fit_transform( use_afni=True, @@ -200,9 +205,9 @@ def test_reho_estimator_cache_afni() -> None: # Should require less time assert (fourth_toc - fourth_tic) < ((first_toc - first_tic) / 1000) n_files = len(list(reho_estimator.temp_dir_path.glob("*"))) - assert n_files == 3 # input + 2 * reho + assert n_files == 2 # input + reho - # Now change the data, it should clear the cache + # Change the data and it should clear the cache with PartlyCloudyTestingDataGrabber() as dg: subject = dg["sub-02"] # Read data for new subject @@ -229,13 +234,22 @@ def test_reho_estimator_cache_afni() -> None: @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_reho_estimator_afni_vs_python() -> None: - """Compare afni and Python implementations.""" +def test_reho_estimator_afni_vs_python(tmp_path: Path) -> None: + """Compare afni and Python implementations. 
+ + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ # Get subject from datagrabber with PartlyCloudyTestingDataGrabber() as dg: subject = dg["sub-01"] # Read data for subject subject_data = DefaultDataReader().fit_transform(subject) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Setup estimator reho_estimator = ReHoEstimator() diff --git a/junifer/markers/reho/tests/test_reho_parcels.py b/junifer/markers/reho/tests/test_reho_parcels.py index 285560cff..488eda53a 100644 --- a/junifer/markers/reho/tests/test_reho_parcels.py +++ b/junifer/markers/reho/tests/test_reho_parcels.py @@ -10,6 +10,7 @@ from scipy.stats import pearsonr from junifer.markers.reho.reho_parcels import ReHoParcels +from junifer.pipeline import WorkDirManager from junifer.pipeline.utils import _check_afni from junifer.storage.sqlite import SQLiteFeatureStorage from junifer.testing.datagrabbers import SPMAuditoryTestingDataGrabber @@ -18,13 +19,22 @@ PARCELLATION = "Schaefer100x7" -def test_reho_parcels_computation() -> None: - """Test ReHoParcels fit-transform.""" +def test_reho_parcels_computation(tmp_path: Path) -> None: + """Test ReHoParcels fit-transform. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ with SPMAuditoryTestingDataGrabber() as dg: # Use first subject subject_data = dg["sub001"] # Load image to memory fmri_img = nimg.load_img(subject_data["BOLD"]["path"]) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Initialize marker reho_parcels_marker = ReHoParcels(parcellation=PARCELLATION) # Fit transform marker on data @@ -49,14 +59,23 @@ def test_reho_parcels_computation() -> None: @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_reho_parcels_computation_comparison() -> None: - """Test ReHoParcels fit-transform implementation comparison..""" +def test_reho_parcels_computation_comparison(tmp_path: Path) -> None: + """Test ReHoParcels fit-transform implementation comparison. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. 
+ + """ with SPMAuditoryTestingDataGrabber() as dg: # Use first subject subject_data = dg["sub001"] # Load image to memory fmri_img = nimg.load_img(subject_data["BOLD"]["path"]) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Initialize marker with use_afni=False reho_parcels_marker_python = ReHoParcels( parcellation=PARCELLATION, use_afni=False @@ -101,6 +120,8 @@ def test_reho_parcels_storage(tmp_path: Path) -> None: subject_data = dg["sub001"] # Load image to memory fmri_img = nimg.load_img(subject_data["BOLD"]["path"]) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Initialize marker reho_parcels_marker = ReHoParcels(parcellation=PARCELLATION) # Initialize storage diff --git a/junifer/markers/reho/tests/test_reho_spheres.py b/junifer/markers/reho/tests/test_reho_spheres.py index 3026bc9f5..c662980be 100644 --- a/junifer/markers/reho/tests/test_reho_spheres.py +++ b/junifer/markers/reho/tests/test_reho_spheres.py @@ -10,6 +10,7 @@ from scipy.stats import pearsonr from junifer.markers.reho.reho_spheres import ReHoSpheres +from junifer.pipeline import WorkDirManager from junifer.pipeline.utils import _check_afni from junifer.storage.sqlite import SQLiteFeatureStorage from junifer.testing.datagrabbers import SPMAuditoryTestingDataGrabber @@ -18,13 +19,22 @@ COORDINATES = "DMNBuckner" -def test_reho_spheres_computation() -> None: - """Test ReHoSpheres fit-transform.""" +def test_reho_spheres_computation(tmp_path: Path) -> None: + """Test ReHoSpheres fit-transform. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ with SPMAuditoryTestingDataGrabber() as dg: # Use first subject subject_data = dg["sub001"] # Load image to memory fmri_img = nimg.load_img(subject_data["BOLD"]["path"]) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Initialize marker reho_spheres_marker = ReHoSpheres(coords=COORDINATES, radius=10.0) # Fit transform marker on data @@ -49,14 +59,23 @@ def test_reho_spheres_computation() -> None: @pytest.mark.skipif( _check_afni() is False, reason="requires afni to be in PATH" ) -def test_reho_spheres_computation_comparison() -> None: - """Test ReHoSpheres fit-transform implementation comparison..""" +def test_reho_spheres_computation_comparison(tmp_path: Path) -> None: + """Test ReHoSpheres fit-transform implementation comparison. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. 
+ + """ with SPMAuditoryTestingDataGrabber() as dg: # Use first subject subject_data = dg["sub001"] # Load image to memory fmri_img = nimg.load_img(subject_data["BOLD"]["path"]) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Initialize marker with use_afni=False reho_spheres_marker_python = ReHoSpheres( coords=COORDINATES, radius=10.0, use_afni=False @@ -101,6 +120,8 @@ def test_reho_spheres_storage(tmp_path: Path) -> None: subject_data = dg["sub001"] # Load image to memory fmri_img = nimg.load_img(subject_data["BOLD"]["path"]) + # Update workdir to current test's tmp_path + WorkDirManager().workdir = tmp_path # Initialize marker reho_spheres_marker = ReHoSpheres(coords=COORDINATES, radius=10.0) # Initialize storage diff --git a/junifer/markers/utils.py b/junifer/markers/utils.py index 2b5f3ded0..7e785a174 100644 --- a/junifer/markers/utils.py +++ b/junifer/markers/utils.py @@ -7,7 +7,7 @@ # Federico Raimondo # License: AGPL -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union +from typing import Callable, List, Optional, Tuple, Union import numpy as np import pandas as pd @@ -16,45 +16,6 @@ from ..utils import raise_error -def singleton(cls: Type) -> Type: - """Make a class singleton. - - Parameters - ---------- - cls : class - The class to designate as singleton. - - Returns - ------- - class - The only instance of the class. - - """ - instances: Dict = {} - - def get_instance(*args: Any, **kwargs: Any) -> Type: - """Get the only instance for a class. - - Parameters - ---------- - *args : tuple - The positional arguments to pass to the class. - **kwargs : dict - The keyword arguments to pass to the class. - - Returns - ------- - class - The only instance of the class. - - """ - if cls not in instances: - instances[cls] = cls(*args, **kwargs) - return instances[cls] - - return get_instance - - def _ets( bold_ts: np.ndarray, roi_names: Union[None, List[str]] = None, diff --git a/junifer/pipeline/__init__.py b/junifer/pipeline/__init__.py index 8d1681388..1ea157d60 100644 --- a/junifer/pipeline/__init__.py +++ b/junifer/pipeline/__init__.py @@ -6,3 +6,4 @@ from . import registry from .pipeline_step_mixin import PipelineStepMixin from .update_meta_mixin import UpdateMetaMixin +from .workdir_manager import WorkDirManager diff --git a/junifer/pipeline/singleton.py b/junifer/pipeline/singleton.py new file mode 100644 index 000000000..b04d7507d --- /dev/null +++ b/junifer/pipeline/singleton.py @@ -0,0 +1,45 @@ +"""Provide a singleton class to be used by pipeline components.""" + +# Authors: Synchon Mandal +# License: AGPL + +from typing import Any, Dict, Type + + +def singleton(cls: Type) -> Type: + """Make a class singleton. + + Parameters + ---------- + cls : class + The class to designate as singleton. + + Returns + ------- + class + The only instance of the class. + + """ + instances: Dict = {} + + def get_instance(*args: Any, **kwargs: Any) -> Type: + """Get the only instance for a class. + + Parameters + ---------- + *args : tuple + The positional arguments to pass to the class. + **kwargs : dict + The keyword arguments to pass to the class. + + Returns + ------- + class + The only instance of the class. 
+ + """ + if cls not in instances: + instances[cls] = cls(*args, **kwargs) + return instances[cls] + + return get_instance diff --git a/junifer/pipeline/tests/test_workdir_manager.py b/junifer/pipeline/tests/test_workdir_manager.py new file mode 100644 index 000000000..7cebfe815 --- /dev/null +++ b/junifer/pipeline/tests/test_workdir_manager.py @@ -0,0 +1,53 @@ +"""Provide tests for WorkDirManager.""" + +# Authors: Synchon Mandal +# License: AGPL + +from pathlib import Path + +from junifer.pipeline import WorkDirManager + + +def test_workdir_manager_singleton() -> None: + """Test that WorkDirManager is a singleton.""" + workdir_mgr_1 = WorkDirManager() + workdir_mgr_2 = WorkDirManager() + assert id(workdir_mgr_1) == id(workdir_mgr_2) + + +def test_workdir_manager_workdir(tmp_path: Path) -> None: + """Test WorkDirManager correctly sets workdir. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ + workdir_mgr = WorkDirManager() + workdir_mgr.workdir = tmp_path + assert workdir_mgr.workdir == tmp_path + + +def test_workdir_manager_get_and_delete_tempdir(tmp_path: Path) -> None: + """Test WorkDirManager gets and deletes temporary directories correctly. + + Parameters + ---------- + tmp_path : pathlib.Path + The path to the test directory. + + """ + workdir_mgr = WorkDirManager() + workdir_mgr.workdir = tmp_path + # Check no root temporary directory + assert workdir_mgr.root_tempdir is None + + tempdir = workdir_mgr.get_tempdir() + # Should create a temporary directory + assert workdir_mgr.root_tempdir is not None + + workdir_mgr.delete_tempdir(tempdir) + workdir_mgr._cleanup() + # Should remove temporary directory + assert workdir_mgr.root_tempdir is None diff --git a/junifer/pipeline/workdir_manager.py b/junifer/pipeline/workdir_manager.py new file mode 100644 index 000000000..4f2b0c787 --- /dev/null +++ b/junifer/pipeline/workdir_manager.py @@ -0,0 +1,162 @@ +"""Provide a work directory manager class to be used by pipeline components.""" + +# Authors: Synchon Mandal +# Federico Raimondo +# License: AGPL + +import shutil +import tempfile +from pathlib import Path +from typing import Optional, Union + +from ..utils import logger +from .singleton import singleton + + +@singleton +class WorkDirManager: + """Class for working directory manager. + + This class is a singleton and is used for managing temporary and working + directories used across the pipeline by datagrabbers, preprocessors, + markers and so on. It maintains a single super-directory and provides + directories on-demand and cleans after itself thus keeping the user + filesystem clean. + + Parameters + ---------- + workdir : str or pathlib.Path, optional + The path to the super-directory. If None, "TMPDIR/junifer" is used + where TMPDIR is the platform-dependent temporary directory. + + Attributes + ---------- + workdir : pathlib.Path + The path to the working directory. + root_tempdir : pathlib.Path or None + The path to the root temporary directory. 
+ + """ + + def __init__(self, workdir: Optional[Union[str, Path]] = None) -> None: + """Initialize the class.""" + self._workdir = workdir + self._root_tempdir = None + + self._set_default_workdir() + + def _set_default_workdir(self) -> None: + """Set the default working directory if not set already.""" + # Check and set topmost level directory if not provided + if self._workdir is None: + self._workdir = Path(tempfile.gettempdir()) / "junifer" + # Create directory if not found + if not self._workdir.is_dir(): + logger.debug( + "Creating working directory at " + f"{self._workdir.resolve()!s}" + ) + self._workdir.mkdir(parents=True) + logger.debug( + f"Setting working directory to {self._workdir.resolve()!s}" + ) + + def __del__(self) -> None: + """Destructor.""" + self._cleanup() + + def _cleanup(self) -> None: + """Clean up the temporary directories.""" + # Remove root temporary directory + if self._root_tempdir is not None: + logger.debug( + "Deleting temporary directory at " + f"{self._root_tempdir.resolve()!s}" + ) + shutil.rmtree(self._root_tempdir, ignore_errors=True) + self._root_tempdir = None + + @property + def workdir(self) -> Path: + """Get working directory.""" + return self._workdir # type: ignore + + @workdir.setter + def workdir(self, path: Union[str, Path]) -> None: + """Set working directory. + + The directory path is created if it doesn't exist yet. + + Parameters + ---------- + path : str or pathlib.Path + The path to the working directory. + + """ + # Check if existing working directory is same or not; + # if not, then clean up + if self._workdir != Path(path): + self._cleanup() + # Set working directory + self._workdir = Path(path) + logger.debug( + f"Changing working directory to {self._workdir.resolve()!s}" + ) + # Create directory if not found + if not self._workdir.is_dir(): + logger.debug( + f"Creating working directory at {self._workdir.resolve()!s}" + ) + self._workdir.mkdir(parents=True) + + @property + def root_tempdir(self) -> Optional[Path]: + """Get root temporary directory.""" + return self._root_tempdir + + def get_tempdir( + self, prefix: Optional[str] = None, suffix: Optional[str] = None + ) -> Path: + """Get a temporary directory. + + Parameters + ---------- + prefix : str + The temporary directory prefix. If None, a default prefix is used. + suffix : str + The temporary directory suffix. If None, no suffix is added. + + Returns + ------- + pathlib.Path + The path to the temporary directory. + + """ + # Create root temporary directory if not created already + if self._root_tempdir is None: + logger.debug( + "Setting up temporary directory under " + f"{self._workdir.resolve()!s}" # type: ignore + ) + self._root_tempdir = Path(tempfile.mkdtemp(dir=self._workdir)) + + logger.debug( + f"Creating temporary directory at {self._root_tempdir.resolve()!s}" + ) + return Path( + tempfile.mkdtemp( + dir=self._root_tempdir, prefix=prefix, suffix=suffix + ) + ) + + def delete_tempdir(self, tempdir: Path) -> None: + """Delete a temporary directory. + + Parameters + ---------- + tempdir : pathlib.Path + The temporary directory path to be deleted. + + """ + logger.debug(f"Deleting temporary directory at {tempdir}") + shutil.rmtree(tempdir, ignore_errors=True)