Skip to content

Commit

Permalink
Add the ability to pass sig and hea streams to rdrecord, rdheader and…
Browse files Browse the repository at this point in the history
… rdann, in case the file is not read from disk.
  • Loading branch information
Ivorforce committed Jul 1, 2024
1 parent 34b989e commit 0378030
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 142 deletions.
273 changes: 152 additions & 121 deletions wfdb/io/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ def _rd_segment(
ignore_skew,
no_file=False,
sig_data=None,
sig_stream=None,
return_res=64,
):
"""
Expand Down Expand Up @@ -1211,6 +1212,7 @@ def _rd_segment(
sampto=sampto,
no_file=no_file,
sig_data=sig_data,
sig_stream=sig_stream,
)

# Copy over the wanted signals
Expand All @@ -1235,6 +1237,7 @@ def _rd_dat_signals(
sampto,
no_file=False,
sig_data=None,
sig_stream=None,
):
"""
Read all signals from a WFDB dat file.
Expand Down Expand Up @@ -1324,20 +1327,31 @@ def _rd_dat_signals(
if no_file:
data_to_read = sig_data
elif fmt in COMPRESSED_FMTS:
data_to_read = _rd_compressed_file(
file_name=file_name,
dir_name=dir_name,
pn_dir=pn_dir,
fmt=fmt,
sample_offset=byte_offset,
n_sig=n_sig,
samps_per_frame=samps_per_frame,
start_frame=sampfrom,
end_frame=sampto,
)
if sig_stream is not None:
data_to_read = _rd_compressed_stream(
fp=sig_stream,
fmt=fmt,
sample_offset=byte_offset,
n_sig=n_sig,
samps_per_frame=samps_per_frame,
start_frame=sampfrom,
end_frame=sampto,
)
else:
data_to_read = _rd_compressed_file(
file_name=file_name,
dir_name=dir_name,
pn_dir=pn_dir,
fmt=fmt,
sample_offset=byte_offset,
n_sig=n_sig,
samps_per_frame=samps_per_frame,
start_frame=sampfrom,
end_frame=sampto,
)
else:
data_to_read = _rd_dat_file(
file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples
file_name, dir_name, pn_dir, fmt, start_byte, n_read_samples, sig_stream
)

if extra_flat_samples:
Expand Down Expand Up @@ -1577,7 +1591,7 @@ def _required_byte_num(mode, fmt, n_samp):
return int(n_bytes)


def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp, sig_stream):
"""
Read data from a dat file, either local or remote, into a 1d numpy
array.
Expand Down Expand Up @@ -1635,8 +1649,14 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
element_count = n_samp
byte_count = n_samp * BYTES_PER_SAMPLE[fmt]

# Memory Stream
if sig_stream is not None:
sig_stream.seek(start_byte)
sig_data = np.frombuffer(
sig_stream.read(), dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count
)
# Local dat file
if pn_dir is None:
elif pn_dir is None:
with open(os.path.join(dir_name, file_name), "rb") as fp:
fp.seek(start_byte)
sig_data = np.fromfile(
Expand All @@ -1651,7 +1671,6 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):

return sig_data


def _blocks_to_samples(sig_data, n_samp, fmt):
"""
Convert uint8 blocks into signal samples for unaligned dat formats.
Expand Down Expand Up @@ -1770,6 +1789,123 @@ def _blocks_to_samples(sig_data, n_samp, fmt):
return sig


def _rd_compressed_stream(
fp,
fmt,
sample_offset,
n_sig,
samps_per_frame,
start_frame,
end_frame,
):
signature = fp.read(4)
if signature != b"fLaC":
raise ValueError(f"{fp.name} is not a FLAC file")
fp.seek(0)

with soundfile.SoundFile(fp) as sf:
# Determine the actual resolution of the FLAC stream and the
# data type will use when reading it. Note that soundfile
# doesn't support int8.
if sf.subtype == "PCM_S8":
format_bits = 8
read_dtype = "int16"
elif sf.subtype == "PCM_16":
format_bits = 16
read_dtype = "int16"
elif sf.subtype == "PCM_24":
format_bits = 24
read_dtype = "int32"
else:
raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})")

max_bits = int(fmt) - 500
if format_bits > max_bits:
raise ValueError(
f"wrong resolution in {fp.name} "
f"({format_bits}, expected <= {max_bits})"
)

if sf.channels != n_sig:
raise ValueError(
f"wrong number of channels in {fp.name} "
f"({sf.channels}, expected {n_sig})"
)

# Read the samples.
start_samp = start_frame * samps_per_frame[0]
end_samp = end_frame * samps_per_frame[0]
sf.seek(start_samp + sample_offset)

# We could do this:
# sig_data = sf.read(end_samp - start_samp, dtype=read_dtype)
# However, sf.read fails for huge blocks (over 2**24 total
# samples) due to a bug in libsndfile:
# https://github.com/libsndfile/libsndfile/issues/431
# So read the data in chunks instead.
n_samp = end_samp - start_samp
sig_data = np.empty((n_samp, n_sig), dtype=read_dtype)
CHUNK_SIZE = 1024 * 1024
for chunk_start in range(0, n_samp, CHUNK_SIZE):
chunk_end = chunk_start + CHUNK_SIZE
chunk_data = sf.read(out=sig_data[chunk_start:chunk_end])
samples_read = chunk_data.shape[0]
if samples_read != CHUNK_SIZE:
sig_data = sig_data[: chunk_start + samples_read]
break

# If we read an 8-bit stream as int16 or a 24-bit stream as
# int32, soundfile shifts each sample left by 8 bits. We
# want to undo this shift (and, in the case of 8-bit data,
# convert to an int8 array.)
if format_bits == 8:
# np.right_shift(sig_data, 8, dtype='int8') doesn't work.
# This seems wrong, but the numpy documentation is unclear.
sig_data2 = np.empty(sig_data.shape, dtype="int8")
sig_data = np.right_shift(sig_data, 8, out=sig_data2)
elif format_bits == 24:
# Shift 32-bit array in-place.
np.right_shift(sig_data, 8, out=sig_data)

# Suppose we have 3 channels and 2 samples per frame. The array
# returned by sf.read looks like this:
#
# channel 0 channel 1 channel 2
# time 0 [0,0] [0,1] [0,2]
# time 1 [1,0] [1,1] [1,2]
# time 2 [2,0] [2,1] [2,2]
# time 3 [3,0] [3,1] [3,2]
#
# We reshape this first into the following:
#
# channel 0 channel 1 channel 2
# time 0 [0,0,0] [0,0,1] [0,0,2]
# time 1 [0,1,0] [0,1,1] [0,1,2]
# time 2 [1,0,0] [1,0,1] [1,0,2]
# time 3 [1,1,0] [1,1,1] [1,1,2]
#
# Then we transpose axes 1 and 2:
#
# channel 0 channel 1 channel 2
# time 0 [0,0,0] [0,1,0] [0,2,0]
# time 1 [0,0,1] [0,1,1] [0,2,1]
# time 2 [1,0,0] [1,1,0] [1,2,0]
# time 3 [1,0,1] [1,1,1] [1,2,1]
#
# Then when we reshape the array to 1D, the result is in dat file
# order:
#
# channel 0 channel 1 channel 2
# time 0 [0] [2] [4]
# time 1 [1] [3] [5]
# time 2 [6] [8] [10]
# time 3 [7] [9] [11]

sig_data = sig_data.reshape(-1, samps_per_frame[0], n_sig)
sig_data = sig_data.transpose(0, 2, 1)
return sig_data.reshape(-1)


def _rd_compressed_file(
file_name,
dir_name,
Expand Down Expand Up @@ -1834,112 +1970,7 @@ def _rd_compressed_file(
file_name = os.path.join(dir_name, file_name)

with _coreio._open_file(pn_dir, file_name, "rb") as fp:
signature = fp.read(4)
if signature != b"fLaC":
raise ValueError(f"{fp.name} is not a FLAC file")
fp.seek(0)

with soundfile.SoundFile(fp) as sf:
# Determine the actual resolution of the FLAC stream and the
# data type will use when reading it. Note that soundfile
# doesn't support int8.
if sf.subtype == "PCM_S8":
format_bits = 8
read_dtype = "int16"
elif sf.subtype == "PCM_16":
format_bits = 16
read_dtype = "int16"
elif sf.subtype == "PCM_24":
format_bits = 24
read_dtype = "int32"
else:
raise ValueError(f"unknown subtype in {fp.name} ({sf.subtype})")

max_bits = int(fmt) - 500
if format_bits > max_bits:
raise ValueError(
f"wrong resolution in {fp.name} "
f"({format_bits}, expected <= {max_bits})"
)

if sf.channels != n_sig:
raise ValueError(
f"wrong number of channels in {fp.name} "
f"({sf.channels}, expected {n_sig})"
)

# Read the samples.
start_samp = start_frame * samps_per_frame[0]
end_samp = end_frame * samps_per_frame[0]
sf.seek(start_samp + sample_offset)

# We could do this:
# sig_data = sf.read(end_samp - start_samp, dtype=read_dtype)
# However, sf.read fails for huge blocks (over 2**24 total
# samples) due to a bug in libsndfile:
# https://github.com/libsndfile/libsndfile/issues/431
# So read the data in chunks instead.
n_samp = end_samp - start_samp
sig_data = np.empty((n_samp, n_sig), dtype=read_dtype)
CHUNK_SIZE = 1024 * 1024
for chunk_start in range(0, n_samp, CHUNK_SIZE):
chunk_end = chunk_start + CHUNK_SIZE
chunk_data = sf.read(out=sig_data[chunk_start:chunk_end])
samples_read = chunk_data.shape[0]
if samples_read != CHUNK_SIZE:
sig_data = sig_data[: chunk_start + samples_read]
break

# If we read an 8-bit stream as int16 or a 24-bit stream as
# int32, soundfile shifts each sample left by 8 bits. We
# want to undo this shift (and, in the case of 8-bit data,
# convert to an int8 array.)
if format_bits == 8:
# np.right_shift(sig_data, 8, dtype='int8') doesn't work.
# This seems wrong, but the numpy documentation is unclear.
sig_data2 = np.empty(sig_data.shape, dtype="int8")
sig_data = np.right_shift(sig_data, 8, out=sig_data2)
elif format_bits == 24:
# Shift 32-bit array in-place.
np.right_shift(sig_data, 8, out=sig_data)

# Suppose we have 3 channels and 2 samples per frame. The array
# returned by sf.read looks like this:
#
# channel 0 channel 1 channel 2
# time 0 [0,0] [0,1] [0,2]
# time 1 [1,0] [1,1] [1,2]
# time 2 [2,0] [2,1] [2,2]
# time 3 [3,0] [3,1] [3,2]
#
# We reshape this first into the following:
#
# channel 0 channel 1 channel 2
# time 0 [0,0,0] [0,0,1] [0,0,2]
# time 1 [0,1,0] [0,1,1] [0,1,2]
# time 2 [1,0,0] [1,0,1] [1,0,2]
# time 3 [1,1,0] [1,1,1] [1,1,2]
#
# Then we transpose axes 1 and 2:
#
# channel 0 channel 1 channel 2
# time 0 [0,0,0] [0,1,0] [0,2,0]
# time 1 [0,0,1] [0,1,1] [0,2,1]
# time 2 [1,0,0] [1,1,0] [1,2,0]
# time 3 [1,0,1] [1,1,1] [1,2,1]
#
# Then when we reshape the array to 1D, the result is in dat file
# order:
#
# channel 0 channel 1 channel 2
# time 0 [0] [2] [4]
# time 1 [1] [3] [5]
# time 2 [6] [8] [10]
# time 3 [7] [9] [11]

sig_data = sig_data.reshape(-1, samps_per_frame[0], n_sig)
sig_data = sig_data.transpose(0, 2, 1)
return sig_data.reshape(-1)
return _rd_compressed_stream(fp, fmt, sample_offset, n_sig, samps_per_frame, start_frame, end_frame)


def _skew_sig(
Expand Down
6 changes: 5 additions & 1 deletion wfdb/io/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,7 @@ def rdann(
pn_dir=None,
return_label_elements=["symbol"],
summarize_labels=False,
ann_stream=None,
):
"""
Read a WFDB annotation file record_name.extension and return an
Expand Down Expand Up @@ -1947,7 +1948,10 @@ def rdann(
)

# Read the file in byte pairs
filebytes = load_byte_pairs(record_name, extension, pn_dir)
if ann_stream is not None:
filebytes = np.frombuffer(ann_stream.read(), "<u1").reshape([-1, 2]).astype(np.int32)
else:
filebytes = load_byte_pairs(record_name, extension, pn_dir).astype(np.int32)

# Get WFDB annotation fields from the file bytes
(sample, label_store, subtype, chan, num, aux_note) = proc_ann_bytes(
Expand Down
Loading

0 comments on commit 0378030

Please sign in to comment.