from functools import partial
from typing import Callable, List, Tuple, Union
from warnings import warn
import h5py
import numpy as np
from deprecation import deprecated
from tqdm.auto import trange
import cait as ai
import cait.versatile as vai
from ..data._baselines import calculate_mean_nps
from ..features._mp import calc_additional_parameters, calc_main_parameters
from ..features._ph_corr import calc_correlated_ph
from ..filter._ma import rem_off
from ..filter._of import get_amplitudes, optimal_transfer_function
from ..fit._pm_fit import fit_pulse_shape
from ..fit._sev import generate_standard_event
from ..fit._templates import pulse_template
from ..styles._print_styles import txt_fmt
from ..trigger._peakdet import get_triggers
# convenience function used in calc_mp
@deprecated(deprecated_in='1.3.0', removed_in="2.0.0", details="Helper functions for dh.calc_mp will be removed together with dh.calc_mp")
def _calc_mp_helper(event, down, max_bounds):
return calc_main_parameters(event, down, max_bounds).getArray()
# convenience function used in apply_of
@deprecated(deprecated_in='1.3.0', removed_in="2.0.0", details="Helper functions for dh.apply_of will be removed together with dh.apply_of")
def _apply_of_helper(events,
functions: List[callable],
correlated: bool,
onsets: List[int]):
n_channels = len(functions)
# if only one channel is present, we just call the filtering function on this channel
if n_channels == 1:
return functions[0](events)
# if multiple channels are present but first_channel_dominant=False, we just apply the
# filtering to the channels separately
elif not correlated:
out = [functions[i](events[:,i]) for i in range(n_channels)]
# restructure the outputs such that same outputs for all channels are grouped
return tuple([np.array(list(i)).T for i in zip(*out)])
# if multiple channels are present and first_channel_dominant=True, we fist apply the
# filtering to the first channel and use its peak_pos to filter the remaining channels
else:
out = [functions[0](events[:,0])]
# peak_pos is second output of the function call above
peak_pos = out[0][1]
for i in range(1, n_channels):
out.append(partial(functions[i], peakpos=peak_pos+onsets[i])(events[:,i]))
# restructure the outputs such that same outputs for all channels are grouped
return tuple([np.array(list(i)).T for i in zip(*out)])
# -----------------------------------------------------------
# CLASS
# -----------------------------------------------------------
[docs]
class FeaturesMixin(object):
"""
A Mixin Class to the DataHandler Class with methods for the calculation of features of the data.
"""
# -----------------------------------------------------------
# FEATURE CALCULATION
# -----------------------------------------------------------
# Calculate MP
[docs]
@deprecated(deprecated_in='1.3.0', removed_in="2.0.0", details="Use DataHandler.cmp() instead.")
def calc_mp(self,
type: str = 'events',
path_h5: str = None,
processes: int = -1,
down: int = 1,
max_bounds: Tuple[int] = None):
"""
Calculate the Main Parameters for the Events in an HDF5 File.
This method is described in "CRESST Collaboration, First results from the CRESST-III low-mass dark matter program"
(10.1103/PhysRevD.100.102002).
:param type: The group in the HDF5 set, either events or testpulses.
:type type: string
:param path_h5: An alternative full path to a hdf5 file, e.g. "data/bck_001.h5".
:type path_h5: string or None
:param processes: The number of processes to use for the calculation. If -1, all available resources are used.
:type processes: int
:param down: The events get downsampled by this factor for the calculation of main parameters.
:type down: int
:param max_bounds: The interval of indices to which we restrict the maximum search for the pulse height.
:type max_bounds: tuple of two ints
"""
if not path_h5: path_h5 = self.path_h5
if processes == -1: processes = ai._available_workers
f = partial(_calc_mp_helper, down=down, max_bounds=max_bounds)
events = [self.get_event_iterator(type, channel=ch, batch_size=100) for ch in range(self.nmbr_channels)]
n_events = len(events[0])
out = []
print(txt_fmt('Calculating main parameters ...', style="bold"))
for ch in range(self.nmbr_channels):
out.append( vai.apply(f,
events[ch],
n_processes=processes if n_events>1000 else 1,
pb_prefix=f"Channel {ch}") )
self.set(type,
mainpar=np.array(out),
dtype=np.float32,
overwrite_existing=True,
write_to_virtual=False)
print("\n")
with h5py.File(path_h5, 'r+') as h5f:
group = h5f[type]
group['mainpar'].attrs.create(name='pulse_height', data=0)
group['mainpar'].attrs.create(name='t_zero', data=1)
group['mainpar'].attrs.create(name='t_rise', data=2)
group['mainpar'].attrs.create(name='t_max', data=3)
group['mainpar'].attrs.create(name='t_decaystart', data=4)
group['mainpar'].attrs.create(name='t_half', data=5)
group['mainpar'].attrs.create(name='t_end', data=6)
group['mainpar'].attrs.create(name='offset', data=7)
group['mainpar'].attrs.create(name='linear_drift', data=8)
group['mainpar'].attrs.create(name='quadratic_drift', data=9)
# calc stdevent testpulses
[docs]
def calc_sev(self,
type='events',
use_labels=False,
correct_label=None,
use_idx=None,
name_appendix='',
pulse_height_interval=None,
left_right_cutoff=None,
rise_time_interval=None,
decay_time_interval=None,
onset_interval=None,
remove_offset=True,
baseline_model='constant',
verb=True,
scale_fit_height=True,
scale_to_unit=None,
sample_length=None,
t0_start=None,
opt_start=False,
memsafe=True,
batch_size=1000,
lower_bound_tau=None,
upper_bound_tau=None,
pretrigger_samples=500,
):
"""
Calculate the Standard Event for the Events in the HDF5 File.
This method is described in "CRESST Collaboration, First results from the CRESST-III low-mass dark matter program"
(10.1103/PhysRevD.100.102002).
:param type: The group name in the HDF5 set, either "events" or "testpulses".
:type type: string
:param use_labels: Tf True a labels file must be included in the hdf5 file,
then only the events labeled as events or testpulses are included in the calculation.
:type use_labels: bool
:param correct_label: The label to be used for the sev generation.
:type correct_label: int
:param use_idx: Only these indices are included for the sev generation.
:type use_idx: list of ints
:param name_appendix: This gets appended to the group name stdevent in the HDF5 set.
:type name_appendix: string
:param pulse_height_interval: The upper and lower bound for the pulse heights to include into the creation
of the SEV.
:type pulse_height_interval: list of NMBR_CHANNELS lists of length 2 (intervals)
:param left_right_cutoff: The maximal abs value of the R-L baseline difference of events
to be included in the SEV calculation.
:type left_right_cutoff: list of NMBR_CHANNELS floats
:param rise_time_interval: The upper
and lower bound for the rise time in ms to include into the creation of the SEV.
:type rise_time_interval: list of NMBR_CHANNELS lists of length 2 (intervals)
:param decay_time_interval: The upper
and lower bound for the decay time in ms to include into the creation of the SEV.
:type decay_time_interval: list of NMBR_CHANNELS lists of length 2 (intervals)
:param onset_interval: The upper
and lower bound for the onset time in ms to include into the creation of the SEV.
:type onset_interval: list of NMBR_CHANNELS lists of length 2 (intervals)
:param remove_offset: Tf True the offset is removed before the events are superposed for the
sev calculation. Highly recommended!
:type remove_offset: bool
:param baseline_model: Either 'constant', 'linear' or 'exponential'. The baseline model substracted from all
events.
:type baseline_model: str
:param verb: If True some verbal feedback is output about the progress of the method.
:type verb: bool
:param scale_fit_height: If True the parametric fit to the sev is normalized to height 1 after
the fit is done.
:type scale_fit_height: bool
:param scale_to_unit: If True corresponding to a channel, the standard event is scaled to 1. Default True. If
False for a channel, the parametric fit is not applied but automatically set to values that produce an empty array.
In this case, also the scale_fit_height is not done for this channel.
:type scale_to_unit: bool list of length nmbr_channels or None
:param sample_length: The length of one sample in milliseconds. If None, this is calculated from the sample
frequency.
:type sample_length: float
:param t0_start: The start values for t0 in the fit.
:type t0_start: 2-tupel of floats
:param opt_start: If true, a pre-fit is applied to find optimal start values.
:type opt_start: bool
:param memsafe: Recommended! If activated, not all events get loaded into memory.
:type memsafe: bool
:param batch_size: The batch size for the calculation of the SEV.
:type batch_size: int
:param lower_bound_tau: The lower bound for all tau values in the fit.
:type lower_bound_tau: float
:param upper_bound_tau: The upper bound for all tau values in the fit.
:type upper_bound_tau: float
:param pretrigger_samples: The number of samples from start of the record window that are considered the pre
trigger region.
:type pretrigger_samples: int
"""
assert not memsafe or (use_labels == False and correct_label is None and pulse_height_interval is None and
left_right_cutoff is None and rise_time_interval is None and decay_time_interval is None
and onset_interval is None), \
'The memsafe option does not allow for correct_label, ' \
'pulse_height_interval, left_right_cutoff, rise_time_interval,' \
'decay_time_interval, onset_interval argument. It is ' \
'recommended to hand a use_labels flag instead!'
assert memsafe or (lower_bound_tau is None and upper_bound_tau is None and
baseline_model == 'constant'), 'For using arguments lower_bound_tau, ' \
'upper_bound_tau and baseline_model activate memsafe option!'
if scale_to_unit is None:
scale_to_unit = [True for i in range(self.nmbr_channels)]
if sample_length is None:
sample_length = 1 / self.sample_frequency * 1000
if lower_bound_tau is None:
lower_bound_tau = [1e-2 for i in range(self.nmbr_channels)]
if upper_bound_tau is None:
upper_bound_tau = [3e3 for i in range(self.nmbr_channels)]
# set the start values for t0
if t0_start is None:
t0_start = [-3 for i in range(self.nmbr_channels)]
with h5py.File(self.path_h5, 'r+') as h5f:
events = h5f[type]['event']
mainpar = h5f[type]['mainpar']
std_evs = []
# if no pulse_height_interval is specified set it to average values for all channels
if pulse_height_interval == None:
pulse_height_interval = [None
for c in range(self.nmbr_channels)]
# fix the issue with different arguments for different channels
inp = [left_right_cutoff, rise_time_interval, decay_time_interval, onset_interval]
for i, var in enumerate(inp):
if var is None:
inp[i] = [None for c in range(self.nmbr_channels)]
if use_labels:
labels = h5f[type]['labels']
else:
labels = [None for c in range(self.nmbr_channels)]
if correct_label is None:
if type == 'events':
sev = h5f.require_group('stdevent' + name_appendix)
correct_label = 1
elif type == 'testpulses':
sev = h5f.require_group('stdevent_tp' + name_appendix)
correct_label = 2
else:
raise NotImplementedError('Type must be events or testpulses!')
else:
sev = h5f.require_group('stdevent' + name_appendix)
if use_idx is None:
use_idx = list(range(events.shape[1]))
for c in range(self.nmbr_channels):
print('\nCalculating SEV for Channel {}'.format(c))
if not memsafe:
std_evs.append(generate_standard_event(events=events[c, use_idx, :],
main_parameters=mainpar[c, use_idx, :],
labels=labels[c],
correct_label=correct_label,
pulse_height_interval=pulse_height_interval[c],
left_right_cutoff=inp[0][c],
rise_time_interval=inp[1][c],
decay_time_interval=inp[2][c],
onset_interval=inp[3][c],
remove_offset=remove_offset,
verb=verb,
scale_fit_height=scale_fit_height,
scale_to_unit=scale_to_unit[c],
sample_length=sample_length,
t0_start=t0_start[c],
opt_start=opt_start,
))
else:
print('{} Events used to generate Standardevent.'.format(len(use_idx)))
nmbr_batches = int(len(use_idx) / batch_size)
std_evs.append([np.zeros(events.shape[2]), ])
for b in range(nmbr_batches):
start = int(b * batch_size)
stop = int((b + 1) * batch_size)
ev = np.array(events[c, use_idx[start:stop]])
if remove_offset:
rem_off(ev, baseline_model, pretrigger_samples)
std_evs[c][0] += np.sum(ev, axis=0)
# last batch
start = int(nmbr_batches * batch_size)
ev = np.array(events[c, use_idx[start:]])
if remove_offset:
rem_off(ev, baseline_model, pretrigger_samples)
std_evs[c][0] += np.sum(ev, axis=0)
std_evs[c][0] /= len(use_idx)
if scale_to_unit[c]:
std_evs[c][0] /= np.max(std_evs[c][0])
par = fit_pulse_shape(std_evs[c][0], sample_length=sample_length,
t0_start=t0_start[c],
opt_start=opt_start,
lower_bound_tau=lower_bound_tau[c],
upper_bound_tau=upper_bound_tau[c], )
if scale_fit_height:
t = (np.arange(0, len(std_evs[c][0]), dtype=float) - len(std_evs[c][0]) / 4) * sample_length
fit_max = np.max(pulse_template(t, *par))
print('Parameters [t0, An, At, tau_n, tau_in, tau_t]:\n', par)
if not np.isclose(fit_max, 0, rtol=3e-3):
par[1] /= fit_max
par[2] /= fit_max
else:
par = [0, 0, 0, 1, 1, 1]
std_evs[c].append(par)
sev.require_dataset('event',
shape=(self.nmbr_channels, len(std_evs[0][0])), # this is then length of sev
dtype='f')
sev['event'][...] = np.array([x[0] for x in std_evs])
sev.require_dataset('fitpar',
shape=(self.nmbr_channels, len(std_evs[0][1])),
dtype='float')
sev['fitpar'][...] = np.array([x[1] for x in std_evs])
# description of the fitparameters (data=column_in_fitpar)
sev['fitpar'].attrs.create(name='t_0', data=0)
sev['fitpar'].attrs.create(name='A_n', data=1)
sev['fitpar'].attrs.create(name='A_t', data=2)
sev['fitpar'].attrs.create(name='tau_n', data=3)
sev['fitpar'].attrs.create(name='tau_in', data=4)
sev['fitpar'].attrs.create(name='tau_t', data=5)
mp = np.array([calc_main_parameters(x[0]).getArray() for x in std_evs])
sev.require_dataset('mainpar',
shape=mp.shape,
dtype='float')
sev['mainpar'][...] = mp
# description of the mainpar (data=col_in_mainpar)
sev['mainpar'].attrs.create(name='pulse_height', data=0)
sev['mainpar'].attrs.create(name='t_zero', data=1)
sev['mainpar'].attrs.create(name='t_rise', data=2)
sev['mainpar'].attrs.create(name='t_max', data=3)
sev['mainpar'].attrs.create(name='t_decaystart', data=4)
sev['mainpar'].attrs.create(name='t_half', data=5)
sev['mainpar'].attrs.create(name='t_end', data=6)
sev['mainpar'].attrs.create(name='offset', data=7)
sev['mainpar'].attrs.create(name='linear_drift', data=8)
sev['mainpar'].attrs.create(name='quadratic_drift', data=9)
print('{} SEV calculated.'.format(type))
[docs]
def calc_of(self, down: int = 1, name_appendix: str = '', window: bool = True, use_this_sev: list = None):
"""
Calculate the Optimum Filer from the NPS and the SEV.
The data format and method was described in "(2018) N. Ferreiro Iachellini, Increasing the sensitivity to
low mass dark matter in cresst-iii witha new daq and signal processing", doi 10.5282/edoc.23762.
:param down: The downsample factor of the optimal filter transfer function.
:type down: int
:param name_appendix: A string that is appended to the group name stdevent and optimumfilter.
:type name_appendix: string
:param window: Include a window function to the standard event before building the filter.
:type window: bool
:param use_this_sev: Here you can hand an alternativ list of standard events for all channels, in case you
do not want to use one that is stored in the HDF5 set.
:type use_this_sev: list
"""
with h5py.File(self.path_h5, 'r+') as h5f:
if use_this_sev is None:
stdevent_pulse = np.array([h5f['stdevent' + name_appendix]['event'][i]
for i in range(self.nmbr_channels)])
else:
stdevent_pulse = use_this_sev
mean_nps = np.array([h5f['noise']['nps'][i] for i in range(self.nmbr_channels)])
if down > 1:
stdevent_pulse = np.mean(stdevent_pulse.reshape(-1, int(stdevent_pulse.shape[1] / down), down), axis=2)
first_nps_val = mean_nps[:, 0]
mean_nps = mean_nps[:, 1:]
mean_nps = np.mean(mean_nps.reshape(-1, int(mean_nps.shape[1] / down), down), axis=2)
mean_nps = np.concatenate((first_nps_val.reshape(-1, 1), mean_nps), axis=1)
print('CREATE OPTIMUM FILTER.')
of = np.array([optimal_transfer_function(
stdevent_pulse[i], mean_nps[i], window) for i in range(self.nmbr_channels)])
optimumfilter = h5f.require_group('optimumfilter' + name_appendix)
if down > 1:
optimumfilter.require_dataset('optimumfilter_real_down{}'.format(down),
dtype='float',
shape=of.real.shape)
optimumfilter.require_dataset('optimumfilter_imag_down{}'.format(down),
dtype='float',
shape=of.real.shape)
optimumfilter['optimumfilter_real_down{}'.format(down)][...] = of.real
optimumfilter['optimumfilter_imag_down{}'.format(down)][...] = of.imag
else:
optimumfilter.require_dataset('optimumfilter_real',
dtype='float',
shape=of.real.shape)
optimumfilter.require_dataset('optimumfilter_imag',
dtype='float',
shape=of.real.shape)
optimumfilter['optimumfilter_real'][...] = of.real
optimumfilter['optimumfilter_imag'][...] = of.imag
print('OF updated.')
# apply the optimum filter
[docs]
@deprecated(deprecated_in='1.3.0', removed_in="2.0.0", details="Use dh.apply_ofilter instead.")
def apply_of(self,
type: str = 'events',
name_appendix_group: str = '',
name_appendix_set: str = '',
chunk_size: int = 100,
hard_restrict: bool = False,
down: int = 1,
window: bool = True,
first_channel_dominant: bool = False,
baseline_model: str = 'constant',
pretrigger_samples: int = 500,
onset_to_dominant_channel: List[int] = None,
flexibility: int = 1,
calc_rms: bool = False,
processes: int = -1
):
"""
Calculates the height of events or testpulses after applying the optimum filter.
:param type: The group name in the HDF5 set, either events of testpulses.
:type type: string
:param name_appendix_group: A string that is appended to the stdevent group in the HDF5 set.
:type name_appendix_group: string
:param name_appendix_set: A string that is appended to the of_ph set in the HDF5 set.
:type name_appendix_set: string
:param chunk_size: The size how many events are processes simultaneously to avoid memory error.
:type chunk_size: int
:param hard_restrict: If True, the maximum search is restricted to 20-30% of the record window.
:type hard_restrict: bool
:param down: The events get downsampled with this factor before application of the filter.
:type down: int
:param window: If true, a window function is applied to the record window, before filtering. This is recommended,
to avoid artifacts from left-right offset differences of the baseline.
:type window: bool
:param first_channel_dominant: Take the maximum position from the first channel and evaluate the others at the
same position.
:type first_channel_dominant: bool
:param baseline_model: Either 'constant', 'linear' or 'exponential'. The baseline model substracted from all
events.
:type baseline_model: str
:param pretrigger_samples: The number of samples from start of the record window that are considered the pre
trigger region.
:type pretrigger_samples: int
:param onset_to_dominant_channel: The difference in the onset value to the dominant channel. If e.g. the second
channel has a typical max_pos value of 4000, but the first of 4100, then the onset for this would be -100.
:type onset_to_dominant_channel: list of ints
:param flexibility: In case a peak position is provided, the maximum search can still deviate by this amount of samples.
:type flexibility: int
:param calc_rms: If true, calculated also the rms of the filtered pulses.
:type calc_rms: bool
:param processes: The number of processes to use for the calculation. If -1, all available resources are used.
:type processes: int
"""
if processes == -1: processes = ai._available_workers
if onset_to_dominant_channel is None:
onset_to_dominant_channel = np.zeros(self.nmbr_channels)
assert len(onset_to_dominant_channel) == self.nmbr_channels, \
'onset_to_dominant_channel must have length nmbr_channels!'
sev = self.get(f'stdevent{name_appendix_group}', 'event')
nps = self.get('noise', 'nps')
# Use existing optimum filter if it has already been calculated
if self.exists(f'optimumfilter{name_appendix_group}'):
of_real = self.get(f'optimumfilter{name_appendix_group}', 'optimumfilter_real')
of_imag = self.get(f'optimumfilter{name_appendix_group}', 'optimumfilter_imag')
transfer_function = of_real + 1j*of_imag
else:
transfer_function = [None]*self.nmbr_channels
events = self.get_event_iterator(type, batch_size=chunk_size)
fs = [partial(get_amplitudes,
stdevent=sev[c],
nps=nps[c],
hard_restrict=hard_restrict,
down=down,
window=window,
return_peakpos=True,
baseline_model=baseline_model,
pretrigger_samples=pretrigger_samples,
transfer_function=transfer_function[c],
flexibility=flexibility,
calc_rms=calc_rms)
for c in range(events.n_channels)]
wrapper = partial(_apply_of_helper,
functions=fs,
correlated=first_channel_dominant and events.n_channels>1,
onsets=onset_to_dominant_channel)
wrapper.batch_support = "full"
print(txt_fmt('Calculating OF pulse heights ...', style="bold"))
# first output: optimum filter pulse height, second output: peakpos (not relevant anymore)
# remaining output: empty if calc_rms=False, else rms and peak_rms
of_ph, _, *remainder = vai.apply(wrapper, events, n_processes=processes if len(events)>1000 else 1)
if calc_rms:
output = {'of_ph' + name_appendix_set: of_ph.T,
'of_rms' + name_appendix_set: remainder[0].T,
'of_rms_peak' + name_appendix_set: remainder[1].T}
else:
output = {'of_ph' + name_appendix_set: of_ph.T}
self.set(type,
**output,
dtype=np.float32,
overwrite_existing=True,
write_to_virtual=False)
print("\n")
[docs]
def apply_ofilter(
self,
group: str,
of: np.ndarray,
sev: np.ndarray,
*,
only_channels: Union[int, List[int]] = None,
with_processing: Union[Callable, List[Callable]] = None,
on_stream: bool = False,
tag: str = "",
batch_size: int = 100,
preview: bool = False,
**kwargs,
):
"""
Calculate optimum filter pulse heights.
See below for some common pitfalls and refer to the documentation of :class:`cait.versatile.OFPulseHeight` for more details.
:param group: The DataHandler group with the events to which we want to apply the filter. The results will be saved to this group, too (see :class:`cait.versatile.OFPulseHeight` for a description of the outputs).
:type group: str
:param of: The optimum filter(s) to use. One for each channel, i.e. has shape ``(N, M//2+1)`` where ``N`` is the number of channels and ``M`` is the record length. Using 2D filters is no exception here: The filters have to be supplied as shape ``(N, M//2+1)`` arrays.
:type of: np.ndarray
:param sev: The standard events corresponding to the filters. They are used as reference to calculate the filter (peak) RMS. One standard event per entry in ``of`` is needed. I.e. has to be of shape ``(N, M)``.
:type sev: np.ndarray
:param only_channels: If you only want to filter some of the channels in 'group', you can specify the channel index/indices here. Note that the sizes of ``sev`` and ``of`` have to match the number of channels to be filtered, i.e. if you filter two channels, the ``of`` also has to have two channels. Defaults to None, i.e. filtering all channels in 'group'. Note that if you select a subset of channels here, the arguments ``filter_groups``, ``max_search`` and ``relative_to`` that you can pass as keyword arguments to :class:`cait.versatile.OFPulseHeight` (see below) refer to the channel indices **of the remaining channels**.
:type only_channels: Union[int, List[int]], optional
:param with_processing: Optional processing to be applied to the event traces before calculating the optimum filter heights. See :func:`cait.versatile.iterators.IteratorBaseClass.with_processing` for details. A processing worth mentioning is :class:`cait.versatile.TukeyWindow`.
:type with_processing: Union[Callable, List[Callable]], optional
:param on_stream: If True, the event traces are extended on the original stream, adding samples outside the record window. This allows for filtering without edge effects. Of course, this only works if the reference to the stream is saved in the DataHandler, which happens automatically if you use :func:`cait.mixins.TriggerCollectionMixin.trigger_of` or :func:`cait.mixins.TriggerCollectionMixin.trigger_zscore` for triggering. Adding the reference manually after triggering is painful and is not recommended. If you nevertheless want to perform such a calculation, you can use :func:`cait.versatile.OFPulseHeight` and `cait.versatile.iterators.StreamIterator.with_extended_window` to achieve the same. Defaults to False.
:type on_stream: bool, optional
:param tag: A string that is appended to the datasets when they are saved to the DataHandler (e.g. if you want to apply different filters). This string is appended with a hyphen, i.e. for ``tag="wafer"`` this would result in datasets like ``of_ph-wafer``. Defaults to an empty string, i.e. no tag.
:type tag: str, optional
:param batch_size: The number of events to process at once.
:type batch_size: int, optional
:param preview: If True, an interactive preview illustrating the filter evaluation using the current input arguments on the event traces opens up. Defaults to False
:type preview: bool, optional
:param kwargs: Additional keyword arguments passed to :class:`cait.versatile.OFPulseHeight`. Notable arguments are ``max_search`` (to specify where to search for maxima), ``relative_to`` (to specify relative to which channels the filtered traces should be evaluated), ``peak_rms_width`` (number of samples for peak RMS calculation) and ``filter_groups`` (used to specify which channels should be treated together using a 2D filter). Refer to its documentation page for more details.
:type kwargs: any, optional
.. note::
If you have two channels and you want to search the maximum of the first channel around 1/4th of the record window and evaluate the second channel ``k`` samples *before* the maximum found in the first channel, you have to pass the arguments ``max_search=[(0.2, 0.4), -k]`` and ``relative_to=[None, 0]``.
.. note::
To apply a 2d optimum filter to channels 0 and 1 and search for the maximum in channel 2 at most ``k`` samples away from the 2d filter maximum of the first two channels, pass arguments ``filter_groups=[(0, 1), 2]``, ``max_search=[(0.2, 0.4), (-k, k)]``, and ``relative_to=[None, 0]``. Notice here that the elements of ``relative_to`` refer to the entries in ``filter_groups``.
.. note::
If you see weird edge effects in your filtered traces, it might be a good idea to apply the filter on an extended window (on the initial stream) using ``on_stream=True``. This only works if a reference to the initial stream is still available in the DataHandler.
If it is not, you can also try adding ``with_processing=[vai.RemoveBaseline(), vai.TukeyWindow()]`` which should reduce edge effects (especially for events which do not fully decay within the record window).
.. warning::
If you start using ``only_channels``, ``relative_to``, ``max_search``, and ``filter_groups`` together, you have to be careful with what the indices are referring to: Once you specify ``only_channels``, the entries of the remaining arguments refer to the indices **of the remaining channels**. I.e. if ``only_channels=[0, 2]``, the first entries in ``relative_to`` and ``max_search`` refer to channel 0, wherease the second entries refer to (the old) channel 2. Likewise, if you already chose ``only_channels=[0, 2]``, a valid 2d filter group would be ``filter_groups=[(0, 1)]`` (**not** ``[(0, 2)]``!).
.. warning::
The output arrays **always** have as many channels as the events in the specified ``group``. If you select a subset of channels using ``only_channels``, the remaining channels will be filled with -404, indicating missing values. **However**, if you also set ``filter_groups`` (in order to apply a 2D filter), you inherently lose the channel-index correspondence. To keep things consistent, the output values of any 2D filters are **duplicated** into the channels that were filtered together. E.g. if you choose ``filter_groups=[(0, 1), 2]``, the first and second row of the output arrays will have identical data (results of the 2D filtering), while the third row will have the data from channel 2.
**Example:**
.. code-block:: python
import cait as ai
import cait.versatile as vai
record_length = 2**14
# Generate mock data (two cannels)
md = vai.MockData(record_length=record_length)
it = md.get_event_iterator()
sev, of = md.sev, md.of
# Generate mock stream (two channels)
s = vai.MockStream(rate_Hz=1, seed=137)
# Initialize DataHandler
dh = ai.DataHandler(record_length=record_length,
nmbr_channels=2,
sample_frequency=s.sample_frequency)
dh.set_filepath(path_h5="",
fname="apply_ofilter_test",
appendix=False)
dh.init_empty()
# Trigger to get traces in DataHandler
dh.trigger_zscore(s, trigger_channels=["Ch0"], passive_channels=["Ch1"])
# Calculate filter pulse heights (evaluate channel 1 relative
# to channel 0). For more examples, see vai.OFPulseHeight.
dh.apply_ofilter(
"events",
of=of,
sev=sev,
max_search=[(0.2, 0.4), (-1000, 0)],
relative_to=[None, 0],
# preview=True, # set to True to get a preview before commiting
)
"""
if only_channels is None:
only_channels = slice(None)
elif isinstance(only_channels, int):
only_channels = [only_channels]
if with_processing is None:
with_processing = []
elif callable(with_processing):
with_processing = [with_processing]
# Load events for specified group. If 'on_stream=True', the resulting
# iterator is a StreamIterator (if available).
events = self.get_event_iterator(
group=group,
channel=only_channels,
prefer_external=on_stream,
)
n_ev = len(events)
n_ch_total = self.get_event_iterator(group=group).n_channels
# In case we use on_stream=True, we will extend the record windows.
# Sometimes, this results in extending windows outside the valid range
# of the stream. This is to be expected and intentionally throws and
# error. However, in this 'convenience' DataHandler function, we detect
# such issues and set the offending events' output to -404.
valid_flag = np.ones(len(events), dtype=bool)
if len(set([
a := events.n_channels,
b := np.atleast_2d(of).shape[0],
c := np.atleast_2d(sev).shape[0],
])) > 1:
raise ValueError(f"The number of OF and SEV channels must match the number of selected channels for filtering. Got {a}, {b}, {c}.")
if on_stream:
if not isinstance(
events,
(
vai.iterators.StreamIterator,
vai.iterators.IteratorCollection,
vai.iterators.PulseSimIterator,
),
):
raise Exception("Unable to load StreamIterator. There seems to be no stream reference to the original events in the DataHandler. Set 'on_stream=False' to continue.")
# If IteratorCollection, check right away if the contents are StreamIterators. This
# can be done easily by checking for the existance of the 'with_record_length' function:
try:
events.with_record_length(events.record_length)
except NotImplementedError:
raise Exception("Unable to load StreamIterator. There seems to be no stream reference to the original events in the DataHandler. Set 'on_stream=False' to continue.")
# If we can load events from stream, we use linear convolution
# and extend the iterator
if "method" in kwargs:
raise KeyError("When using 'on_stream=True', you cannot additionally set 'method', as it is overridden by 'method='linear''.")
kwargs["method"] = "linear"
# It is possible, that some of the extended windows fall outside the valid stream range
# because the trigger algorithm may still find triggers close to the end of a stream.
# The exception raised by 'with_extended_window()' in such a case is intentional.
# However, from a convenience perspective, it would be much nicer, if such events were
# just removed from the analysis. What we do here is set their RMS values to -404 but
# don't throw an error.
# The offending events are always among the first/last events, so we only have to check
# them. For an IteratorCollection, however, the check is a bit more annoying.
def find_offending_indices(it):
# Check at most 10 events (per stream) to be removed.
try_max = min(len(it), 10)
offending_indices = []
inds = np.argsort(it.timestamps)
# Check smallest timestamps.
for i in range(try_max):
try:
# If the specified timestamp causes an error ...
it[:, inds[i]].with_extended_window()
except IndexError:
# ... it is considered offending
offending_indices.append(inds[i])
# Check largest timestamps.
try:
it[:, inds[-i]].with_extended_window()
except IndexError:
offending_indices.append(inds[-i])
# For very short interators, there could be an overlap
# when looking from the front and the back of the list.
# Using list(set()) gets rid of potential duplicates.
return list(set(offending_indices))
if isinstance(events, (vai.iterators.StreamIterator, vai.iterators.PulseSimIterator)):
inds = find_offending_indices(events)
else: # must be Collection because of the check above
inds = []
offset = 0
for it in events.iterators:
sub_inds = find_offending_indices(it)
inds.extend([i + offset for i in sub_inds])
offset += len(it)
if len(inds) > 0:
print(f"Found {len(inds)} events that have record windows outside the stream's boundaries after extending their windows. Those will have RMS values of -404 in the final datasets.")
valid_flag[inds] = False
events = events[:, valid_flag].with_extended_window()
# Processing needs to be added after a potential increase of the window size
if with_processing:
events = events.with_processing(with_processing)
# Configuration of filter evaluation is handled by OFPulseHeight
f = vai.OFPulseHeight(of=of, sev=sev, **kwargs)
if preview:
return vai.Preview(
events.with_processing(
with_processing + [vai.RemoveBaseline()]
), f
)
of_res = vai.apply(f, events.with_batchsize(batch_size), pb_prefix="Calculating OF pulse heights")
of_res_dict = {k: v for k, v in zip(f.names(), of_res)}
# See docstring for how we treat 'missing channels' in the output arrays.
if only_channels == slice(None):
selected_channels = list(range(n_ch_total))
else:
# Those may be unordered.
selected_channels = only_channels
if "filter_groups" in kwargs:
filter_groups = kwargs["filter_groups"]
else:
filter_groups = selected_channels
# NOTE: The next step assumes that the outputs of OFPulseHeight are all scalar,
# meaning that the arrays returned are 1D for single channel data and 2D else.
# This allows for a simple construction of the output arrays.
# Sanitize data (such that all of the datasets have shape (n_events, n_channels), even if n_channels=1)
of_res_dict = {k: (v if v.ndim>1 else np.atleast_2d(v).T) for k, v in of_res_dict.items()}
for n, t in zip(f.names(), f.dtypes()):
out = -404*np.ones((n_ch_total, n_ev), dtype=t)
# NOTE: It is important to distinguish the position of the channels in the of_res_dict
# from the actual channel indices in the DataHandler.
# Some 2D filter has been applied
if len(filter_groups) != len(selected_channels):
for i, fg in enumerate(filter_groups):
if isinstance(fg, int):
actual_ch_id = selected_channels[fg]
out[actual_ch_id, valid_flag] = of_res_dict[n][:, i].flatten()
elif isinstance(fg, tuple):
for ch_id in fg:
actual_ch_id = selected_channels[ch_id]
out[actual_ch_id, valid_flag] = of_res_dict[n][:, i].flatten()
else:
# This should never be raised because OFPulseHeight validates the input.
# Nevertheless, this could prevent headaches in case that mechanism ever fails.
raise TypeError(f"Unsupported type '{type(fg)}' for entry in 'filter_groups'.")
# Simple treatment: only separate channels (I know that this can be considered
# a special case of the nested abomination above but I think it's cleaner this way).
else:
for i, ch_id in enumerate(selected_channels):
out[ch_id, valid_flag] = of_res_dict[n][:, i].flatten()
self.set(
group=group,
**{f"{n}" + (f"-{tag}" if tag else ""): out},
dtype=t,
overwrite_existing=True,
write_to_virtual=False,
)
if np.any(self.get(group, "of_rms" + (f"-{tag}" if tag else "")) == -404):
print(f"{txt_fmt('One or more RMS value(s) was/were set to -404.', style='bold')} The corresponding filter values should not be trusted! If you set 'on_stream=True' it is possible that some of the events extended outside of the stream range after expanding them. Such events cannot be reliably filtered and were removed from the calculation and their RMS value was set to -404. Likewise, if you chose only to filter a subset of channels using the 'only_channels' argument, channels which were not filtered had their RMS values set to -404. {txt_fmt('This means that you should ALWAYS perform a cut of the form RMS>0.', style='bold')}")
# calc stdevent carrier
[docs]
def calc_exceptional_sev(self,
naming,
channel=0,
type='events',
use_prediction_instead_label=False,
model=None,
correct_label=None,
use_idx=None,
pulse_height_interval=[0, 10],
left_right_cutoff=None,
rise_time_interval=None,
decay_time_interval=None,
onset_interval=None,
remove_offset=True,
verb=True,
scale_fit_height=True,
sample_length=None):
"""
Calculate an exceptional Standard Event for a Class in the HDF5 File, for only one specific channel.
Attention! Since v1.0 can as well use the regular calc_sev method for calculating SEVs of different pulse shapes.
This method is therefore no longer maintained.
:param naming: Pick a name for the type of event, e.g. 'carrier'.
:type naming: string
:param channel: The number of the channel in the hdf5 file.
:type channel: int
:param type: The group name in the HDF5 set, either "events" or "testpulses".
:type type: string
:param use_prediction_instead_label: If True then instead of the labels the predictions are used.
:type use_prediction_instead_label: bool
:param model: If set this is the name of the model whiches predictions are in the
h5 file, e.g. "RF" --> look for "RF_predictions".
:type model: string or None
:param correct_label: Use only events with this label.
:type correct_label: int or None
:param use_idx: If set then only these indices are used for the sev creation.
:type use_idx: list of ints or None
:param pulse_height_interval: The upper
and lower bound for the pulse heights to include into the creation of the SEV.
:type pulse_height_interval: list of length 2 (interval)
:param left_right_cutoff: The maximal abs value of the linear slope of events
to be included in the Sev calculation. Based on the sample index as x-values.
:type left_right_cutoff: float
:param rise_time_interval: The upper
and lower bound for the rise time to include into the creation of the SEV.
Based on the sample index as x-values.
:type rise_time_interval: lists of length 2 (interval)
:param decay_time_interval: The upper
and lower bound for the decay time to include into the creation of the SEV.
Based on the sample index as x-values.
:type decay_time_interval: list of length 2 (interval)
:param onset_interval: The upper
and lower bound for the onset time to include into the creation of the SEV.
Based on the sample index as x-values.
:type onset_interval: list of length 2 (interval)
:param remove_offset: If True the offset is removed before the events are superposed for the
sev calculation. Highly recommended!
:type remove_offset: bool
:param verb: If True, some verbal feedback is output about the progress of the method.
:type verb: bool
:param scale_fit_height: If True the parametric fit to the sev is normalized to height 1 after
the fit is done.
:type scale_fit_height: bool
:param sample_length: The length of one sample in milliseconds. If None, this is calculated from the sample
frequency.
:type sample_length: float
"""
warn(
'Attention! Since v1.0 can as well use the regular calc_sev method for calculating SEVs of different pulse shapes. This method is therefore no longer maintained.')
if sample_length is None:
sample_length = 1000 / self.sample_frequency
with h5py.File(self.path_h5, 'r+') as h5f:
if correct_label is None and use_idx is None:
raise KeyError('Provide either Correct Label or Index List!')
if correct_label is not None:
if use_prediction_instead_label:
if model is not None:
labels = h5f[type]['{}_predictions'.format(model)][channel]
else:
raise KeyError('Please provide a model string!')
else:
labels = h5f[type]['labels'][channel]
else:
labels = None
if use_idx is None:
use_idx = [i for i in range(len(labels))]
events = h5f[type]['event'][channel, use_idx, :]
mainpar = h5f[type]['mainpar'][channel, use_idx, :]
if labels is not None:
labels = labels[use_idx]
sev = h5f.require_group('stdevent_{}'.format(naming))
sev_event, par = generate_standard_event(events=events,
main_parameters=mainpar,
labels=labels,
correct_label=correct_label,
pulse_height_interval=pulse_height_interval,
left_right_cutoff=left_right_cutoff,
rise_time_interval=rise_time_interval,
decay_time_interval=decay_time_interval,
onset_interval=onset_interval,
remove_offset=remove_offset,
verb=verb,
scale_fit_height=scale_fit_height,
sample_length=sample_length)
sev.require_dataset('event',
shape=(len(sev_event),), # this is then length of sev
dtype='f')
sev['event'][...] = sev_event
sev.require_dataset('fitpar',
shape=(len(par),),
dtype='float')
sev['fitpar'][...] = par
# description of the fitparameters (data=column_in_fitpar)
sev['fitpar'].attrs.create(name='t_0', data=0)
sev['fitpar'].attrs.create(name='A_n', data=1)
sev['fitpar'].attrs.create(name='A_t', data=2)
sev['fitpar'].attrs.create(name='tau_n', data=3)
sev['fitpar'].attrs.create(name='tau_in', data=4)
sev['fitpar'].attrs.create(name='tau_t', data=5)
mp = calc_main_parameters(sev_event).getArray()
sev.require_dataset('mainpar',
shape=mp.shape,
dtype='f')
sev['mainpar'][...] = mp
# description of the mainpar (data=col_in_mainpar)
sev['mainpar'].attrs.create(name='pulse_height', data=0)
sev['mainpar'].attrs.create(name='t_zero', data=1)
sev['mainpar'].attrs.create(name='t_rise', data=2)
sev['mainpar'].attrs.create(name='t_max', data=3)
sev['mainpar'].attrs.create(name='t_decaystart', data=4)
sev['mainpar'].attrs.create(name='t_half', data=5)
sev['mainpar'].attrs.create(name='t_end', data=6)
sev['mainpar'].attrs.create(name='offset', data=7)
sev['mainpar'].attrs.create(name='linear_drift', data=8)
sev['mainpar'].attrs.create(name='quadratic_drift', data=9)
print('{} SEV calculated.'.format(type))
[docs]
def calc_nps(self, use_labels=False, down=1, percentile=None,
rms_cutoff=None, cut_flag=None, window=True, force_zero=True):
"""
Calculates the mean Noise Power Spectrum with option to use only the baselines
that are labeled as noise (label == 3).
:param use_labels: If True only baselines that are labeled as noise are included.
:type use_labels: bool
:param down: A factor by that the baselines are downsampled before the calculation - must be 2^x.
:type down: int
:param percentile: The lower percentile of the fit errors of the baselines that we include in the calculation.
:type percentile: int
:param rms_cutoff: Only baselines with a fit rms below this values are included in the NPS calculation. This
will overwrite the percentile argument, if it is not set to None.
:type rms_cutoff: list of nmbr_channels floats
:param cut_flag: Only the noise baselines for which the value in this array is True, are used for the
calculation.
:type cut_flag: 1d bool array
:param window: If True, a window function is applied to the noise baselines before the calculation of the NPS.
:type window: bool
:param force_zero: Force the zero coefficient (constant offset) of the NPS to zero.
:type force_zero: bool
"""
print('Calculate NPS.')
if rms_cutoff is None:
rms_cutoff = [None for c in range(self.nmbr_channels)]
# open file
with h5py.File(self.path_h5, 'r+') as h5f:
baselines = np.array(h5f['noise']['event'])
mean_nps = []
for c in range(self.nmbr_channels):
bl = baselines[c]
if use_labels and cut_flag is None:
labels = np.array(h5f['noise']['labels'][c])
bl = bl[labels[c] == 3]
elif use_labels and cut_flag is not None:
labels = np.array(h5f['noise']['labels'][c])
labels = labels[cut_flag]
bl = bl[cut_flag]
bl = bl[labels[c] == 3]
elif not use_labels and cut_flag is None:
pass
elif not use_labels and cut_flag is not None:
bl = bl[cut_flag]
if 'fit_rms' in h5f['noise']:
rms_baselines = h5f['noise']['fit_rms'][c]
if cut_flag is not None:
rms_baselines = rms_baselines[cut_flag]
else:
rms_baselines = None
mean_nps.append(calculate_mean_nps(bl,
down=down,
percentile=percentile,
rms_baselines=rms_baselines,
sample_length=1 / self.sample_frequency,
rms_cutoff=rms_cutoff[c],
window=window)[0])
mean_nps = np.array([mean_nps[i] for i in range(self.nmbr_channels)])
frequencies = np.fft.rfftfreq(self.record_length, d=1. / self.sample_frequency * down)
if force_zero:
mean_nps[:, 0] = 0
naming = 'nps'
naming_fq = 'freq'
if down > 1:
naming += '_down' + str(down)
naming_fq += '_down' + str(down)
h5f['noise'].require_dataset(naming,
shape=mean_nps.shape,
dtype='float')
h5f['noise'][naming][...] = mean_nps
h5f['noise'].require_dataset(naming_fq,
shape=frequencies.shape,
dtype='float')
h5f['noise'][naming_fq][...] = frequencies
[docs]
@deprecated(deprecated_in='1.3.0', removed_in="2.0.0", details="Use DataHandler.cmp() instead.")
def calc_additional_mp(self,
type: str = 'events',
path_h5: str = None,
down: int = 1,
no_of: bool = False,
processes: int = -1):
"""
Calculate the additional Main Parameters for the Events in an HDF5 File.
:param type: The group name within the HDF5 file, either events or testpulses.
:type type: string
:param path_h5: An alternative full path to the hdf5 file, e.g. "data/bck_001.h5".
:type path_h5: string
:param down: The downsample rate before calculating the parameters.
:type down: int
:param no_of: Do not use the optimum filter, fill the quantities with zeros instead.
:type no_of: bool
:param processes: The number of processes to use for the calculation. If -1, all available resources are used.
:type processes: int
"""
if not path_h5: path_h5 = self.path_h5
if processes == -1: processes = ai._available_workers
assert self.exists("optimumfilter") or no_of, 'You need to calculate the optimal filter first, or activate no_of!'
if no_of:
of = [None]*self.nmbr_channels
else:
of_real = self.get("optimumfilter", "optimumfilter_real")
of_imag = self.get("optimumfilter", "optimumfilter_imag")
of = of_real + 1j * of_imag
f = [partial(calc_additional_parameters, optimal_transfer_function=of[c], down=down)
for c in range(self.nmbr_channels)]
events = [self.get_event_iterator(type, channel=ch, batch_size=100) for ch in range(self.nmbr_channels)]
n_events = len(events[0])
out = []
print(txt_fmt('Calculating additional main parameters ...', style="bold"))
for ch in range(self.nmbr_channels):
out.append( vai.apply(f[ch],
events[ch],
n_processes=processes if n_events>1000 else 1,
pb_prefix=f"Channel {ch}")
)
self.set(type,
add_mainpar=np.array(out),
dtype=np.float32,
overwrite_existing=True,
write_to_virtual=False)
print("\n")
with h5py.File(path_h5, 'r+') as h5f:
group = h5f[type]
group['add_mainpar'].attrs.create(name='array_max', data=0)
group['add_mainpar'].attrs.create(name='array_min', data=1)
group['add_mainpar'].attrs.create(name='var_first_eight', data=2)
group['add_mainpar'].attrs.create(name='mean_first_eight', data=3)
group['add_mainpar'].attrs.create(name='var_last_eight', data=4)
group['add_mainpar'].attrs.create(name='mean_last_eight', data=5)
group['add_mainpar'].attrs.create(name='var', data=6)
group['add_mainpar'].attrs.create(name='mean', data=7)
group['add_mainpar'].attrs.create(name='skewness', data=8)
group['add_mainpar'].attrs.create(name='max_derivative', data=9)
group['add_mainpar'].attrs.create(name='ind_max_derivative', data=10)
group['add_mainpar'].attrs.create(name='min_derivative', data=11)
group['add_mainpar'].attrs.create(name='ind_min_derivative', data=12)
group['add_mainpar'].attrs.create(name='max_filtered', data=13)
group['add_mainpar'].attrs.create(name='ind_max_filtered', data=14)
group['add_mainpar'].attrs.create(name='skewness_filtered_peak', data=15)
[docs]
def apply_logical_cut(self,
cut_flag: list,
naming: str,
channel: int,
type: str = 'events',
delete_old: bool = False):
"""
Save the cut flag of a logical cut within the HDF5 file.
:param cut_flag: The cut flag that we want to save.
:type cut_flag: list of bools
:param naming: The naming of the dataset to save.
:type naming: string
:param channel: The channel for that the cut flag is meant.
:type channel: int
:param type: The naming of the group in the HDF5 file, in that we want to save the cut flag, e.g. 'events'.
:type type: string
:param delete_old: If true, the old dataset of this name in the group 'type' gets deleted.
:type delete_old: bool
"""
with h5py.File(self.path_h5, 'r+') as f:
if delete_old:
if naming in f[type]:
print('Delete old {} dataset'.format(naming))
del f[type][naming]
cut_dataset = f[type].require_dataset(name=naming,
shape=(self.nmbr_channels, len(cut_flag)),
dtype=bool)
cut_dataset[channel, ...] = cut_flag
print('Applied logical cut.')
[docs]
@deprecated(deprecated_in='1.2.0', removed_in="2.0.0", details="Use DataHandler.set() instead.")
def include_values(self,
values: list,
naming: str,
channel: int,
type: str = 'events',
delete_old: bool = False):
"""
Include values as a data set in the HDF5 file.
Typically this is used to store values of cuts or calibrated energies.
:param values: The values that we want to include in the file.
:type values: list of floats
:param naming: The name of the data set in the HDF5 file.
:type naming: string
:param channel: The channel number to which we want to include the cut values.
:type channel: int
:param type: The group name in the HDF5 set.
:type type: string
:param delete_old: If a set by this name exists already, it gets deleted first.
:type delete_old: bool
"""
with h5py.File(self.path_h5, 'r+') as f:
if delete_old:
if naming in f[type]:
print('Delete old {} dataset'.format(naming))
del f[type][naming]
cut_dataset = f[type].require_dataset(name=naming,
shape=(self.nmbr_channels, len(values)),
dtype=float)
cut_dataset[channel, ...] = values
print('Included values.')
[docs]
def calc_peakdet(self, type='events', lag=1024, threshold=5, look_ahead=1024):
"""
Calculate the number of prominent peaks within the record window. A number > 1 points towards pile up events.
Based on https://stackoverflow.com/a/22640362/15216821.
:param type: The group name of the HDF5 set.
:type type: str
:param lag: The lag value of the algorithm, i.e. the number of samples that are taken to calculate the
moving mean and standard deviation.
:type lag: int
:param threshold:
:type threshold: int
:param look_ahead: When a sample triggers, we look for even higher samples in the subsequent look_ahead number
of samples.
:type look_ahead: int
"""
with h5py.File(self.path_h5, 'r+') as f:
print('CALCULATE NUMBER OF PEAKS.')
nmbr_events = f[type]['event'].shape[1]
nmbr_peaks = np.empty((self.nmbr_channels, nmbr_events), dtype=float)
for c in range(self.nmbr_channels):
for i in trange(nmbr_events):
signal, _, _, _ = get_triggers(array=f[type]['event'][c, i],
lag=lag,
threshold=threshold,
init_mean=None,
init_var=None,
look_ahead=look_ahead)
nmbr_peaks[c, i] = len(signal)
set_peaks = f[type].require_dataset(name='nmbr_peaks',
shape=nmbr_peaks.shape,
dtype=float)
set_peaks[...] = nmbr_peaks