From f874b1cef8d2922d26dd93de136d690d5a5e9d29 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Tue, 9 Jul 2024 16:45:34 -0400 Subject: [PATCH 1/4] Add write_dir argument to csv_to_wfdb(). Ref #490. --- wfdb/io/convert/csv.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/wfdb/io/convert/csv.py b/wfdb/io/convert/csv.py index 3cfd25a2..7288bb30 100644 --- a/wfdb/io/convert/csv.py +++ b/wfdb/io/convert/csv.py @@ -33,6 +33,7 @@ def csv_to_wfdb( header=True, delimiter=",", verbose=False, + write_dir="", ): """ Read a WFDB header file and return either a `Record` object with the @@ -235,6 +236,10 @@ def csv_to_wfdb( verbose : bool, optional Whether to print all the information read about the file (True) or not (False). + write_dir : str, optional + The directory where the output files will be saved. If write_dir is not + provided, the output files will be saved in the same directory as the + input file. Returns ------- @@ -291,6 +296,7 @@ def csv_to_wfdb( df_CSV = pd.read_csv(file_name, delimiter=delimiter, header=None) if verbose: print("Successfully read CSV") + # Extract the entire signal from the dataframe p_signal = df_CSV.values # The dataframe should be in (`sig_len`, `n_sig`) dimensions @@ -300,6 +306,7 @@ def csv_to_wfdb( n_sig = p_signal.shape[1] if verbose: print("Number of signals: {}".format(n_sig)) + # Check if signal names are valid and set defaults if not sig_name: if header: @@ -318,15 +325,12 @@ def csv_to_wfdb( if verbose: print("Signal names: {}".format(sig_name)) - # Set the output header file name to be the same, remove path - if os.sep in file_name: - file_name = file_name.split(os.sep)[-1] - record_name = file_name.replace(".csv", "") + record_name = os.path.splitext(os.path.basename(file_name))[0] if verbose: - print("Output header: {}.hea".format(record_name)) + print("Record name: {}.hea".format(record_name)) # Replace the CSV file tag with DAT - dat_file_name = file_name.replace(".csv", ".dat") + dat_file_name = record_name + ".dat" dat_file_name = [dat_file_name] * n_sig if verbose: print("Output record: {}".format(dat_file_name[0])) @@ -450,7 +454,6 @@ def csv_to_wfdb( if verbose: print("Record generated successfully") return record - else: # Write the information to a record and header file wrsamp( @@ -465,6 +468,7 @@ def csv_to_wfdb( comments=comments, base_time=base_time, base_date=base_date, + write_dir=write_dir, ) if verbose: print("File generated successfully") From e6b3b695f27b7a995d65106d80490e11d6c72154 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Tue, 9 Jul 2024 16:45:48 -0400 Subject: [PATCH 2/4] Add test for csv_to_wfdb(). --- tests/io/test_convert.py | 75 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/tests/io/test_convert.py b/tests/io/test_convert.py index aa7ba78a..cf97f700 100644 --- a/tests/io/test_convert.py +++ b/tests/io/test_convert.py @@ -1,14 +1,22 @@ +import os +import shutil +import unittest + import numpy as np from wfdb.io.record import rdrecord from wfdb.io.convert.edf import read_edf +from wfdb.io.convert.csv import csv_to_wfdb + +class TestEdfToWfdb: + """ + Tests for the io.convert.edf module. + """ -class TestConvert: def test_edf_uniform(self): """ EDF format conversion to MIT for uniform sample rates. - """ # Uniform sample rates record_MIT = rdrecord("sample-data/n16").__dict__ @@ -60,7 +68,6 @@ def test_edf_uniform(self): def test_edf_non_uniform(self): """ EDF format conversion to MIT for non-uniform sample rates. - """ # Non-uniform sample rates record_MIT = rdrecord("sample-data/wave_4").__dict__ @@ -108,3 +115,65 @@ def test_edf_non_uniform(self): target_results = len(fields) * [True] assert np.array_equal(test_results, target_results) + + +class TestCsvToWfdb(unittest.TestCase): + """ + Tests for the io.convert.csv module. + """ + + def setUp(self): + """ + Create a temporary directory containing data for testing. + + Load 100.dat file for comparison to 100.csv file. + """ + self.test_dir = "test_output" + os.makedirs(self.test_dir, exist_ok=True) + + self.record_100_csv = "sample-data/100.csv" + self.record_100_dat = rdrecord("sample-data/100", physical=True) + + def tearDown(self): + """ + Remove the temporary directory after the test. + """ + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + def test_write_dir(self): + """ + Call the function with the write_dir argument. + """ + csv_to_wfdb( + file_name=self.record_100_csv, + fs=360, + units="mV", + write_dir=self.test_dir, + ) + + # Check if the output files are created in the specified directory + base_name = os.path.splitext(os.path.basename(self.record_100_csv))[0] + expected_dat_file = os.path.join(self.test_dir, f"{base_name}.dat") + expected_hea_file = os.path.join(self.test_dir, f"{base_name}.hea") + + self.assertTrue(os.path.exists(expected_dat_file)) + self.assertTrue(os.path.exists(expected_hea_file)) + + # Check that newly written file matches the 100.dat file + record_write = rdrecord(os.path.join(self.test_dir, base_name)) + + self.assertEqual(record_write.fs, 360) + self.assertEqual(record_write.fs, self.record_100_dat.fs) + self.assertEqual(record_write.units, ["mV", "mV"]) + self.assertEqual(record_write.units, self.record_100_dat.units) + self.assertEqual(record_write.sig_name, ["MLII", "V5"]) + self.assertEqual(record_write.sig_name, self.record_100_dat.sig_name) + self.assertEqual(record_write.p_signal.size, 1300000) + self.assertEqual( + record_write.p_signal.size, self.record_100_dat.p_signal.size + ) + + +if __name__ == "__main__": + unittest.main() From feb6b0c99ac390ec301309b43d6753a9a5e764f8 Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Tue, 9 Jul 2024 17:06:46 -0400 Subject: [PATCH 3/4] The to_list() method was introduced in Pandas v0.24.0. Catch error for earlier versions. Tests are failing on the test-deb10-i386 build because it is running an old version of Pandas. --- wfdb/io/convert/csv.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/wfdb/io/convert/csv.py b/wfdb/io/convert/csv.py index 7288bb30..385ede50 100644 --- a/wfdb/io/convert/csv.py +++ b/wfdb/io/convert/csv.py @@ -310,7 +310,12 @@ def csv_to_wfdb( # Check if signal names are valid and set defaults if not sig_name: if header: - sig_name = df_CSV.columns.to_list() + try: + sig_name = df_CSV.columns.to_list() + except AttributeError: + # to_list() was introduced in Pandas v0.24.0 + # https://pandas.pydata.org/pandas-docs/stable/whatsnew/v0.24.0.html#other-api-changes + sig_name = df_CSV.columns.tolist() if any(map(str.isdigit, sig_name)): print( "WARNING: One or more of your signal names are numbers, this " From f3d633db09da3f57487daa5336a120c83fc7d55a Mon Sep 17 00:00:00 2001 From: Tom Pollard Date: Thu, 11 Jul 2024 13:54:10 -0400 Subject: [PATCH 4/4] Use df.tolist(), not df.to_list(). --- wfdb/io/convert/csv.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/wfdb/io/convert/csv.py b/wfdb/io/convert/csv.py index 385ede50..4817a0e5 100644 --- a/wfdb/io/convert/csv.py +++ b/wfdb/io/convert/csv.py @@ -310,12 +310,7 @@ def csv_to_wfdb( # Check if signal names are valid and set defaults if not sig_name: if header: - try: - sig_name = df_CSV.columns.to_list() - except AttributeError: - # to_list() was introduced in Pandas v0.24.0 - # https://pandas.pydata.org/pandas-docs/stable/whatsnew/v0.24.0.html#other-api-changes - sig_name = df_CSV.columns.tolist() + sig_name = df_CSV.columns.tolist() if any(map(str.isdigit, sig_name)): print( "WARNING: One or more of your signal names are numbers, this "