diff --git a/src/ingest_validation_tests/fastq_validator_logic.py b/src/ingest_validation_tests/fastq_validator_logic.py
index baf8e45..3c917db 100644
--- a/src/ingest_validation_tests/fastq_validator_logic.py
+++ b/src/ingest_validation_tests/fastq_validator_logic.py
@@ -2,32 +2,70 @@
 import gzip
 import logging
 import re
-from collections import defaultdict
-from multiprocessing import Lock, Manager, Pool
+from collections import defaultdict, namedtuple
+from multiprocessing import Manager, Pool
+from multiprocessing.managers import ListProxy
+from os import cpu_count
 from pathlib import Path
-from typing import Callable, List, Optional, TextIO
+from typing import Callable, List, Optional, TextIO, Union
 
 import fastq_utils
+from typing_extensions import Self
+
+filename_pattern = namedtuple("filename_pattern", ["prefix", "read_type", "set_num"])
 
 
 def is_valid_filename(filename: str) -> bool:
     return bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename))
 
 
+def get_prefix_read_type_and_set(filename: str) -> Optional[filename_pattern]:
+    """
+    Looking for fastq filenames matching pattern:
+    <prefix>_<read_type>#_<set_num>
+    e.g. arbitrary_string_L001_R1_001.fastq
+
+    Regex documentation:
+    PREFIX
+    - (?P<prefix> | named capture group "prefix"
+    - .*(?:L\\d+) | match anything before and including pattern L# (where # represents 1 or more digits)
+    - only capture prefix if read_type and set_num found
+    READ_TYPE
+    - (?=_ | assert that the character "_" is present at the beginning of this next group
+    - (?:(?P<read_type>R|read(?=[123])|I(?=[12]))) | only capture above match if followed by the
+      sequence _[R1,R2,R3,read1,read2,read3,I1,I2]; capture R/I/read as read_type, discarding digit
+    SET_NUM
+    - (?:\\d_ | ensure presence of "#_" before group, ignore characters
+    - (?P<set_num>\\d+) | capture group set_num of 1 or more digits
+    Note: if set_num needs to be optional, could change this portion to (?:\\d_?(?P<set_num>\\d+)|)
+    """
+    if not bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename)):
+        return
+    pattern = re.compile(
+        r"(?P<prefix>.*(?:L\d+)(?=_(?P<read_type>R|read(?=[123])|I(?=[12]))(?:\d_(?P<set_num>\d+))))"
+    )
+    groups = pattern.match(filename)
+    if groups and all(x in groups.groupdict().keys() for x in ["prefix", "read_type", "set_num"]):
+        return filename_pattern(
+            groups.group("prefix"), groups.group("read_type"), groups.group("set_num")
+        )
+
+
 def _open_fastq_file(file: Path) -> TextIO:
     return gzip.open(file, "rt") if file.name.endswith(".gz") else file.open()
 
 
-def _log(message: str) -> str:
-    print(message)
-    return message
+def _log(message: str, verbose: bool = True) -> Optional[str]:
+    if verbose:
+        print(message)
+    return message
 
 
 class Engine(object):
     def __init__(self, validate_object):
         self.validate_object = validate_object
 
-    def __call__(self, fastq_file):
+    def __call__(self, fastq_file) -> list[Optional[str]]:
         errors = []
         _log(f"Validating matching fastq file {fastq_file}")
         self.validate_object.validate_fastq_file(fastq_file)
@@ -56,8 +94,9 @@ class FASTQValidatorLogic:
     def __init__(self, verbose=False):
         self.errors: List[Optional[str]] = []
         self.files_were_found = False
+        self.file_list = Manager().list()
         self._file_record_counts = Manager().dict()
-        self._file_prefix_counts = Manager().dict()
+        self._ungrouped_files = Manager().list()
 
         self._filename = ""
         self._line_number = 0
@@ -73,8 +112,7 @@ def _format_error(self, error: str) -> str:
 
         message = f"{location}: {error}"
 
-        if self._verbose:
-            _log(message)
+        print(message)
         return message
 
     def _validate_fastq_line_1(self, line: str) -> List[str]:
@@ -123,9 +161,9 @@ def _validate_fastq_line_4(self, line: str) -> List[str]:
 
     def validate_fastq_record(self, line: str, line_number: int) -> List[str]:
         line_index = line_number % 4 + 1
 
-        validator_method: Callable[[FASTQValidatorLogic, str], List[str]] = (
-            self._VALIDATE_FASTQ_LINE_METHODS[line_index]
-        )
+        validator_method: Callable[[Self, str], List[str]] = self._VALIDATE_FASTQ_LINE_METHODS[
+            line_index
+        ]
 
         assert validator_method, f"No validator method defined for record index {line_index}"
 
@@ -179,97 +217,109 @@ def validate_fastq_file(self, fastq_file: Path) -> None:
 
         self._file_record_counts[str(fastq_file)] = records_read
 
     def validate_fastq_files_in_path(self, paths: List[Path], threads: int) -> None:
-        data_found_one = []
-        dirs_and_files = defaultdict(dict)
+        """
+        - Builds a list of filepaths; checks for duplicate filenames in upload.
+        - [parallel] Opens, validates, and gets line count of each file in list, and then
+          populates self._file_record_counts as {filepath: record_count}.
+        - If successful, groups files with matching prefix/read_type/set_num values.
+        - Compares record_counts across grouped files, logs any that don't match or are ungrouped.
+        """
         for path in paths:
-            dirs_and_files[path] = fastq_utils.collect_fastq_files_by_directory(path)
-            _log(
-                f"FASTQValidatorLogic: Added files from {path} to dirs_and_files: {dirs_and_files}"
-            )
-        file_list = []
+            fastq_utils_output = fastq_utils.collect_fastq_files_by_directory(path)
+            for files in fastq_utils_output.values():
+                self.file_list.extend(files)
+                _log(f"FASTQValidatorLogic: Added files from {path} to file_list: {files}")
+        self.files_were_found = bool(self.file_list)
+        data_found_one = []
         with Manager() as manager:
-            # TODO: re-evaluate dicts/for loops
             lock = manager.Lock()
-            for path, rel_paths in dirs_and_files.items():
-                for rel_path, files in rel_paths.items():
-                    for file in files:
-                        file_list.append(Path(path / rel_path / file))
-            if file_list:
-                self.files_were_found = True
+            pool = Pool(threads)
             try:
                 logging.info(
-                    f"Passing file list for paths {paths} to engine. File list: {file_list}."
+                    f"Passing file list for paths {self._printable_filenames(paths, newlines=False)} to engine. File list:"
                 )
-                pool = Pool(threads)
+                logging.info(self._printable_filenames(self.file_list, newlines=True))
                 engine = Engine(self)
-                data_output = pool.imap_unordered(engine, file_list)
+                data_output = pool.imap_unordered(engine, self.file_list)
+                [data_found_one.extend(output) for output in data_output if output]
             except Exception as e:
-                _log(f"Error {e}")
                 pool.close()
                 pool.join()
-                data_found_one.append(f"Error {e}")
+                _log(f"Error {e}")
+                self.errors.append(f"Error {e}")
             else:
                 pool.close()
                 pool.join()
-                self._find_duplicates(dirs_and_files)
-                self._find_shared_prefixes(lock)
-                [data_found_one.extend(output) for output in data_output if output]
-
+                self._find_duplicates()
+                groups = self._make_groups()
+                self._find_counts(groups, lock)
+        if self._ungrouped_files:
+            _log(f"Ungrouped files, counts not checked: {self._ungrouped_files}")
         if len(data_found_one) > 0:
             self.errors.extend(data_found_one)
 
-    def _find_duplicates(self, dirs_and_files):
-        # TODO: re-evaluate dicts/for loops
-        for data_path, sub_dirs in dirs_and_files.items():
-            # Creates a dict of filenames to list of full filepaths for each
-            # fastq file in a given data_path (dataset dir).
-            files_per_path = defaultdict(list)
-            for sub_path, filepaths in sub_dirs.items():
-                for filepath in filepaths:
-                    files_per_path[filepath.name].append(data_path / sub_path)
-            for filename, filepaths in files_per_path.items():
-                if len(filepaths) > 1:
+    def _make_groups(self) -> dict[filename_pattern, list[Path]]:
+        groups = defaultdict(list)
+        for file in self.file_list:
+            potential_match = get_prefix_read_type_and_set(file.name)
+            if potential_match:
+                groups[potential_match].append(file)
+            else:
+                self._ungrouped_files.append(file)
+        for group in groups.values():
+            group.sort()
+        return groups
+
+    def _find_duplicates(self):
+        """
+        Maps each filename to the directories it appears in ({filepath.name: [paths]})
+        to ensure that each filename only appears once in an upload.
+        """
+        files_per_path = defaultdict(list)
+        for filepath in self.file_list:
+            files_per_path[filepath.name].append(filepath.parents[0])
+        for filename, filepaths in files_per_path.items():
+            if len(filepaths) > 1:
+                self.errors.append(
+                    _log(
+                        f"{filename} has been found multiple times during this validation. "
+                        f"Locations of duplicates: {str(filepaths)}."
+                    )
+                )
+
+    def _find_counts(self, groups: dict[filename_pattern, list[Path]], lock):
+        with lock:
+            for pattern, paths in groups.items():
+                if len(paths) == 1:
+                    # A file that matched the prefix/read_type/set_num pattern but has
+                    # no counterpart for comparison; this probably should not happen,
+                    # but it is currently only logged and does not raise an exception.
+                    self._ungrouped_files.append(paths[0])
+                    continue
+                comparison = {}
+                for path in paths:
+                    comparison[str(path)] = self._file_record_counts.get(str(path))
+                if len(set(comparison.values())) != 1:
                     self.errors.append(
-                        _log(
-                            f"{filename} has been found multiple times during this validation. "
-                            f"Locations of duplicates: {filepaths}."  # noqa: E501
-                        )
+                        f"Counts do not match among files matching pattern "
+                        f"{pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}"
+                    )
+                else:
+                    _log(
+                        f"PASSED: Record count comparison for files matching pattern "
+                        f"{pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}"
                     )
 
-    def _find_shared_prefixes(self, lock):
-        # This pattern seeks out the string that includes the lane number (since
-        # that is expected to be present to help anchor the prefix) that comes
-        # before any of _I1, _I2, _R1, or _R2.
-        fastq_file_prefix_regex = re.compile(r"(.+_L\d+.*)_[IR][12][._]")
-        for fastq_file, records_read in self._file_record_counts.items():
-            match = fastq_file_prefix_regex.match(Path(fastq_file).name)
-            with lock:
-                if match:
-                    filename_prefix = match.group(1)
-                    if filename_prefix in self._file_prefix_counts.keys():
-                        extant_count = self._file_prefix_counts[filename_prefix]
-                        if extant_count != records_read:
-                            # Find a file we've validated already that matches this
-                            # prefix.
-                            extant_files = [
-                                str(Path(filepath).name)
-                                for filepath, record_count in self._file_record_counts.items()
-                                if record_count == extant_count
-                                and Path(filepath).name.startswith(filename_prefix)
-                            ]
-                            # Based on how the dictionaries are created, there should
-                            # always be at least one matching filename.
-                            assert extant_files
-
-                            self.errors.append(
-                                _log(
-                                    f"{Path(fastq_file).name} ({records_read} lines) "
-                                    f"does not match length of {extant_files[0]} "
-                                    f"({extant_count} lines)."
-                                )
-                            )
-                    else:
-                        self._file_prefix_counts[filename_prefix] = records_read
+    def _printable_filenames(
+        self, files: Union[list, ListProxy, Path, str], newlines: bool = True
+    ) -> Union[str, list]:
+        if isinstance(files, (list, ListProxy)):
+            file_list = [str(file) for file in files]
+            if newlines:
+                return "\n".join(file_list)
+            return file_list
+        elif isinstance(files, Path):
+            return str(files)
+        return files
 
 
 def main():
@@ -277,6 +327,7 @@ def main():
     parser.add_argument(
         "filepaths", type=Path, nargs="+", help="Files to validate for FASTQ syntax"
    )
+    parser.add_argument("coreuse", type=int, help="Number of cores to use")
     args = parser.parse_args()
 
     if isinstance(args.filepaths, List):
@@ -289,7 +340,10 @@ def main():
         raise Exception(f"Validator init received base_paths arg as type {type(args.filepaths)}")
 
     validator = FASTQValidatorLogic(True)
-    validator.validate_fastq_files_in_path(filepaths, Lock())
+    if not (threads := args.coreuse):
+        cpus = cpu_count()
+        threads = max(cpus // 4, 1) if cpus else 1
+    validator.validate_fastq_files_in_path(filepaths, threads)
 
 
 if __name__ == "__main__":
diff --git a/tests/test_fastq_validator_logic.py b/tests/test_fastq_validator_logic.py
index 31bd68a..bc55920 100644
--- a/tests/test_fastq_validator_logic.py
+++ b/tests/test_fastq_validator_logic.py
@@ -1,10 +1,15 @@
 import gzip
-from pathlib import Path
+from operator import attrgetter
+from pathlib import Path, PosixPath
 from typing import TextIO
 
 import pytest
 
-from src.ingest_validation_tests.fastq_validator_logic import FASTQValidatorLogic
+from src.ingest_validation_tests.fastq_validator_logic import (
+    FASTQValidatorLogic,
+    filename_pattern,
+    get_prefix_read_type_and_set,
+)
 
 _GOOD_RECORDS = """\
 @A12345:123:A12BCDEFG:1:1234:1000:1234 1:N:0:NACTGACTGA+CTGACTGACT
@@ -197,8 +202,157 @@ def test_fastq_validator_record_counts_bad(self, fastq_validator, tmp_path):
 
         fastq_validator.validate_fastq_files_in_path([tmp_path], 2)
 
-        # Order of the files being processed is not guaranteed, however these
-        # strings ensure that a mismatch was found.
- assert "(4 lines)" in fastq_validator.errors[0] - assert "does not match" in fastq_validator.errors[0] - assert "(8 lines)" in fastq_validator.errors[0] + # Non-matching records only stored in errors, need to do ugly string match + assert "Counts do not match" in fastq_validator.errors[0] + assert ( + "SREQ-1_1-ACTGACTGAC-TGACTGACTG_S1_L001_I1_001.fastq': 4" in fastq_validator.errors[0] + ) + assert ( + "SREQ-1_1-ACTGACTGAC-TGACTGACTG_S1_L001_I2_001.fastq': 8" in fastq_validator.errors[0] + ) + + def test_fastq_comparison_good(self, fastq_validator, tmp_path): + filenames = [ + "3252_ftL_RNA_T1_S31_L003_R1_001.fastq", + "3252_ftL_RNA_T1_S31_L003_R2_001.fastq", + "3252_ftL_RNA_T1_S31_L003_R1_002.fastq", + "3252_ftL_RNA_T1_S31_L003_R2_002.fastq", + ] + for filename in filenames: + new_file = tmp_path.joinpath(filename) + with _open_output_file(new_file, False) as output: + output.write(_GOOD_RECORDS) + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + + assert not fastq_validator.errors + + def test_fastq_comparison_bad_unequal_line_counts(self, fastq_validator, tmp_path): + good_file = "3252_ftL_RNA_T1_S31_L003_R1_001.fastq" + bad_file = "3252_ftL_RNA_T1_S31_L003_R2_001.fastq" + new_good_file = tmp_path.joinpath(good_file) + with _open_output_file(new_good_file, False) as output: + output.write(_GOOD_RECORDS) + new_bad_file = tmp_path.joinpath(bad_file) + with _open_output_file(new_bad_file, False) as output: + output.write("bad") + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + + assert fastq_validator.errors + + def test_fastq_groups_good(self, fastq_validator, tmp_path): + files = [ + "20147_Healthy_PA_S1_L001_R1_001.fastq", + "20147_Healthy_PA_S1_L001_R2_001.fastq", + "20147_Healthy_PA_S1_L001_R3_001.fastq", + "20147_Healthy_PA_S1_L001_R1_002.fastq", + "20147_Healthy_PA_S1_L001_R2_002.fastq", + ] + for file in files: + with _open_output_file(tmp_path.joinpath(file), False) as output: + output.write(_GOOD_RECORDS) + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + assert fastq_validator._make_groups() == { + filename_pattern(prefix="20147_Healthy_PA_S1_L001", read_type="R", set_num="001"): [ + PosixPath(tmp_path.joinpath("20147_Healthy_PA_S1_L001_R1_001.fastq")), + PosixPath(tmp_path.joinpath("20147_Healthy_PA_S1_L001_R2_001.fastq")), + PosixPath(tmp_path.joinpath("20147_Healthy_PA_S1_L001_R3_001.fastq")), + ], + filename_pattern(prefix="20147_Healthy_PA_S1_L001", read_type="R", set_num="002"): [ + PosixPath(tmp_path.joinpath("20147_Healthy_PA_S1_L001_R1_002.fastq")), + PosixPath(tmp_path.joinpath("20147_Healthy_PA_S1_L001_R2_002.fastq")), + ], + } + assert not fastq_validator.errors + + def test_fastq_groups_bad(self, fastq_validator, tmp_path): + good_files = [ + "20147_Healthy_PA_S1_L001_R1_001.fastq", + "20147_Healthy_PA_S1_L001_R2_001.fastq", + "20147_Healthy_PA_S1_L001_R1_002.fastq", + "test_not_grouped_but_valid", + ] + bad_files = [ + "20147_Healthy_PA_S1_L001_R3_001.fastq", + "20147_Healthy_PA_S1_L001_R2_002.fastq", + ] + for file in good_files: + with _open_output_file(tmp_path.joinpath(file), False) as output: + output.write(_GOOD_RECORDS) + for file in bad_files: + with _open_output_file(tmp_path.joinpath(file), False) as output: + output.write("@bad") + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + assert "Counts do not match" in fastq_validator.errors[0] + assert "20147_Healthy_PA_S1_L001_R2_002.fastq" in fastq_validator.errors[0] + assert "Counts do not match" in fastq_validator.errors[1] + assert 
"20147_Healthy_PA_S1_L001_R3_001.fastq" in fastq_validator.errors[1] + + def test_filename_valid_and_fastq_valid_but_not_grouped(self, fastq_validator, tmp_path): + # good_filenames[0:6] are valid in pipeline processing but would not be grouped for comparison + good_filenames = [ + "B001A001_1.fastq", # no lane, read_type, or set + "B001A001_R1.fq", # no lane or set + "B001A001_I1.fastq.gz", # no lane or set + "H4L1-4_S64_R1_001.fastq.gz", # no lane + "H4L1-4_S64_L001_001.fastq.gz", # no read_type + "H4L1-4_S64_L001_R1.fastq.gz", # no set + "L001_H4L1-4_S64_R1_001.fastq.gz", # out of order + "H4L1-4_S64_L001_R1_001.fastq.gz", + "H4L1-4_S64_L001_R2_001.fastq.gz", + "H4L1-4_S64_L001_I1_001.fastq.gz", + "Undetermined_S0_L001_R1_001.W105_Small_bowel_ileum.trimmed.fastq.gz", # annotated but otherwise fits pattern + ] + for file in good_filenames: + use_gzip = bool("gz" in file) + with _open_output_file(tmp_path.joinpath(file), use_gzip) as output: + output.write(_GOOD_RECORDS) + + fastq_validator.validate_fastq_files_in_path([tmp_path], 2) + # All files in good_filenames should be in file_list + assert { + PosixPath(tmp_path.joinpath(file)) in fastq_validator.file_list + for file in good_filenames + } == {True} + # No errors should be found in any of those files + assert not fastq_validator.errors + # Only some files from good_filenames will match criteria for grouping + valid_filename_patterns = [ + get_prefix_read_type_and_set(str(file)) + for file in fastq_validator.file_list + if get_prefix_read_type_and_set(str(file)) is not None + ] + assert sorted( + valid_filename_patterns, key=attrgetter("prefix", "read_type", "set_num") + ) == sorted( + [ + filename_pattern( + prefix=f"{tmp_path}/H4L1-4_S64_L001", read_type="R", set_num="001" + ), + filename_pattern( + prefix=f"{tmp_path}/H4L1-4_S64_L001", read_type="R", set_num="001" + ), + filename_pattern( + prefix=f"{tmp_path}/H4L1-4_S64_L001", read_type="I", set_num="001" + ), + filename_pattern( + prefix=f"{tmp_path}/Undetermined_S0_L001", read_type="R", set_num="001" + ), + ], + key=attrgetter("prefix", "read_type", "set_num"), + ) + assert ( + fastq_validator._ungrouped_files.sort() + == [ + "B001A001_1.fastq", # no lane, read_type, or set + "B001A001_R1.fq", # no lane or set + "B001A001_I1.fastq.gz", # no lane or set + "H4L1-4_S64_R1_001.fastq.gz", # no lane + "H4L1-4_S64_L001_001.fastq.gz", # no read_type + "H4L1-4_S64_L001_R1.fastq.gz", # no set + "L001_H4L1-4_S64_R1_001.fastq.gz", # out of order + ].sort() + )