Source code for cait.versatile.eventfunctions.scalarfunctions.calcmp

import numpy as np

from ..functionbase import FncBaseClass
from ..processing.removebaseline import RemoveBaseline
from ..processing.boxcarsmoothing import BoxCarSmoothing

#### HAS NO TESTCASE YET ####
[docs]class CalcMP(FncBaseClass): """ Calculates main parameters for an event. If the argument ``dT`` is set to ``None``, the output is an array of shape ``(n_channels, 9)``, where the nine entries are ``ph, t_0, t_rise, t_max, t_decaystart, t_half, t_end, offset, lin_drift``, and quantities starting with ``t_`` are given as sample indices. If the argument ``dT`` is set (to the microsecond time base of the recording), the (human readable) quantities ``pulse_height (V), onset (ms), rise_time (ms), decay_time (ms), slope (V)`` as a tuple. Also works for multiple channels simultaneously. :param dt_us: The microsecond time base of the recording. If not provided, physical time constants cannot be computed and the function outputs indices. See above. :type dt_us: int, optional :param peak_bounds: The region in the record window which is considered for peak search. A tuple ``(0,1)`` searches the entire window, ``(0,1/2)`` only searches the first half, etc. Defaults to ``(1/5, 2/5)``. :type peak_bounds: tuple, optional :param edge_size: The (relative) size of the record window that is used to compute the linear drift. Defaults to 1/8, meaning that the first and last 1/8th of the record window is used. :type edge_size: float, optional :param box_car_smoothing: Arguments for class:`BoxCarSmoothing`, which are used for the application of the moving average. Defaults to ``{'length': 50}``. :type box_car_smoothing: dict, optional :param fit_baseline: Arguments for class:`FitBaseline`, which are used for the baseline subtractions. Defaults to ``{'model': 0, 'where': 1/8, 'xdata': None}``. :type fit_baseline: dict, optional :return: Either an array or tuple. See above. :rtype: np.ndarray, tuple **Example:** :: import cait.versatile as vai # Get events from mock data (and remove baseline) it = vai.MockData().get_event_iterator().with_processing(vai.RemoveBaseline()) # Without the 'dt_us' keyword (output is numpy array) mp_array = vai.apply(vai.CalcMP(), it) # WITH the 'dt_us' keyword (output is tuple with physical values) pulse_height, onset, rise_time, decay_time, slope = vai.apply(vai.CalcMP(dt_us=it.dt_us), it) # Example for plotting a histogram of the pulse hights of both channels vai.Histogram({'ch0': pulse_height[:,0], 'ch1': pulse_height[:,1]}) **Example Preview:** :: import cait.versatile as vai # Get events from mock data (and remove baseline) it = vai.MockData().get_event_iterator().with_processing(vai.RemoveBaseline())[0] # View main parameter calculation in action. The plot shows the original # event, the event after the moving average, and the selected datapoints # for calculating the main parameters. vai.Preview(it, vai.CalcMP()) .. image:: media/CalcMP_preview.png """ def __init__(self, dt_us: int = None, peak_bounds: tuple = (1/5, 2/5), edge_size: float = 1/8, box_car_smoothing: dict = {'length': 50}, fit_baseline: dict = {'model': 0, 'where': 1/8, 'xdata': None}, ): if not (isinstance(peak_bounds, tuple) and len(peak_bounds) == 2): raise ValueError(f"'{peak_bounds}' is not a valid interval.") if peak_bounds[0] > peak_bounds[1] or any([peak_bounds[0]<0, peak_bounds[1]<0, peak_bounds[0]>1, peak_bounds[1]>1]): raise ValueError("'peak_bounds' must be a tuple of increasing values between 0 and 1") self._dt_us = dt_us self._peak_bounds = peak_bounds self._edge_size = edge_size self._box_car_smoothing = BoxCarSmoothing(**box_car_smoothing) self._remove_baseline = RemoveBaseline(fit_baseline) def __call__(self, event): length = event.shape[-1] sl = slice(int(self._peak_bounds[0]*length), int(self._peak_bounds[1]*length)) self._smooth_event = self._box_car_smoothing(event) self._lin_drift = (np.mean(self._smooth_event[..., -int(self._edge_size*length):], axis=-1, keepdims=True) - np.mean(self._smooth_event[..., :int(self._edge_size*length)], axis=-1, keepdims=True) ) / length self._offset = np.mean(event[..., :int(self._edge_size*length)], axis=-1, keepdims=True) self._bl_removed_event = self._remove_baseline(self._smooth_event) self._t_max = (np.argmax(self._bl_removed_event[..., sl], axis=-1, keepdims=True) + sl.start).astype(np.int32) self._ph = np.take_along_axis(self._bl_removed_event, self._t_max, axis=-1) # Helper array to slice multiple channels simultaneously inds = np.arange(length) self._t0 = length - 1 - np.argmax( np.flip( np.logical_and( self._bl_removed_event < 0.2*self._ph, inds < self._t_max), axis=-1), axis=-1, keepdims=True) self._t_rise = np.argmax( np.logical_and( self._bl_removed_event > 0.8*self._ph, np.logical_and(inds >= self._t0, inds < self._t_max)), axis=-1, keepdims=True) self._t_end = np.argmax( np.logical_and( self._bl_removed_event < 0.368*self._ph, inds >= self._t_max), axis=-1, keepdims=True) self._t_end[self._t_end == 0] = length - 1 self._t_decaystart = np.argmax( np.logical_and( self._bl_removed_event < 0.9*self._ph, inds >= self._t_max), axis=-1, keepdims=True) self._t_decaystart[self._t_decaystart == 0] = length - 1 self._t_half = np.argmax( np.logical_and( self._bl_removed_event < 0.736*self._ph, inds >= self._t_max), axis=-1, keepdims=True) self._t_half[self._t_half == 0] = length - 1 # If no time base is specified, the indices are returned (all in one numpy array) if self._dt_us is None: return np.concatenate([self._ph, self._t0, self._t_rise, self._t_max, self._t_decaystart, self._t_half, self._t_end, self._offset, self._lin_drift], axis=-1) else: return (np.squeeze(self._ph)[()], # pulse height (np.squeeze((self._t0 - length/4)*self._dt_us/1000))[()], # onset (ms) (np.squeeze((self._t_rise - self._t0)*self._dt_us/1000))[()], # rise time (ms) (np.squeeze((self._t_end - self._t_decaystart)*self._dt_us/1000))[()], # decay time (ms) (np.squeeze(self._lin_drift*length))[()] # slope (V) ) @property def batch_support(self): return 'trivial' def preview(self, event) -> dict: self(event) mp = np.concatenate([self._t0, self._t_rise, self._t_max, self._t_decaystart, self._t_half, self._t_end], axis=-1) x = np.arange(event.shape[-1]) if self._dt_us is not None: x = (x - event.shape[-1]/4)*self._dt_us/1000 if np.ndim(event) > 1: d1 = dict() d2 = dict() for i in range(np.ndim(event)): d1[f'channel {i}'] = [x, event[i]] d1[f'channel {i} bcs'] = [x, self._smooth_event[i]] d2[f'MP channel {i}'] = [x[mp[i]], self._smooth_event[i][mp[i]]] else: d1 = {'event': [x, event], 'event bcs': [x, self._smooth_event]} d2 = {'MP': [x[mp], self._smooth_event[mp]]} return dict(line = d1, scatter=d2, axes=dict(xaxis={"label": "time (ms)" if self._dt_us is not None else "index"}))