Add write_dir argument to csv_to_wfdb. Fixes #67. #492

Merged · 4 commits · Jul 11, 2024
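
For reference, a minimal usage sketch of the new write_dir argument, based on the call made in the test added below (the output directory name is hypothetical; fs and units are the values used in that test):

import os

from wfdb.io.convert.csv import csv_to_wfdb

# Create the output directory up front, as the new test in this PR does.
out_dir = "converted"
os.makedirs(out_dir, exist_ok=True)

# With write_dir set, 100.dat and 100.hea are written into out_dir
# instead of the current working directory.
csv_to_wfdb(
    file_name="sample-data/100.csv",
    fs=360,
    units="mV",
    write_dir=out_dir,
)

print(sorted(os.listdir(out_dir)))  # expected: ['100.dat', '100.hea']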
75 changes: 72 additions & 3 deletions tests/io/test_convert.py
@@ -1,14 +1,22 @@
import os
import shutil
import unittest

import numpy as np

from wfdb.io.record import rdrecord
from wfdb.io.convert.edf import read_edf
from wfdb.io.convert.csv import csv_to_wfdb


class TestEdfToWfdb:
"""
Tests for the io.convert.edf module.
"""

class TestConvert:
def test_edf_uniform(self):
"""
EDF format conversion to MIT for uniform sample rates.

"""
# Uniform sample rates
record_MIT = rdrecord("sample-data/n16").__dict__
@@ -60,7 +68,6 @@ def test_edf_uniform(self):
def test_edf_non_uniform(self):
"""
EDF format conversion to MIT for non-uniform sample rates.

"""
# Non-uniform sample rates
record_MIT = rdrecord("sample-data/wave_4").__dict__
@@ -108,3 +115,65 @@ def test_edf_non_uniform(self):

target_results = len(fields) * [True]
assert np.array_equal(test_results, target_results)


class TestCsvToWfdb(unittest.TestCase):
"""
Tests for the io.convert.csv module.
"""

def setUp(self):
"""
Create a temporary directory containing data for testing.

Load 100.dat file for comparison to 100.csv file.
"""
self.test_dir = "test_output"
os.makedirs(self.test_dir, exist_ok=True)

self.record_100_csv = "sample-data/100.csv"
self.record_100_dat = rdrecord("sample-data/100", physical=True)

def tearDown(self):
"""
Remove the temporary directory after the test.
"""
if os.path.exists(self.test_dir):
shutil.rmtree(self.test_dir)

def test_write_dir(self):
"""
Call the function with the write_dir argument.
"""
csv_to_wfdb(
file_name=self.record_100_csv,
fs=360,
units="mV",
write_dir=self.test_dir,
)

# Check if the output files are created in the specified directory
base_name = os.path.splitext(os.path.basename(self.record_100_csv))[0]
expected_dat_file = os.path.join(self.test_dir, f"{base_name}.dat")
expected_hea_file = os.path.join(self.test_dir, f"{base_name}.hea")

self.assertTrue(os.path.exists(expected_dat_file))
self.assertTrue(os.path.exists(expected_hea_file))

# Check that newly written file matches the 100.dat file
record_write = rdrecord(os.path.join(self.test_dir, base_name))

self.assertEqual(record_write.fs, 360)
self.assertEqual(record_write.fs, self.record_100_dat.fs)
self.assertEqual(record_write.units, ["mV", "mV"])
self.assertEqual(record_write.units, self.record_100_dat.units)
self.assertEqual(record_write.sig_name, ["MLII", "V5"])
self.assertEqual(record_write.sig_name, self.record_100_dat.sig_name)
self.assertEqual(record_write.p_signal.size, 1300000)
self.assertEqual(
record_write.p_signal.size, self.record_100_dat.p_signal.size
)


if __name__ == "__main__":
unittest.main()
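
The new tests can also be run on their own; one way to do so, assuming the repository root is the working directory and the tests directory is importable as a package (a sketch, not the project's documented invocation):

import unittest

# Load and run only the CSV-conversion tests added in this PR.
from tests.io.test_convert import TestCsvToWfdb

suite = unittest.TestLoader().loadTestsFromTestCase(TestCsvToWfdb)
unittest.TextTestRunner(verbosity=2).run(suite)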
20 changes: 12 additions & 8 deletions wfdb/io/convert/csv.py
@@ -33,6 +33,7 @@ def csv_to_wfdb(
header=True,
delimiter=",",
verbose=False,
write_dir="",
):
"""
Read a WFDB header file and return either a `Record` object with the
@@ -235,6 +236,10 @@
verbose : bool, optional
Whether to print all the information read about the file (True) or
not (False).
write_dir : str, optional
The directory where the output files will be saved. If write_dir is not
provided, the output files will be saved in the same directory as the
input file.

Returns
-------
@@ -291,6 +296,7 @@
df_CSV = pd.read_csv(file_name, delimiter=delimiter, header=None)
if verbose:
print("Successfully read CSV")

# Extract the entire signal from the dataframe
p_signal = df_CSV.values
# The dataframe should be in (`sig_len`, `n_sig`) dimensions
@@ -300,10 +306,11 @@
n_sig = p_signal.shape[1]
if verbose:
print("Number of signals: {}".format(n_sig))

# Check if signal names are valid and set defaults
if not sig_name:
if header:
sig_name = df_CSV.columns.to_list()
sig_name = df_CSV.columns.tolist()
if any(map(str.isdigit, sig_name)):
print(
"WARNING: One or more of your signal names are numbers, this "
@@ -318,15 +325,12 @@
if verbose:
print("Signal names: {}".format(sig_name))

# Set the output header file name to be the same, remove path
if os.sep in file_name:
file_name = file_name.split(os.sep)[-1]
record_name = file_name.replace(".csv", "")
record_name = os.path.splitext(os.path.basename(file_name))[0]
if verbose:
print("Output header: {}.hea".format(record_name))
print("Record name: {}.hea".format(record_name))

# Replace the CSV file tag with DAT
dat_file_name = file_name.replace(".csv", ".dat")
dat_file_name = record_name + ".dat"
dat_file_name = [dat_file_name] * n_sig
if verbose:
print("Output record: {}".format(dat_file_name[0]))
@@ -450,7 +454,6 @@ def csv_to_wfdb(
if verbose:
print("Record generated successfully")
return record

else:
# Write the information to a record and header file
wrsamp(
@@ -465,6 +468,7 @@
comments=comments,
base_time=base_time,
base_date=base_date,
write_dir=write_dir,
)
if verbose:
print("File generated successfully")
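
As a side note on the record-naming change above: the new derivation uses os.path rather than splitting on os.sep and string-replacing ".csv", so it behaves the same on any path separator and only strips the final extension. A small illustration with a hypothetical input path:

import os

file_name = "data/records/patient.v2.csv"  # hypothetical input path

# Approach used in this PR: drop the directory, then split off the extension.
record_name = os.path.splitext(os.path.basename(file_name))[0]
dat_file_name = record_name + ".dat"

print(record_name)    # patient.v2
print(dat_file_name)  # patient.v2.dat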