Source code for cait.versatile.eventfunctions.scalarfunctions.mainparameters

from typing import List, Union

import numpy as np
from scipy.integrate import trapezoid

from ..functionbase import ScalarFncBaseclass
from ..processing.boxcarsmoothing import BoxCarSmoothing
from .fitbaseline import FitBaseline


[docs] class MainParameters(ScalarFncBaseclass): """ Calculate main parameters for an event. These parameters are: - **pulse height** (V): Height of the event - **peak position** (ms): Position of the peak. - **onset** (ms): Start of the pulse, which is assumed to be where the trace rises to 20% of the pulse height. This value is shifted relative to 1/4 of the record length, so should be negative for normal pulses. - **rise time** (ms): Time from onset to reach 80% of the pulse height. - **decay time** (ms): Time (after pulse maximum) from 90% of pulse height to 36.8% (1/e) of pulse height. - **baseline slope** (V/samples): Slope of baseline prior to onset, calculated using a fraction of the record length. - **baseline offset** (V): Offset of the baseline, averaged over a fraction of the record length. - **baseline difference** (V): Difference between the right and left edges of the trace, averaged over a fraction of the record length. - **baseline rms** (V): RMS noise of the baseline over a fraction of the record length. - **minimum derivative** (V): Most negative difference between two samples. - **minimum derivative index** (sample): index of the minimum derivative. - **maximum derivative** (V): Most positive difference between two samples. - **maximum derivative index** (sample): index of the maximum derivative. - **maximum** (V): Maximum minus minimum for the entire trace. - **integral** (V ms): Integral of the trace. - **variance** (V²): Variance of the entire trace. In addition, some parameters are calculated in the way that **CAT** does. These parameters differ in the following: - **onset** (ms): Start of the pulse, found from when the trace is above 3 times the baseline RMS noise. - **rise time** (ms): Calculated from 10% to 90% of the pulse height (and indendent of offset). - **decay time** (ms): Calculated from 90% to 10% of the pulse height (and indendent of offset). :param dt_us: The microsecond time base of the recording. If provided, relevant output (e.g. rise time) will be in units of seconds. If not provided, these will be in terms of samples. :type dt_us: int, optional :param peak_loc: Location to search for, or used as, the peak. If a tuple of floats, this is the fraction of the record length in which the peak is searched, e.g. a tuple of ``(0, 1/2)`` searches the first half of the trace, while ``(0, 1)`` searches the whole trace. If a float, the peak is always assumed to be at that fraction of the record length, e.g. ``1/4`` will take the pulse maximum to be at one quarter of the record length. If an integer, this is the location of the peak in samples. Defaults to ``(1/5, 2/5)``. :type peak_loc: tuple, float, int, optional :param bcs: Keyword arguments for :class:`cait.versatile.BoxCarSmoothing`. See its docstring for details. :type bcs: dict :param fbl: Keyword arguments for :class:`cait.versatile.FitBaseline`. See its docstring for details. :type fbl: dict :return: Main parameters as described above :type: Tuple[np.ndarray] .. code-block:: python import cait.versatile as vai events = vai.MockData().get_event_iterator() f = vai.MainParameters(dt_us=events.dt_us) # Preview the working of the calculation vai.Preview(events, f) # Apply function to all events (this returns a tuple of arrays) mp = vai.apply(f, events) # Generate a directory where the keys correspond to the names # of the main parameters and the values to the calculated values mp_dict = {k: v for k, v in zip(f.names(), mp)} .. image:: media/MainParameters_preview.png """ _outputs = [ ("pulse_height", float), ("peak_position", float), ("onset", float), ("rise_time", float), ("decay_time", float), ("rms", float), ("baseline_slope", float), ("baseline_offset", float), ("baseline_difference", float), ("min_deriv", float), ("min_deriv_index", int), ("max_deriv", float), ("max_deriv_index", int), ("maximum", float), ("integral", float), ("variance", float), # CAT parameters ("onset_CAT", float), ("rise_time_CAT", float), ("decay_time_CAT", float), ] def __init__( self, dt_us: int = None, peak_loc: Union[List[float], List[int], float, int] = [1/5, 2/5], bcs = dict(length=50), fbl = dict(model=1, where=1/8), ): super().__init__() if not isinstance(peak_loc, (tuple, list, np.ndarray, int, float)): raise ValueError("Argument peak_loc must be array-like, int, or " f"float, got {type(peak_loc)}") # Check `peak_loc` for proper input # Integer check if isinstance(peak_loc, int) and peak_loc < 0: raise ValueError("Only integers greater than zero are " f"allowed for `peak_loc`, got {peak_loc}") # Float check elif isinstance(peak_loc, float) and (peak_loc < 0 or peak_loc > 1): raise ValueError("Only floats between 0 and 1 are " f"allowed for `peak_loc`, got {peak_loc}") # Array-like check elif isinstance(peak_loc, (tuple, list, np.ndarray)): peak_loc = np.array(peak_loc) if len(peak_loc) != 2: raise ValueError("If `peak_loc` is array-like, it must have " f"length 2, got len(peak_loc) == {len(peak_loc)}") if np.any(peak_loc < 0): raise ValueError("All entries of `peak_loc` must be " f"greater than 0, got {peak_loc}") if not peak_loc[0] < peak_loc[1]: raise ValueError("If `peak_loc` is array-like, it must be " f"strictly increasing, got {peak_loc}") # Float checks if isinstance(peak_loc[0], float): if not np.all((peak_loc >= 0) & (peak_loc <= 1)): raise ValueError("Only arrays of floats between 0 and 1 are " f"allowed for `peak_loc`, got {peak_loc}") # Integer checks elif isinstance(peak_loc[0], int): if not np.all(peak_loc >= 0): raise ValueError("Only arrays of positive integers are " f"allowed for `peak_loc`, got {peak_loc}") self._dt_us = dt_us self._peak_loc = peak_loc self._fitbaseline = FitBaseline(**fbl) self._smoothing = BoxCarSmoothing(**bcs) def __call__(self, event): # This prevents the user's event from being modified event = np.array(event) # Expand dims if the event is 1-D (we can then work on it as though # there were more than one event, makes assumptions easier) was_single_channel = False if event.ndim == 1: was_single_channel = True event = np.expand_dims(event, 0) # Used for calculations. If dt_us is set, then output has some values # with units in terms of seconds (e.g. rise time in [V/s]), otherwise # in samples (e.g. [V/sample]). _dt = self._dt_us / 1000 if self._dt_us is not None else 1 par, bl_rms = self._fitbaseline(event) bl_rms = bl_rms.reshape(event.shape[:-1]) bl_offset = self._fitbaseline.model(0, par).reshape(event.shape[:-1]) # Since the baseline fit may be an arbitrary polynomial or an exponential, # approximate the slope using finite difference if a fit was used. if self._fitbaseline.model == 0: x0 = self._fitbaseline.xdata[0] x1 = self._fitbaseline.xdata[1] bl_slope = (self._fitbaseline.model(x1, par) - self._fitbaseline.model(x0, par)) / (x1 - x0) bl_slope = bl_slope.reshape(event.shape[:-1]) else: # Approximate from endpoints of the `where` argument x0 = self._fitbaseline.where.start x1 = self._fitbaseline.where.stop step = min(100, (x1 - x0) // 2) bl_slope = ( (np.mean(event[..., x1-step:x1], axis=-1) - np.mean(event[..., x0:x0+step], axis=-1)) / (x1 - x0) ) # After calculating the baseline parameters, we operate on the baseline- # subtracted event. event -= self._fitbaseline.model(self._fitbaseline.xdata, par) # Calculate parameters which need to be calculated pre-smoothing # TODO: choose whether evmax is calculated from the entire trace, or if it # uses self._peak_loc. evmax = event.max(axis=-1) - event.min(axis=-1) # max - min diff = np.diff(event, axis=-1) max_deriv_ind = diff.argmax(axis=-1) # index of maximum derivative min_deriv_ind = diff.argmin(axis=-1) # index of minimum derivative max_deriv = diff.max(axis=-1) / _dt # maximum derivative min_deriv = diff.min(axis=-1) / _dt # minimum derivative bl_diff = np.mean(event[..., -50:], axis=-1) - np.mean(event[..., :50], axis=-1) # baseline difference # The rest of the parameters are calculated from the smoothed event # (unless the `length` kwarg is set to 1) event = self._smoothing(event) # Pulse height is simply the maximum in the search interval if isinstance(self._peak_loc, int): self._peak_pos = np.full(event.shape[:-1], self._peak_loc) ph = event[..., self._peak_loc] elif isinstance(self._peak_loc, float): ind = int(np.round(self._peak_loc * event.shape[-1])) self._peak_pos = np.full(event.shape[:-1], ind) ph = event[..., ind] elif isinstance(self._peak_loc, (tuple, list, np.ndarray)): # peak_loc is always an array, and its type should already be set bounds = np.array(self._peak_loc) if bounds.dtype == float: bounds = (bounds * event.shape[-1]).astype(int) self._peak_pos = np.argmax(event[..., bounds[0]:bounds[1]], axis=-1) + bounds[0] index = tuple(np.indices(event.shape[:-1])) + (self._peak_pos,) ph = event[index].reshape(event.shape[:-1]) # Onset is the last sample above 3x the baseline RMS, searching # backwards from the peak. # --- # Rise time is the amount of time (or number of samples) to go from # 20% of the pulse height to 80% of the pulse height. # DIFFERS FROM CAT: CAT is 10% -> 90%. # --- # Decay time is the amount of time (or number of samples) to go from # 90% of the pulse height to 1/e of the pulse height. # DIFFERS FROM CAT: CAT is 90% -> 10%. # --- _x = np.tile(np.arange(event.shape[-1]), (*event.shape[:-1], 1)) mask_pp = _x < np.expand_dims(self._peak_pos, -1) ph_exp = np.expand_dims(ph, axis=-1) # Used a lot below self._rs = np.argmax(_x * (mask_pp & (event < 0.2*ph_exp)), axis=-1) self._re = np.argmax(_x * (mask_pp & (event < 0.8*ph_exp)), axis=-1) self._ds = np.argmax(_x * (~mask_pp & (event > 0.9*ph_exp)), axis=-1) self._de = np.argmax(_x * (~mask_pp & (event > 1/np.e*ph_exp)), axis=-1) self._os = self._rs - event.shape[-1] // 4 osc = np.argmax(_x * (mask_pp & (event < 3*np.expand_dims(bl_rms, axis=-1))), axis=-1) rsc = np.argmax(_x * (mask_pp & (event < 0.1*ph_exp)), axis=-1) rec = np.argmax(_x * (mask_pp & (event < 0.9*ph_exp)), axis=-1) dsc = self._ds dec = np.argmax(_x * (~mask_pp & (event > 0.1*ph_exp)), axis=-1) # Integral may be useful in rejecting pileup. integral = trapezoid(event, dx=_dt, axis=-1) variance = np.var(event, axis=-1) # Done this way to be able to mask out the rise/decay times which # weren't able to be found. rise_time = np.full(self._rs.shape, -1, dtype=float) decay_time = np.full(self._rs.shape, -1, dtype=float) rcond = (self._rs > 0) & (self._re > 0) dcond = (self._ds > 0) & (self._de > 0) rise_time[rcond] = (self._re - self._rs)[rcond] decay_time[dcond] = (self._de - self._ds)[dcond] rise_time_cat = np.full(rsc.shape, -1, dtype=float) decay_time_cat = np.full(rsc.shape, -1, dtype=float) rcond = (rsc > 0) & (rec > 0) dcond = (dsc > 0) & (dec > 0) rise_time_cat[rcond] = (rec - rsc)[rcond] decay_time_cat[dcond] = (dec - dsc)[dcond] out = [ ph, self._peak_pos * _dt, self._os * _dt, rise_time.astype(float) * _dt, decay_time * _dt, bl_rms, bl_slope, bl_offset, bl_diff, min_deriv, min_deriv_ind, max_deriv, max_deriv_ind, evmax, integral, variance, osc * _dt, rise_time_cat * _dt, decay_time_cat * _dt, ] # Return values # This should fix batch issues... if was_single_channel: out = np.squeeze(out) return tuple(out) @property def batch_support(self): return 'full' def preview(self, event) -> dict: unsmoothed = event.copy() _ = self(event) mp = np.array([self._peak_pos, self._os, self._rs, self._re, self._ds, self._de]).squeeze() _dt = self._dt_us if self._dt_us is not None else 1 x = np.arange(event.shape[-1]) * _dt if event.ndim > 1: d1 = {} d2 = {} for i in range(event.shape[0]): d1[f"channel {i}"] = [x, unsmoothed[i]] d1[f"channel {i} bcs"] = [x, self._smoothing(unsmoothed[i])] d2[f"MP channel {i}"] = [x[mp[:, i]], unsmoothed[i][mp[:, i]]] else: d1 = {'event': [x, event], 'event bcs': [x, self._smoothing(event)]} d2 = {'MP': [x[mp], self._smoothing(event)[mp]]} return dict(line=d1, scatter=d2, axes=dict(xaxis={"label": "time (ms)" if self._dt_us is not None else "index"}))