refactor test_batch
JessyBarrette committed Jul 28, 2023
1 parent de24b77 commit 3709d9c
Showing 1 changed file with 94 additions and 82 deletions.
tests/test_batch.py: 176 changes (94 additions, 82 deletions)
@@ -46,22 +46,47 @@ def test_config_glob(self):


class TestBatchMode:
def test_batch_conversion_onset_parser_single_runner(self, tmp_path):
self._run_batch_processing(
1, tmp_path / "single", tmp_path / "single_registry.csv"
)
@staticmethod
def _get_config(input_path=None, cwd=None, **kwargs):
"""Generate a batch configuration file"""
config = {
**load_config(),
**kwargs,
"input_path": input_path or "tests/parsers_test_files/onset/**/*.csv",
}
if cwd:
config["registry"]["path"] = str(cwd / "registry.csv")
config["file_output"]["path"] = str(cwd / "output")
config["sentry"]["dsn"] = None
return config

@staticmethod
def _save_config(cwd, config):
config_path = cwd / "config.yaml"
with open(config_path, "w", encoding="UTF-8") as file:
yaml.dump(config, file)

def test_batch_conversion_onset_parser_multiprocessing_2_workers(self, tmp_path):
self._run_batch_processing(
2, tmp_path / "2_workers", tmp_path / "2_workers_registry.csv"
)
return config_path

def test_batch_conversion_onset_parser_multiprocessing_all_workers(self, tmp_path):
self._run_batch_processing(
True,
tmp_path / "multiprocessing_files",
tmp_path / "multi_registry.csv",
@staticmethod
def _run_batch_process(config):
registry = BatchConversion(config=config).run()
assert not registry.data.empty
assert not registry.data["error_message"].any()

@staticmethod
def _run_cli_batch_process(config_path):
runner = CliRunner()
result = runner.invoke(
cli_files,
[f"--config={config_path}"],
)
assert result.exit_code == 0, result

@pytest.mark.parametrize("multiprocessing", [1, 2, None])
def test_batch_conversion_multiprocessing(self, tmp_path, multiprocessing):
config = self._get_config(cwd=tmp_path, multiprocessing=multiprocessing)
self._run_batch_process(config)

def _run_batch_processing(self, multiprocessing, output_path, registry_path):
config = {
@@ -81,90 +106,69 @@ def _run_batch_processing(self, multiprocessing, output_path, registry_path):
assert not registry.data["error_message"].any()

def test_failed_cli_batch_conversion(self, tmp_path):
config = load_config()
test_file_path = str(tmp_path / "failed_cli_test_file.cnv")
registry_path = str(tmp_path / "failed_cli_registry.csv")
config_path = tmp_path / "failed_cli_config.yaml"
config = self._get_config(
cwd=tmp_path,
input_path=test_file_path,
parser="seabird.cnv",
overwrite=True,
multiprocessing=1,
errors="ignore",
)

with open(test_file_path, "w") as file_handle:
file_handle.write("test file")
config_path = self._save_config(tmp_path, config)

config["input_path"] = test_file_path
config["parser"] = "seabird.cnv"
config["errors"] = "ignore"
config["overwrite"] = True
config["multiprocessing"] = True
config["file_output"]["path"] = str(tmp_path / "failed_files/")
config["file_output"]["source"] = "{source}"
config["registry"]["path"] = registry_path
config["sentry"]["dsn"] = None

# Save config to yaml
with open(config_path, "w", encoding="utf-8") as file:
yaml.dump(config, file)
# Save temp bad data file
with open(test_file_path, "w", encoding="utf-8") as file_handle:
file_handle.write("test file")

runner = CliRunner()
result = runner.invoke(
cli_files,
[f"--config={config_path}"],
)
assert result.exit_code == 0, result
self._run_cli_batch_process(config_path)
# load registry
registry = FileConversionRegistry(path=registry_path)
registry = FileConversionRegistry(path=config["registry"]["path"])
assert not registry.data.empty
assert test_file_path in registry.data.index
assert "No columns to parse from file" in str(
registry.data["error_message"][test_file_path]
)

# Delete test files
Path(test_file_path).unlink()
Path(registry_path).unlink()

def test_failed_batch_conversion(self, tmp_path):
config = load_config()
test_file_path = str(tmp_path / "bad_test_file.cnv")
registry_path = str(tmp_path / "failed_registry.csv")
test_file_path = str(tmp_path / "failed_cli_test_file.cnv")
config = self._get_config(
cwd=tmp_path,
input_path=test_file_path,
parser="seabird.cnv",
overwrite=True,
multiprocessing=1,
errors="ignore",
)

with open(test_file_path, "w") as file_handle:
file_handle.write("test file")

config["input_path"] = test_file_path
config["parser"] = "seabird.cnv"
config["errors"] = "ignore"
config["overwrite"] = True
config["multiprocessing"] = True
config["file_output"]["path"] = str(tmp_path / "failed_files/")
config["file_output"]["source"] = "{source}"
config["registry"]["path"] = registry_path
config["sentry"]["dsn"] = None
registry = BatchConversion(config=config).run()
assert not registry.data.empty
assert test_file_path in registry.data.index
assert "No columns to parse from file" in str(
registry.data["error_message"][test_file_path]
)

def test_batch_cli_conversion_onset_parser(self):
def test_batch_cli_conversion_onset_parser(self, tmp_path):
config = self._get_config(cwd=tmp_path)
config_path = self._save_config(tmp_path, config)
runner = CliRunner()
result = runner.invoke(
cli_files,
["--config=tests/batch_test_configs/batch_convert_test_onset_csv.yaml"],
env={"LOGURU_LEVEL": "INFO"},
[f"--config={config_path}"],
)
assert result.exit_code == 0, result.output
assert (
"Run conversion" in result.output
or "Run parallel batch conversion" in result.output
)

def test_batch_cli_new_config_creation(self):
def test_batch_cli_new_config_creation(self, tmp_path):
runner = CliRunner()
new_config_test_file = Path("temp/test_config_copy.yaml")
if new_config_test_file.exists():
new_config_test_file.unlink()

assert not new_config_test_file.exists()
new_config_test_file = tmp_path / "test_config_copy.yaml"
result = runner.invoke(cli_files, ["--new_config", str(new_config_test_file)])
assert (
result.exit_code == 0
@@ -174,15 +178,6 @@ def test_batch_cli_new_config_creation(self):
assert not new_config_test_file.exists()


def get_test_file_registry():
test_file_registry = FileConversionRegistry(
path=Path("tests/test_file_registry.csv")
).load()
test_file = Path(test_file_registry.data.index[0])
test_file.touch()
return test_file_registry, test_file


test_ds = xr.Dataset()
test_ds.attrs["organization"] = "organization"
test_ds.attrs["instrument"] = "InstrumentName"
@@ -195,27 +190,42 @@ def get_test_file_registry():


class TestBatchGenerateName:
@staticmethod
def _get_test_dataset():
ds = xr.Dataset()
ds.attrs["organization"] = "organization"
ds.attrs["instrument"] = "InstrumentName"
ds.attrs["instrument_serial_number"] = "64651354"
ds.attrs["source"] = "source_file.csv"
ds["time"] = pd.to_datetime(
pd.Series(["2022-01-01T00:00:00Z", "2022-03-02T00:00:00Z"])
)
ds["time"].attrs["timezone"] = "UTC"
return ds

def test_generate_default_name(self):
name = generate_output_path(test_ds)
name = generate_output_path(self._get_test_dataset())
assert isinstance(name, Path)

def test_generate_output_from_source_attribute(self):
source_ds = test_ds.copy()
source_ds = self._get_test_dataset()
source_ds.attrs["source"] = "source_file.csv"
name = generate_output_path(source_ds)
assert isinstance(name, Path)
assert str(name) == "source_file.nc"

def test_generate_filename_with_path(self):
name = generate_output_path(
test_ds, source="{organization}_{instrument}_test", output_format=".nc"
self._get_test_dataset(),
source="{organization}_{instrument}_test",
output_format=".nc",
)
assert isinstance(name, Path)
assert str(name) == "organization_InstrumentName_test.nc"

def test_generate_filename_with_time(self):
name = generate_output_path(
test_ds,
self._get_test_dataset(),
source="{organization}_{instrument}_{time_min:%Y%m%d}-{time_max:%Y%m%d}",
output_format=".nc",
)
@@ -224,34 +234,36 @@ def test_generate_filename_with_time(self):

def test_generate_filename_with_variable_attribute(self):
name = generate_output_path(
test_ds,
self._get_test_dataset(),
source="{organization}_{instrument}_{variable_time_timezone}",
output_format=".nc",
)
assert isinstance(name, Path)
assert str(name) == "organization_InstrumentName_UTC.nc"

def test_generate_filename_with_missing_source(self):
fail_ds = test_ds.copy()
fail_ds = self._get_test_dataset()
fail_ds.attrs["source"] = None
with pytest.raises(Exception):
generate_output_path(fail_ds)

def test_generate_filename_with_prefix(self):
name = generate_output_path(test_ds, file_preffix="test_")
name = generate_output_path(self._get_test_dataset(), file_preffix="test_")
assert str(name) == "test_source_file.nc"

def test_generate_filename_with_suffix(self):
name = generate_output_path(test_ds, file_suffix="_test")
name = generate_output_path(self._get_test_dataset(), file_suffix="_test")
assert str(name) == "source_file_test.nc"

def test_generate_filename_with_prefix_and_suffix(self):
name = generate_output_path(test_ds, file_preffix="test_", file_suffix="_test")
name = generate_output_path(
self._get_test_dataset(), file_preffix="test_", file_suffix="_test"
)
assert str(name) == "test_source_file_test.nc"

def test_generate_filename_with_defaults(self):
name = generate_output_path(
test_ds,
self._get_test_dataset(),
source="test_{missing_global}",
defaults={"missing_global": "this-is-the-default"},
)

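For reference, a minimal sketch of the test pattern this commit introduces: a shared _get_config helper that merges the defaults from load_config() with per-test overrides and redirects every output path into pytest's tmp_path, plus a single parametrized test that replaces the three near-identical single-runner / 2-workers / all-workers methods. It reuses only names that already appear in this diff (load_config, BatchConversion); their import lines sit at the top of tests/test_batch.py and are not visible here, so the import comment below is an assumption rather than the file's actual import block.

import pytest

# Assumption: load_config and BatchConversion are already imported at the top
# of tests/test_batch.py; the exact module path is not shown in this diff.


class TestBatchModeSketch:
    """Illustrative only: mirrors the helper pattern introduced by this commit."""

    @staticmethod
    def _get_config(cwd, **kwargs):
        # Merge the packaged defaults with per-test overrides and point the
        # registry and file output at pytest's tmp_path so each test run is
        # fully isolated.
        config = {**load_config(), **kwargs}
        config["registry"]["path"] = str(cwd / "registry.csv")
        config["file_output"]["path"] = str(cwd / "output")
        config["sentry"]["dsn"] = None
        return config

    @pytest.mark.parametrize("multiprocessing", [1, 2, None])
    def test_batch_conversion_multiprocessing(self, tmp_path, multiprocessing):
        # One parametrized test covers the single-runner, 2-worker, and
        # all-workers cases that previously had one method each.
        config = self._get_config(cwd=tmp_path, multiprocessing=multiprocessing)
        registry = BatchConversion(config=config).run()
        assert not registry.data.empty
        assert not registry.data["error_message"].any()

Because every output lands under tmp_path, the explicit Path(...).unlink() cleanup used in the old tests is no longer needed.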