Skip to content

Commit

Permalink
allow expanded physical signal in calc_adc_params
Browse files Browse the repository at this point in the history
  • Loading branch information
briangow committed Oct 29, 2024
1 parent ddf4e20 commit c9808ad
Showing 1 changed file with 122 additions and 79 deletions.
201 changes: 122 additions & 79 deletions wfdb/io/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,21 +699,25 @@ def dac(self, expanded=False, return_res=64, inplace=False):

return p_signal

def calc_adc_params(self):
def calc_adc_gain_baseline(self, ch, minvals, maxvals):
"""
Compute appropriate adc_gain and baseline parameters for adc
conversion, given the physical signal and the fmts.
Compute adc_gain and baseline parameters for a given channel.
Parameters
----------
N/A
ch: int
The channel that the adc_gain and baseline are being computed for.
minvals: list
The minimum values for each channel.
maxvals: list
The maximum values for each channel.
Returns
-------
adc_gains : list
List of calculated `adc_gain` values for each channel.
baselines : list
List of calculated `baseline` values for each channel.
adc_gain : float
Calculated `adc_gain` value for a given channel.
baseline : int
Calculated `baseline` value for a given channel.
Notes
-----
Expand All @@ -728,86 +732,125 @@ def calc_adc_params(self):
int32 `baseline` values, but does not consider over/underflow
for calculated float `adc_gain` values.
"""
# Get the minimum and maximum (valid) storage values
dmin, dmax = _digi_bounds(self.fmt[ch])
# add 1 because the lowest value is used to store nans
dmin = dmin + 1

pmin = minvals[ch]
pmax = maxvals[ch]

# Figure out digital samples used to store physical samples

# If the entire signal is NAN, gain/baseline won't be used
if pmin == np.nan:
adc_gain = 1
baseline = 1
# If the signal is just one value, store one digital value.
elif pmin == pmax:
if pmin == 0:
adc_gain = 1
baseline = 1
else:
# All digital values are +1 or -1. Keep adc_gain > 0
adc_gain = abs(1 / pmin)
baseline = 0
# Regular varied signal case.
else:
# The equation is: p = (d - b) / g

# Approximately, pmax maps to dmax, and pmin maps to
# dmin. Gradient will be equal to, or close to
# delta(d) / delta(p), since intercept baseline has
# to be an integer.

# Constraint: baseline must be between +/- 2**31
adc_gain = (dmax - dmin) / (pmax - pmin)
baseline = dmin - adc_gain * pmin

# Make adjustments for baseline to be an integer
# This up/down round logic of baseline is to ensure
# there is no overshoot of dmax. Now pmax will map
# to dmax or dmax-1 which is also fine.
if pmin > 0:
baseline = int(np.ceil(baseline))
else:
baseline = int(np.floor(baseline))

# After baseline is set, adjust gain correspondingly.Set
# the gain to map pmin to dmin, and p==0 to baseline.
# In the case where pmin == 0 and dmin == baseline,
# adc_gain is already correct. Avoid dividing by 0.
if dmin != baseline:
adc_gain = (dmin - baseline) / pmin

# Remap signal if baseline exceeds boundaries.
# This may happen if pmax < 0
if baseline > MAX_I32:
# pmin maps to dmin, baseline maps to 2**31 - 1
# pmax will map to a lower value than before
adc_gain = (MAX_I32) - dmin / abs(pmin)
baseline = MAX_I32
# This may happen if pmin > 0
elif baseline < MIN_I32:
# pmax maps to dmax, baseline maps to -2**31 + 1
adc_gain = (dmax - MIN_I32) / pmax
baseline = MIN_I32

return adc_gain, baseline

def calc_adc_params(self):
"""
Compute appropriate adc_gain and baseline parameters for adc
conversion, given the physical signal and the fmts.
Parameters
----------
N/A
Returns
-------
adc_gains : list
List of calculated `adc_gain` values for each channel.
baselines : list
List of calculated `baseline` values for each channel
"""
adc_gains = []
baselines = []

if np.where(np.isinf(self.p_signal))[0].size:
raise ValueError("Signal contains inf. Cannot perform adc.")
if self.p_signal is not None:
if np.where(np.isinf(self.p_signal))[0].size:
raise ValueError("Signal contains inf. Cannot perform adc.")

# min and max ignoring nans, unless whole channel is NAN.
# Should suppress warning message.
minvals = np.nanmin(self.p_signal, axis=0)
maxvals = np.nanmax(self.p_signal, axis=0)
# min and max ignoring nans, unless whole channel is NAN.
# Should suppress warning message.
minvals = np.nanmin(self.p_signal, axis=0)
maxvals = np.nanmax(self.p_signal, axis=0)

for ch in range(np.shape(self.p_signal)[1]):
# Get the minimum and maximum (valid) storage values
dmin, dmax = _digi_bounds(self.fmt[ch])
# add 1 because the lowest value is used to store nans
dmin = dmin + 1
for ch in range(np.shape(self.p_signal)[1]):
adc_gain, baseline = self.calc_adc_gain_baseline(ch, minvals, maxvals)
adc_gains.append(adc_gain)
baselines.append(baseline)

pmin = minvals[ch]
pmax = maxvals[ch]
elif self.e_p_signal is not None:
minvals = []
maxvals = []
for ch in self.e_p_signal:
minvals.append(min(x for x in ch if not math.isnan(x)))
maxvals.append(max(x for x in ch if not math.isnan(x)))

# Figure out digital samples used to store physical samples
if any(x == math.inf for x in minvals) or any(x == math.inf for x in maxvals):
raise ValueError("Signal contains inf. Cannot perform adc.")

# If the entire signal is NAN, gain/baseline won't be used
if pmin == np.nan:
adc_gain = 1
baseline = 1
# If the signal is just one value, store one digital value.
elif pmin == pmax:
if pmin == 0:
adc_gain = 1
baseline = 1
else:
# All digital values are +1 or -1. Keep adc_gain > 0
adc_gain = abs(1 / pmin)
baseline = 0
# Regular varied signal case.
else:
# The equation is: p = (d - b) / g

# Approximately, pmax maps to dmax, and pmin maps to
# dmin. Gradient will be equal to, or close to
# delta(d) / delta(p), since intercept baseline has
# to be an integer.

# Constraint: baseline must be between +/- 2**31
adc_gain = (dmax - dmin) / (pmax - pmin)
baseline = dmin - adc_gain * pmin

# Make adjustments for baseline to be an integer
# This up/down round logic of baseline is to ensure
# there is no overshoot of dmax. Now pmax will map
# to dmax or dmax-1 which is also fine.
if pmin > 0:
baseline = int(np.ceil(baseline))
else:
baseline = int(np.floor(baseline))

# After baseline is set, adjust gain correspondingly.Set
# the gain to map pmin to dmin, and p==0 to baseline.
# In the case where pmin == 0 and dmin == baseline,
# adc_gain is already correct. Avoid dividing by 0.
if dmin != baseline:
adc_gain = (dmin - baseline) / pmin

# Remap signal if baseline exceeds boundaries.
# This may happen if pmax < 0
if baseline > MAX_I32:
# pmin maps to dmin, baseline maps to 2**31 - 1
# pmax will map to a lower value than before
adc_gain = (MAX_I32) - dmin / abs(pmin)
baseline = MAX_I32
# This may happen if pmin > 0
elif baseline < MIN_I32:
# pmax maps to dmax, baseline maps to -2**31 + 1
adc_gain = (dmax - MIN_I32) / pmax
baseline = MIN_I32

adc_gains.append(adc_gain)
baselines.append(baseline)
for ch, _ in enumerate(self.e_p_signal):
adc_gain, baseline = self.calc_adc_gain_baseline(ch, minvals, maxvals)
adc_gains.append(adc_gain)
baselines.append(baseline)

else:
raise Exception('Must supply p_signal or e_p_signal to calc_adc_params')

return (adc_gains, baselines)

Expand Down

0 comments on commit c9808ad

Please sign in to comment.