
Commit

Merge pull request #65 from hubmapconsortium/devel
Dev -> Main
jpuerto-psc authored Oct 25, 2024
2 parents 4a196e7 + 73d7aff commit 650a8ec
Showing 2 changed files with 301 additions and 93 deletions.
226 changes: 140 additions & 86 deletions src/ingest_validation_tests/fastq_validator_logic.py
@@ -2,32 +2,70 @@
import gzip
import logging
import re
from collections import defaultdict
from multiprocessing import Lock, Manager, Pool
from collections import defaultdict, namedtuple
from multiprocessing import Manager, Pool
from multiprocessing.managers import ListProxy
from os import cpu_count
from pathlib import Path
from typing import Callable, List, Optional, TextIO
from typing import Callable, List, Optional, TextIO, Union

import fastq_utils
from typing_extensions import Self

filename_pattern = namedtuple("filename_pattern", ["prefix", "read_type", "set_num"])


def is_valid_filename(filename: str) -> bool:
return bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename))


def get_prefix_read_type_and_set(filename: str) -> Optional[filename_pattern]:
"""
Looking for fastq filenames matching pattern:
<prefix>_<lane:L#+>_<read_type:I,R,read>#_<set_num:#+>
e.g. arbitrary_string_L001_R1_001.fastq
Regex documentation:
PREFIX
- (?P<prefix> | named capture group "prefix"
- .*(?:L\\d+) | match anything before and including pattern L# (where # represents 1 or more digits)
- only capture prefix if read_type and set_num found
READ_TYPE
- (?=_ | assert that the character "_" is present at the beginning of this next group
- (?:(?P<read_type>R|read(?=[123])|I(?=[12]))) | only capture above match if followed by the
sequence _[R1,R2,R3,read1,read2,read3,I1,I2]; capture R/I/read as read_type, discarding digit
SET_NUM
- (?:\\d_ | ensure presence of "#_" before group, ignore characters
- (?P<set_num>\\d+) | capture group set_num of 1 or more digits
Note: if set_num needs to be optional, could change this portion to (?:\\d_?(?P<set_num>\\d+)|)
"""
if not bool(fastq_utils.FASTQ_PATTERN.fullmatch(filename)):
return
pattern = re.compile(
r"(?P<prefix>.*(?:L\d+)(?=_(?P<read_type>R|read(?=[123])|I(?=[12]))(?:\d_(?P<set_num>\d+))))"
)
groups = pattern.match(filename)
if groups and all(x in groups.groupdict().keys() for x in ["prefix", "read_type", "set_num"]):
return filename_pattern(
groups.group("prefix"), groups.group("read_type"), groups.group("set_num")
)
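
# Illustrative sketch (not part of the commit), assuming fastq_utils.FASTQ_PATTERN
# accepts standard Illumina-style names:
#   get_prefix_read_type_and_set("arbitrary_string_L001_R1_001.fastq")
#   -> filename_pattern(prefix="arbitrary_string_L001", read_type="R", set_num="001")
#   get_prefix_read_type_and_set("sample.fastq")  # no lane token L#
#   -> None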


def _open_fastq_file(file: Path) -> TextIO:
return gzip.open(file, "rt") if file.name.endswith(".gz") else file.open()
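
# Illustrative (not part of the commit): compressed and plain files read the same way,
# assuming a well-formed FASTQ file on disk:
#   first_line = _open_fastq_file(Path("s1_L001_R1_001.fastq.gz")).readline()
#   assert first_line.startswith("@")  # header line of the first record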


def _log(message: str) -> str:
print(message)
return message
def _log(message: str, verbose: bool = True) -> Optional[str]:
if verbose:
print(message)
return message


class Engine(object):
def __init__(self, validate_object):
self.validate_object = validate_object

def __call__(self, fastq_file):
def __call__(self, fastq_file) -> list[Optional[str]]:
errors = []
_log(f"Validating matching fastq file {fastq_file}")
self.validate_object.validate_fastq_file(fastq_file)
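
# Sketch (not part of the commit) of how Engine is used further down: wrapping the
# validator in a plain callable keeps the Pool interface simple, since each worker
# just calls engine(fastq_file) and returns that file's errors.
#   engine = Engine(validator)
#   with Pool(threads) as pool:
#       for file_errors in pool.imap_unordered(engine, fastq_files):
#           all_errors.extend(file_errors)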
@@ -56,8 +94,9 @@ class FASTQValidatorLogic:
def __init__(self, verbose=False):
self.errors: List[Optional[str]] = []
self.files_were_found = False
self.file_list = Manager().list()
self._file_record_counts = Manager().dict()
self._file_prefix_counts = Manager().dict()
self._ungrouped_files = Manager().list()
self._filename = ""
self._line_number = 0

@@ -73,8 +112,7 @@ def _format_error(self, error: str) -> str:

message = f"{location}: {error}"

if self._verbose:
_log(message)
print(message)
return message

def _validate_fastq_line_1(self, line: str) -> List[str]:
@@ -123,9 +161,9 @@ def _validate_fastq_line_4(self, line: str) -> List[str]:
def validate_fastq_record(self, line: str, line_number: int) -> List[str]:
line_index = line_number % 4 + 1

validator_method: Callable[[FASTQValidatorLogic, str], List[str]] = (
self._VALIDATE_FASTQ_LINE_METHODS[line_index]
)
validator_method: Callable[[Self, str], List[str]] = self._VALIDATE_FASTQ_LINE_METHODS[
line_index
]

assert validator_method, f"No validator method defined for record index {line_index}"
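
# Illustrative (not part of the commit): FASTQ records are 4-line blocks, so with
# 0-based line numbers the dispatch cycles through the four line validators:
#   line_number 0 -> line_index 1: "@SEQ_ID ..."      (record header)
#   line_number 1 -> line_index 2: "GATTTGGGGTT..."   (sequence)
#   line_number 2 -> line_index 3: "+"                (separator)
#   line_number 3 -> line_index 4: "!''*((((***..."   (quality scores)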

@@ -179,104 +217,117 @@ def validate_fastq_file(self, fastq_file: Path) -> None:
self._file_record_counts[str(fastq_file)] = records_read

def validate_fastq_files_in_path(self, paths: List[Path], threads: int) -> None:
data_found_one = []
dirs_and_files = defaultdict(dict)
"""
- Builds a list of filepaths; checks for duplicate filenames in upload.
- [parallel] Opens, validates, and gets line count of each file in list, and then
populates self._file_record_counts as {filepath: record_count}.
- If successful, groups files with matching prefix/read_type/set_num values.
- Compares record_counts across grouped files, logs any that don't match or are ungrouped.
"""
for path in paths:
dirs_and_files[path] = fastq_utils.collect_fastq_files_by_directory(path)
_log(
f"FASTQValidatorLogic: Added files from {path} to dirs_and_files: {dirs_and_files}"
)
file_list = []
fastq_utils_output = fastq_utils.collect_fastq_files_by_directory(path)
for files in fastq_utils_output.values():
self.file_list.extend(files)
_log(f"FASTQValidatorLogic: Added files from {path} to file_list: {files}")
self.files_were_found = bool(self.file_list)
data_found_one = []
with Manager() as manager:
# TODO: re-evaluate dicts/for loops
lock = manager.Lock()
for path, rel_paths in dirs_and_files.items():
for rel_path, files in rel_paths.items():
for file in files:
file_list.append(Path(path / rel_path / file))
if file_list:
self.files_were_found = True
pool = Pool(threads)
try:
logging.info(
f"Passing file list for paths {paths} to engine. File list: {file_list}."
f"Passing file list for paths {self._printable_filenames(paths, newlines=False)} to engine. File list:"
)
pool = Pool(threads)
logging.info(self._printable_filenames(self.file_list, newlines=True))
engine = Engine(self)
data_output = pool.imap_unordered(engine, file_list)
data_output = pool.imap_unordered(engine, self.file_list)
[data_found_one.extend(output) for output in data_output if output]
except Exception as e:
_log(f"Error {e}")
pool.close()
pool.join()
data_found_one.append(f"Error {e}")
_log(f"Error {e}")
self.errors.append(f"Error {e}")
else:
pool.close()
pool.join()
self._find_duplicates(dirs_and_files)
self._find_shared_prefixes(lock)
[data_found_one.extend(output) for output in data_output if output]

self._find_duplicates()
groups = self._make_groups()
self._find_counts(groups, lock)
if self._ungrouped_files:
_log(f"Ungrouped files, counts not checked: {self._ungrouped_files}")
if len(data_found_one) > 0:
self.errors.extend(data_found_one)
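
# Sketch (not part of the commit) of the intended entry point, mirroring main() below:
#   validator = FASTQValidatorLogic(verbose=True)
#   validator.validate_fastq_files_in_path([Path("upload_directory")], threads=2)
#   if validator.errors:
#       ...  # duplicate filenames, unreadable files, or mismatched record counts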

def _find_duplicates(self, dirs_and_files):
# TODO: re-evaluate dicts/for loops
for data_path, sub_dirs in dirs_and_files.items():
# Creates a dict of filenames to list of full filepaths for each
# fastq file in a given data_path (dataset dir).
files_per_path = defaultdict(list)
for sub_path, filepaths in sub_dirs.items():
for filepath in filepaths:
files_per_path[filepath.name].append(data_path / sub_path)
for filename, filepaths in files_per_path.items():
if len(filepaths) > 1:
def _make_groups(self) -> dict[filename_pattern, list[Path]]:
groups = defaultdict(list)
for file in self.file_list:
potential_match = get_prefix_read_type_and_set(file.name)
if potential_match:
groups[potential_match].append(file)
else:
self._ungrouped_files.append(file)
for group in groups.values():
group.sort()
return groups
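
# Illustrative (not part of the commit): paired reads share one key, e.g.
# s1_L001_R1_001.fastq and s1_L001_R2_001.fastq both map to
# filename_pattern(prefix="s1_L001", read_type="R", set_num="001"), while a
# name without a lane token (e.g. s1_R1.fastq, assuming fastq_utils accepts it)
# gets no match and is appended to self._ungrouped_files instead.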

def _find_duplicates(self):
"""
Transforms data from {path: [filepaths]} to {filepath.name: [paths]}
to ensure that each filename only appears once in an upload
"""
files_per_path = defaultdict(list)
for filepath in self.file_list:
files_per_path[filepath.name].append(filepath.parents[0])
for filename, filepaths in files_per_path.items():
if len(filepaths) > 1:
self.errors.append(
_log(
f"{filename} has been found multiple times during this validation. "
f"Locations of duplicates: {str(filepaths)}."
)
)
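
# Illustrative (not part of the commit): the same name in two directories,
#   [Path("upload/a/s1_L001_R1_001.fastq"), Path("upload/b/s1_L001_R1_001.fastq")],
# yields files_per_path == {"s1_L001_R1_001.fastq": [Path("upload/a"), Path("upload/b")]}
# and produces one duplicate-filename error naming both locations.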

def _find_counts(self, groups: dict[filename_pattern, list[Path]], lock):
with lock:
for pattern, paths in groups.items():
if len(paths) == 1:
# This happens if a file matched the prefix/read_type/set_num pattern
# but had no counterpart for comparison; that probably should not occur,
# but it is currently only logged and does not raise an exception.
self._ungrouped_files.append(paths[0])
continue
comparison = {}
for path in paths:
comparison[str(path)] = self._file_record_counts.get(str(path))
if not (len(set(comparison.values())) == 1):
self.errors.append(
_log(
f"{filename} has been found multiple times during this validation. "
f"Locations of duplicates: {filepaths}." # noqa: E501
)
f"Counts do not match among files matching pattern {pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}"
)
else:
_log(
f"PASSED: Record count comparison for files matching pattern {pattern.prefix}_{pattern.read_type}#_{pattern.set_num}: {comparison}"
)
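
# Illustrative (not part of the commit): for a mismatched pair,
#   comparison == {"a/s1_L001_R1_001.fastq": 400, "a/s1_L001_R2_001.fastq": 396}
# holds two distinct values, so an error is recorded; equal counts
# (e.g. {...: 400, ...: 400}) log the PASSED message instead.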

def _find_shared_prefixes(self, lock):
# This pattern seeks out the string that includes the lane number (since
# that is expected to be present to help anchor the prefix) that comes
# before any of _I1, _I2, _R1, or _R2.
fastq_file_prefix_regex = re.compile(r"(.+_L\d+.*)_[IR][12][._]")
for fastq_file, records_read in self._file_record_counts.items():
match = fastq_file_prefix_regex.match(Path(fastq_file).name)
with lock:
if match:
filename_prefix = match.group(1)
if filename_prefix in self._file_prefix_counts.keys():
extant_count = self._file_prefix_counts[filename_prefix]
if extant_count != records_read:
# Find a file we've validated already that matches this
# prefix.
extant_files = [
str(Path(filepath).name)
for filepath, record_count in self._file_record_counts.items()
if record_count == extant_count
and Path(filepath).name.startswith(filename_prefix)
]
# Based on how the dictionaries are created, there should
# always be at least one matching filename.
assert extant_files

self.errors.append(
_log(
f"{Path(fastq_file).name} ({records_read} lines) "
f"does not match length of {extant_files[0]} "
f"({extant_count} lines)."
)
)
else:
self._file_prefix_counts[filename_prefix] = records_read
def _printable_filenames(
self, files: Union[list, ListProxy, Path, str], newlines: bool = True
):
if type(files) is list or type(files) is ListProxy:
file_list = [str(file) for file in files]
if newlines:
return "\n".join(file_list)
return file_list
elif type(files) is Path:
return str(files)
elif type(files) is str:
return files
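
# Illustrative (not part of the commit), called on a validator instance:
#   validator._printable_filenames([Path("a.fastq"), Path("b.fastq")])                  -> "a.fastq\nb.fastq"
#   validator._printable_filenames([Path("a.fastq"), Path("b.fastq")], newlines=False)  -> ["a.fastq", "b.fastq"]
#   validator._printable_filenames(Path("a.fastq"))                                     -> "a.fastq"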


def main():
parser = argparse.ArgumentParser(description="Validate FASTQ files.")
parser.add_argument(
"filepaths", type=Path, nargs="+", help="Files to validate for FASTQ syntax"
)
parser.add_argument("coreuse", type=int, help="Number of cores to use")

args = parser.parse_args()
if isinstance(args.filepaths, List):
@@ -289,7 +340,10 @@ def main():
raise Exception(f"Validator init received base_paths arg as type {type(args.filepaths)}")

validator = FASTQValidatorLogic(True)
validator.validate_fastq_files_in_path(filepaths, Lock())
if not (threads := args.coreuse):
cpus = cpu_count()
threads = cpus // 4 if cpus else 1
validator.validate_fastq_files_in_path(filepaths, threads)
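
# Illustrative (not part of the commit): with coreuse passed as 0 on an 8-CPU
# machine, cpu_count() -> 8 and threads = 8 // 4 = 2; if cpu_count() returns
# None, threads falls back to 1.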


if __name__ == "__main__":
