Source code for cait.versatile.analysisobjects.sev

import json
import os
import re
from typing import Union

import numpy as np
from tqdm.auto import tqdm

import cait as ai

from ..eventfunctions.processing.removebaseline import RemoveBaseline
from ..iterators.iteratorbase import IteratorBaseClass
from ..plot.basic.line import Line
from .arraywithbenefits import ArrayWithBenefits
from .helper import is_array_like


[docs] class SEV(ArrayWithBenefits): """ Object representing a Standard Event (SEV). It can either be created by averaging events from an `EventIterator`, from an `np.ndarray` or read from a DataHandler or xy-file. If created from an `EventIterator`, the (constant) baseline is removed automatically. :param data: The data to use for the SEV. If None, an empty SEV is created. If `np.ndarray` (or a list that can be converted to an array), each row in the array is interpreted as a SEV for separate channels. If iterator (possibly from multiple channels) a SEV is calculated by averaging the events returned by the iterator. Defaults to None. :type data: Union[np.array, Type[IteratorBaseClass]] :param dt_us: The microsecond timebase used in the recording. Only necessary if the input is an array (because it can be automatically inferred if constructed from an EventIterator, a file, or a DataHandler). Defaults to None (i.e. automatic detection if possible). :type dt_us: int """ def __init__( self, data: Union[np.ndarray, IteratorBaseClass] = None, dt_us: int = None ): if data is None: self._sev = np.empty(0) self._n_ch = 0 self._dt_us = dt_us elif isinstance(data, IteratorBaseClass): self._dt_us = data.dt_us mean_pulse, maxima = self.unscaled(data) self._sev = mean_pulse / maxima if self._sev.ndim > 1: self._n_ch = self._sev.shape[0] if self._n_ch == 1: self._sev = self._sev.flatten() else: self._n_ch = 1 elif isinstance(data, np.ndarray) or is_array_like(data): if dt_us is None: raise ValueError( "If SEV is constructed from array(-like) input, the microsecond timebase of the recording (dt_us) has to be specified." ) self._dt_us = dt_us self._sev = np.array(data) if self._sev.ndim > 1: self._n_ch = self._sev.shape[0] if self._n_ch == 1: self._sev = self._sev.flatten() else: self._n_ch = 1 else: raise ValueError( f"Unsupported datatype '{type(data)}' for input argument 'data'." )
[docs] @classmethod def unscaled(cls, it: IteratorBaseClass): """ Construct SEV from event iterator WITHOUT scaling amplitudes to unity. :param it: Events from which to construct the SEV. :type it: IteratorBaseClass :return: Instance of SEV and relative amplitudes of all channels. :rtype: Tuple[SEV, np.ndarray] **Example:** .. code-block:: python import cait.versatile as vai # Construct mock data to get event iterator it = vai.MockData().get_event_iterator() # Get the SEV WITHOUT normalizing (unscaled_sev) and # the pulse heights (maxima) for all channels. unscaled_sev, maxima = vai.SEV.unscaled(it) # With this information, you can construct any desired # normalization. E.g. the following gives the regular # SEV, where all channels are normalized to pulse height 1. regular_sev = unscaled_sev/maxima # However, if you want to normalize all channels to the first # channel, e.g., you can do sev_scaled_to_ch0 = unscaled_sev/maxima[0] """ it = it.flatten().with_processing(RemoveBaseline()) if len(it) > 1000: mean_pulse = np.zeros_like(it.grab(0)) with it: for ev in tqdm(it, delay=5): mean_pulse += ev mean_pulse /= len(it) else: with it: mean_pulse = np.mean(it, axis=0) # Normalization maxima = np.max(mean_pulse, axis=-1) # Cast maxima into a column vector such that vectorization works if maxima.ndim > 0: maxima = maxima[:, None] return cls(mean_pulse, dt_us=it.dt_us), maxima
[docs] @classmethod def from_dh(cls, dh, group: str = "stdevent", dataset: str = "event"): """ Construct SEV from DataHandler. :param dh: The DataHandler instance to read from. :type dh: DataHandler :param group: The HDF5 group where the SEV is stored. :type group: str :param dataset: The HDF5 dataset where the SEV is stored. :type dataset: str :return: Instance of SEV. :rtype: SEV """ return cls(dh.get(group, dataset), dt_us=dh.dt_us)
[docs] def to_dh(self, dh, group: str = "stdevent", dataset: str = "event", **kwargs): """ Save SEV to DataHandler. :param dh: The DataHandler instance to write to. :type dh: DataHandler :param group: The HDF5 group where the SEV should be stored. :type group: str :param dataset: The HDF5 dataset where the SEV should be stored. :type dataset: str :param kwargs: Keyword arguments for :func:`cait.DataHandler.set`. :type kwargs: Any """ if self._dt_us != dh.dt_us: raise ValueError( f"Timebase of SEV ({self._dt_us}) does not match the one of DataHandler ({dh.dt_us})." ) data = self._sev[None, :] if self._n_channels == 1 else self._sev dh.set(group, **{dataset: data}, **kwargs)
[docs] @classmethod def from_file(cls, fname: str, src_dir: str = ""): """ Construct SEV from xy-file. :param fname: Filename to look for (without file-extension) :type fname: str :param src_dir: Directory to look in. Defaults to '' which means searching current directory. Optional :type src_dir: str :return: Instance of SEV. :rtype: SEV """ fpath = os.path.join(src_dir, fname + ".xy") # We read the file to find out how many header lines it has with open(fpath, "r") as f: first_line = f.readline() header = re.findall(r"\{.*\}", first_line) if header and all([x in json.loads(header[0]) for x in ["dt_us", "n_ch"]]): info = json.loads(header[0]) arr = np.loadtxt(fpath, skiprows=info["n_ch"] + 3)[:, 1:].T dt_us = info["dt_us"] else: content = np.loadtxt(fpath) arr = content[:, 1:].T dt_us = int(np.round(np.diff(content[:, 0])[0] * 1000)) return cls(arr, dt_us=dt_us)
[docs] def to_file(self, fname: str, out_dir: str = "", info_str: str = ""): """ Write SEV to xy-file. :param fname: Filename to use (without file-extension) :type fname: str :param out_dir: Directory to write to. Defaults to '' which means writing to current directory. Optional :type out_dir: str :param info_str: An info string to be saved as a description of the SEV in the header of the file. Optional :type info_str: str """ if np.array_equal(self._array, np.empty(0)): raise Exception("Empty SEV cannot be saved.") fpath = os.path.join(out_dir, fname + ".xy") if self._n_ch > 1: data = [self.t] + [self._array[k] for k in range(self._n_ch)] else: data = [self.t, self._array] header = ( "SEV " + json.dumps( { "cait": ai.__version__, "dt_us": self.dt_us, "record_length": len(self.t), "n_ch": self._n_channels, **({"info": str(info_str)} if info_str else {}) } ) + "\n" ) header += "\n".join( ["Time (ms)"] + [f"Trace {k} (V)" for k in range(self._n_ch)] + [f"{len(self.t)}"] ) np.savetxt(fpath, np.array(data).T, header=header)
[docs] def show(self, **kwargs): """ Plot SEV for all channels. To inspect just one channel, you can index SEV first and call `.show` on the slice. :param kwargs: Keyword arguments passed on to :class:`cait.versatile.Line`. :type kwargs: Any """ if self._n_channels == 0: raise Exception("Nothing to plot.") if "x" not in kwargs.keys(): kwargs["x"] = self.t if "xlabel" not in kwargs.keys(): kwargs["xlabel"] = "Time (ms)" if self._n_channels > 1: y = dict() for i, channel in enumerate(self._array): y[f"channel {i}"] = channel # if hasattr(self, "_fit_pars"): # y[f'channel {i} fit'] = self.fit_model[i](kwargs['x']) else: # if hasattr(self, "_fit_pars"): # y = dict(event=self._array, fit=self.fit_model(kwargs['x'])) # else: # y = self._array y = self._array return Line(y, **kwargs)
# Unfinished # def fit(self, t: np.ndarray, n_comp: int = 2, **kwargs): # if "p0" not in kwargs.keys(): # kwargs["p0"] = [0, # *[1/10**k for k in range(n_comp)], # *[100/10**(k+1) for k in range(n_comp+1)] # ] # if self._n_channels > 1: # self._fit_pars = [] # for channel in self._array: # pars, _ = sp.optimize.curve_fit(pulse_template, t, channel, **kwargs) # self._fit_pars.append(pars) # else: # self._fit_pars, _ = sp.optimize.curve_fit(pulse_template, t, self._array, **kwargs) # @property # def fit_pars(self): # if not hasattr(self, "_fit_pars"): # raise KeyError("No fit parameters are available. Make sure to fit the SEV first.") # return self._fit_pars # @property # def fit_model(self): # if self._n_channels > 1: # models = [] # for fp in self.fit_pars: # models.append(lambda t: pulse_template(t, *fp)) # return models # else: # return lambda t: pulse_template(t, *self.fit_pars) @property def _array(self): return self._sev @_array.setter def _array(self, array): if self._sev.size == 0: self._sev = array self._n_ch = 1 if array.ndim < 2 else array.shape[0] else: raise Exception("SEV._array can only be set as long as it is empty.") @property def _n_channels(self): return self._n_ch @property def dt_us(self): return self._dt_us @property def t(self): if self.dt_us is not None: return ( self.dt_us / 1000 * (np.arange(self.shape[-1]) - int(self.shape[-1] / 4)) )