Source code for cait.versatile.analysisobjects.ecal

import copy
import json

import numpy as np
from ipywidgets import widgets

from ...serialize import SerializingMixin
from ..plot import Viewer
from .testpulseresponse import TestpulseResponse
from .transferfunction import TransferFunction


def _sanitize_inputs(arr1: np.ndarray, arr2: np.ndarray, name1: str, name2: str):
    # Check inputs and standardize arrays
    orig_shape_arr1, orig_shape_arr2 = np.shape(arr1), np.shape(arr2)
    arr1 = np.atleast_1d(arr1).astype(float)
    arr2 = np.atleast_2d(arr2).T if np.ndim(arr2)<2 else np.atleast_2d(arr2)
    arr2 = arr2.astype(float)

    if not np.ndim(arr1)==1: 
        raise ValueError(f"Array '{name1}' has to be 1d. Got shape {orig_shape_arr1}.")
    if np.ndim(arr2)>2: 
        raise ValueError(f"Array '{name2}' has to be at most 2d. Got shape {orig_shape_arr2}.")
    if arr1.shape[0] != arr2.shape[0]:
        raise ValueError(f"The number of elements in '{name1}' must match the first dimension of '{name2}'. Need shapes (N,) and (N,) or (N, M). Got shapes {orig_shape_arr1} and {orig_shape_arr2}.")
    
    return (
        arr1, # shape (N,)
        arr2, # shape (N, M)
    )

[docs] class EnergyCalibration(SerializingMixin): """ Energy calibration for fixed testpulse amplitudes (``tpas``) and testpulse pulse heights (``tp_phs``) which vary with an independent variable (``tp_x``). This independent variable is usually the (microsecond) timestamp of the testpulses but can in principle be any quantity. NOTICE: The array ``tp_phs`` must be cleaned from outliers BEFORE starting the calibration! :param tp_x: The independent variable which the testpulse pulse heights ``tp_phs`` depend on. Usually the (microsecond) timestamps of the testpulses. :type tp_x: np.array, shape (N,) :param tp_phs: The testpulse pulse heights which vary with ``tp_x``. :type tp_phs: np.array, shape (N,) :param tpas: The testpulse amplitudes of each testpulse. This array is used to divide different testpulse amplitudes into separate calibrations. :type tpas: np.array, shape (N,) :param testpulse_response: The description for how to fit the set of ``(tp_x, tp_phs)`` points (for each TPA individually). :type testpulse_response: TestpulseResponse :param transfer_function: The description for how to fit ``(tpas, tp_phs)`` values for a fixed x-value. :type transfer_function: TransferFunction :param max_x_gap: If ``<np.inf``, a separate calibration is started if there are gaps in the independent variable ``tp_x`` which are larger than this value. Units of ``max_x_gap`` are determined by ``param_scale``. Usually, this would be used when ``tp_x`` are (microsecond) timestamps and there is no data for > half an hour. In such a case, set ``max_x_gap=0.5`` and ``param_scale=1e-6/3600``. Defaults to ``np.inf``, i.e. this functionality is disabled. :type max_x_gap: float, optional :param param_scale: Conversion factor between the argument ``max_x_gap`` and the units of the input x-values. E.g. If the x-values are given in microseconds, but you want to specify the ``max_x_gap`` in hours, you can set ``param_scale=1e-6/3600``. Defaults to ``1e-6/3600``. :type param_scale: float, optional **Example:** .. code-block:: python import numpy as np import scipy as sp import cait.versatile as vai # Define a random state (so that this example is reproducible) rng_seed = 137 # Create mock data. In your analysis, you would of course already # have timestamps, testpulseamplitudes and testpulse heights. N = 5000 start_us = 1426321613000000 ts = np.hstack([sp.stats.randint.rvs(low=start_us, high=start_us+3*1e6*3600, size=N, random_state=rng_seed), sp.stats.randint.rvs(low=start_us+4*1e6*3600, high=start_us+7*1e6*3600, size=N, random_state=rng_seed)]) tpas = sp.stats.randint.rvs(low=1, high=6, size=2*N, random_state=rng_seed) tp_phs = sp.stats.norm.rvs(loc=np.sqrt(0.3*tpas), scale=0.05, size=2*N, random_state=rng_seed) # Create the EnergyCalibration object. # Choose a testpulse respose object from ['TPRCubicSpline', 'TPRPoly']. # Choose a transfer function object from ['TFPoly', 'TFPchip']. # Here, we choose piecewise cubic splines with Gaussian smoothing and # piecewise cubic Hermite polynomials. my_ecal = vai.EnergyCalibration( tp_x=ts, tp_phs=tp_phs, tpas=tpas, testpulse_response=vai.TPRCubicSpline(kernel_length=0.1), transfer_function=vai.TFPchip(), max_x_gap=0.5, # start new calibrations for segments which are separated by > 0.5 hours ) # Open plot that lets you interactively check what your calibration is doing. # You can change the parameters of the TestpulseResponse and TransferFunction. # You can click at any time in the left plot to see what the transfer function # looks like at that time. my_ecal.preview() # To convert TPE values to PH values, call the object: timestamps_of_interest = ts[100:200] tpes_of_interest = np.linspace(2, 3, len(timestamps_of_interest)) output_phs = my_ecal(timestamps_of_interest, tpes_of_interest) # To convert PH values to TPE values, call its inverse: timestamps_of_interest = ts[100:200] phs_of_interest = output_phs output_tpes = my_ecal.inverse(timestamps_of_interest, phs_of_interest) # ... in 'output_tpes' you should have recovered the initial 'tpes_of_interest'. # Of course, in a real-world scenario, you would only be interested in either # of the two directions. .. image:: media/ECalPreview.png .. automethod:: __call__ """ def __init__(self, tp_x: np.ndarray, tp_phs: np.ndarray, tpas: np.ndarray, testpulse_response: TestpulseResponse, transfer_function: TransferFunction, max_x_gap: float = np.inf, param_scale: float = 1e-6/3600, ): # For the SerializingMixin to remember the inputs super().__init__(tp_x=tp_x, tp_phs=tp_phs, tpas=tpas, testpulse_response=testpulse_response, transfer_function=transfer_function, max_x_gap=max_x_gap, param_scale=param_scale, ) for obj, target, name in zip( [testpulse_response, transfer_function], [TestpulseResponse, TransferFunction], ["testpulse_response", "transfer_function"] ): if not isinstance(obj, target): raise TypeError(f"Input argument '{name}' must be an instance of '{target.__class__.__name__}', not {type(obj)}") for arr, name in zip([tp_x, tpas, tp_phs], ["tp_x", "tpas", "tp_phs"]): if not np.ndim(arr)==1: raise ValueError(f"Input argument '{name}' has to be 1d. Got shape {np.shape(arr)}.") if len(set([np.shape(x) for x in [tp_x, tpas, tp_phs]])) > 1: raise ValueError(f"Input arguments 'tp_x', 'tpas', and 'tp_phs' have to have the same shape. Got {np.shape(tp_x)}, {np.shape(tpas)}, and {np.shape(tp_phs)}.") if max_x_gap<=0: raise ValueError(f"Input argument 'max_x_gap' has to be positive. Not {max_x_gap}.") # Sanitize data tp_x, tpas, tp_phs = np.array(tp_x), np.array(tpas), np.array(tp_phs) sort_inds = np.argsort(tp_x) tp_x, tpas, tp_phs = tp_x[sort_inds], tpas[sort_inds], tp_phs[sort_inds] # Determine unique TPA list self._unique_tpas = np.sort(np.unique(tpas)) # Separate into continuous segments if max_x_gap < np.inf: # 'scaled_x' is usually 'hours' scaled_x = (tp_x - np.min(tp_x))*param_scale anchor_inds = np.nonzero(np.diff(scaled_x)>max_x_gap)[0] + 1 self._x_blocks = np.split(tp_x, anchor_inds) self._tpa_blocks = np.split(tpas, anchor_inds) self._ph_blocks = np.split(tp_phs, anchor_inds) else: anchor_inds = np.array([0]) self._x_blocks = [tp_x] self._tpa_blocks = [tpas] self._ph_blocks = [tp_phs] # Save 'grid' of domains for later. # (start0, end0, start1, end1, ...) block_domains = [] for current_x in self._x_blocks: block_domains.append((np.min(current_x), np.max(current_x))) self._domain_grid = np.array(block_domains).flatten() # Create testpulse response for each domain with the initial testpulse response object. # (Can be changed later - in preview - through self._update_tpr) self._update_tpr(testpulse_response) # Set transfer function to the initial transfer function object. # (Can be changed later - in preview - through self._update_tf) self._update_tf(transfer_function) # Save Testpulse Response and Transfer Function objects for plotting and __repr__ self._init_tpr_obj = testpulse_response self._init_tf_obj = transfer_function # Save input data for plotting self._tp_x, self._tp_phs, self._scale = tp_x, tp_phs, param_scale def __repr__(self): return f"{self.__class__.__name__}(TPR={self._init_tpr_obj}, TF={self._init_tf_obj})"
[docs] def __call__(self, x: np.ndarray, tpes: np.ndarray): """ Convert testpulse equivalent heights to pulse heights at given x-values (usually timestamps). :param x: Independent variable (usually (microsecond) timestamps) for which to evaluate the calibration. :type x: np.ndarray, shape (N,) :param tpes: Testpulse equivalent heights at x-values for which to calculate the pulse height. If 2D, the rows are evaluated at identical x-values. :type tpes: np.ndarray, shape (N,) or (N, M) :return: The pulse height for pulses at ``x`` with testpulse equivalent heights ``tpes``. :rtype: np.ndarray, shape (N,) or (N, M) """ in_shape = np.shape(tpes) x, tpes = _sanitize_inputs(x, tpes, "x", "tpes") # guaranteed shapes (N,) and (N, M) tphs, mask_valid = self.get_tp_phs(x) # x-values (usually timestamps) which fall outside of domains are marked invalid (NaN) out = np.nan*np.ones_like(tpes) out[mask_valid, ...] = self._tf(self._unique_tpas, tphs[mask_valid, ...], tpes[mask_valid, ...]) return np.reshape(out, in_shape)
[docs] def inverse(self, x: np.ndarray, phs: np.ndarray): """ Convert pulse heights to testpulse equivalent heights at given x-values (usually timestamps). :param x: Independent variable (usually (microsecond) timestamps) for which to evaluate the calibration. :type x: np.ndarray, shape (N,) :param phs: Heights of pulses at x-values for which to calculate the testpulse equivalent height. If 2D, the rows are evaluated at identical times. :type phs: np.ndarray, shape (N,) or (N, M) :return: The testpulse equivalent energies for pulses at ``x`` with pulse heights ``phs``. :rtype: np.ndarray, shape (N,) or (N, M) """ in_shape = np.shape(phs) x, phs = _sanitize_inputs(x, phs, "x", "phs") # guaranteed shapes (N,) and (N, M) tphs, mask_valid = self.get_tp_phs(x) # x-values (usually timestamps) which fall outside of domains are marked in valid (NaN) out = np.nan*np.ones_like(phs) out[mask_valid, ...] = self._tf.inverse(self._unique_tpas, tphs[mask_valid, ...], phs[mask_valid, ...]) return np.reshape(out, in_shape)
[docs] def to_file(self, fname: str): """ Write EnergyCalibration object to ``.json``-file. :param fname: Filename to use (without file-extension) :type fname: str """ with open(f"{fname}.json", "w") as f: json.dump(self.to_dict(), f, indent=4)
[docs] @classmethod def from_file(cls, fname: str): """ Construct EnergyCalibration from ``.json``-file. :param fname: Filename to look for (without file-extension) :type fname: str :return: Instance of EnergyCalibration. :rtype: EnergyCalibration """ with open(f"{fname}.json", "r") as f: d = json.load(f) return cls.from_dict(d)
def _update_tpr(self, tpr_obj: TestpulseResponse): """Update the currently used TestpulseResponse object. Called by ``.preview`` to dynamically modify fit parameters.""" # For each segment, separate into TPA and initialize TestpulseResponse object self._tprs = [] for current_x, current_tpas, current_phs in zip(self._x_blocks, self._tpa_blocks, self._ph_blocks): self._tprs.append([ copy.copy(tpr_obj).prepare( x=current_x[current_tpas==tpa], tp_phs=current_phs[current_tpas==tpa] ) for tpa in self._unique_tpas ]) return self def _update_tf(self, tf_obj: TransferFunction): """Update the currently used TransferFunction object. Called by ``.preview`` to dynamically modify fit parameters.""" self._tf = tf_obj return self def _get_tpr_plot_dict(self, n_fit_grid_points: int, downsample_factor: int): x_start, x_stop = np.min(self._tp_x), np.max(self._tp_x) x_fit = np.linspace(x_start, x_stop, n_fit_grid_points) y_fit, _ = self.get_tp_phs(x_fit) return { "scatter": { "Testpulses": [ np.atleast_1d(self._tp_x[::downsample_factor]), np.atleast_1d(self._tp_phs[::downsample_factor]) ] }, "line": { f"TPA {tpa:.2g}": [ np.atleast_1d(x_fit), np.atleast_1d(fit) ] for tpa, fit in zip(self._unique_tpas, y_fit.T) }, } def _get_tf_plot_dict(self, x: float, n_fit_grid_points: int): if np.ndim(x)>0: raise ValueError(f"Input argument 'x' must be scalar. Got shape {np.shape(x)}.") tpe_start, tpe_stop = 0, np.max(self._unique_tpas) tpe_len = tpe_stop - tpe_start tpe_fit = np.linspace(tpe_start-0.1*tpe_len, tpe_stop+0.2*tpe_len, n_fit_grid_points) y_fit = np.reshape(self(np.atleast_1d(x), np.atleast_2d(tpe_fit)), np.shape(tpe_fit)) tp_phs = np.reshape(self.get_tp_phs(x)[0], np.shape(self._unique_tpas)) return { "line": { "Fit": [ np.atleast_1d(tpe_fit), np.atleast_1d(y_fit) ] }, "scatter": { f"TPA {tpa:.2g}": [ np.atleast_1d(tpa), np.atleast_1d(tp_ph) ] for tpa, tp_ph in zip(self._unique_tpas, tp_phs) } }
[docs] def get_tp_phs(self, x: np.ndarray): """ Return the testpulse heights evaluated at x (usually (microsecond) timestamps) for each unique TPA (columns). :param x: The independent variable (usually microsecond timestamp) for which to return the testpulse pulse heights. :type x: np.ndarray, shape (N,) :return: Pulse heights at ``x`` for each unique testpulse amplitude (TPA). Invalid values (which fall outside the calibration range) are marked as ``np.nan``. Furthermore, a flag of valid ``x`` values is returned. :rtype: Tuple[np.ndarray, np.ndarray], shape (N, n_unique_tpas) and (N,) """ # Sanitize input x = np.atleast_1d(x) if x.ndim!=1: raise ValueError(f"Array 'x' has to be 1d. Got shape {x.shape}.") out = np.nan*np.ones((x.shape[0], len(self._unique_tpas))) # Bin pulse_ts into the previously obtained domains to know # which testpulse response object we have to call domain_inds = np.digitize(x, self._domain_grid) mask_valid = np.mod(domain_inds, 2) != 0 falls_into_domain = np.array((domain_inds[mask_valid]-1)/2, dtype=int) # Select the correct domains for all x-values (usually timestamps) # and evaluate the testpulse response for all testpulses. # Create an array with rows for each TPA and columns for each x-value. tphs = np.zeros((len(self._unique_tpas), len(falls_into_domain))) for domain_ind in np.unique(falls_into_domain): current_mask = falls_into_domain==domain_ind tphs[:, current_mask] = np.array([ tpr(x[mask_valid][current_mask]) for tpr in self._tprs[domain_ind] ]) out[mask_valid, :] = tphs.T return out, mask_valid
[docs] def plot_testpulse_response(self, n_fit_grid_points: int = 1000, downsample_factor: int = 1, **viewer_kwargs, ): """ Plot the testpulse response at a given x-value (usually microsecond timestamp). :param n_fit_grid_points: Number of points used for plotting the fits. Defaults to 1000. :type n_fit_grid_points: int, optional :param downsample_factor: Downsample the ``(tp_x, tp_phs)`` points by this factor. Defaults to 1, i.e. no downsampling. :type downsample_factor: int, optional :param viewer_kwargs: Additional keyword arguments for :class:`cait.versatile.plot.viewer.Viewer`. :type viewer_kwargs: Any """ return Viewer( data=self._get_tpr_plot_dict(n_fit_grid_points, downsample_factor), xlabel="x", ylabel="Pulse Height", **viewer_kwargs )
[docs] def plot_transfer_function(self, x: float, n_fit_grid_points: int = 1000, **viewer_kwargs, ): """ Plot the testpulse response at a given x-value (usually microsecond timestamp). :param x: Number of points used for plotting the fits. Defaults to 1000. :type x: float :param n_fit_grid_points: Number of points used for plotting the fits. Defaults to 1000. :type n_fit_grid_points: int, optional :param viewer_kwargs: Additional keyword arguments for :class:`cait.versatile.plot.viewer.Viewer`. :type viewer_kwargs: Any """ return Viewer( data=self._get_tf_plot_dict(x, n_fit_grid_points), xlabel="Testpulse-equivalent amplitude", ylabel="Pulse Height", **viewer_kwargs )
[docs] def preview(self, n_fit_grid_points: int = 1000, downsample_factor: int = 1, **viewer_kwargs, ): """ Plot the testpulse response over time and the transfer function for selected times. :param n_fit_grid_points: Number of points used for plotting the fits. Defaults to 1000. :type n_fit_grid_points: int, optional :param downsample_factor: Downsample the ``(tp_x, tp_phs)`` points by this factor. Defaults to 1, i.e. no downsampling. :type downsample_factor: int, optional :param viewer_kwargs: Additional keyword arguments for :class:`cait.versatile.plot.viewer.Viewer`. :type viewer_kwargs: Any """ if "backend" in viewer_kwargs.keys() and viewer_kwargs["backend"] != "plotly": raise NotImplementedError(f"Backend '{viewer_kwargs['backend']}' is currently not supported. Please use 'plotly'. To plot using other backends, use 'plot_testpulse_response' and 'plot_transfer_function'.") if "width" not in viewer_kwargs.keys(): viewer_kwargs["width"] = 550 tpr_viewer = Viewer( xlabel="x", ylabel="Pulse Height", **viewer_kwargs ) tf_viewer = Viewer( xlabel="Testpulse-equivalent Height", **viewer_kwargs ) tpr_viewer.show_legend(False) # Already draw pulse height scatter (will not be redrawn later) and fits (will be redrawn) tpr_viewer.plot(self._get_tpr_plot_dict(n_fit_grid_points, downsample_factor)) self._current_plot_x = np.min(self._tp_x) #scale_time = lambda X: (X-first_data_x)*self._scale #f"Time (h) since {np.array([first_data_x], dtype='datetime64[us]')[0].astype(datetime.datetime).strftime('%d-%b-%Y, %H:%M:%S')}, ({first_data_x})", tpr_inputs, tf_inputs = dict(), dict() for inputs, obj in zip([tpr_inputs, tf_inputs], [self._init_tpr_obj, self._init_tf_obj]): for k, v in obj._PREVIEW_INPUTS.items(): desc = k val = obj._init_kwargs[k] if k in obj._init_kwargs.keys() else v["default"] if v["dtype"] is bool: inputs[k] = widgets.Checkbox( description=desc, value=val, ) elif v["dtype"] is float: inputs[k] = widgets.FloatSlider( description=desc, value=val, min=v["domain"][0], max=v["domain"][1], step=(v["domain"][1]-v["domain"][0])/1000 ) elif v["dtype"] is int: inputs[k] = widgets.Dropdown( description=desc, value=val, options=tuple(range(v["domain"][0], v["domain"][1]+1)), ) def _on_tpr_value_change(*args, **kwargs): new_kwargs = self._init_tpr_obj._init_kwargs.copy() for k, v in tpr_inputs.items(): new_kwargs[k] = v.value new_tpr_obj = self._init_tpr_obj.__class__(**new_kwargs) self._update_tpr(new_tpr_obj) new_plot_dict = self._get_tpr_plot_dict(n_fit_grid_points, downsample_factor) # Update only lines (the fits) and don't re-draw scatter (the data) tpr_viewer.plot(dict(line=new_plot_dict["line"])) # Also update TF plot _on_tf_value_change() def _on_tf_value_change(*args, **kwargs): new_kwargs = self._init_tf_obj._init_kwargs.copy() for k, v in tf_inputs.items(): new_kwargs[k] = v.value new_tf_obj = self._init_tf_obj.__class__(**new_kwargs) self._update_tf(new_tf_obj) new_plot_dict = self._get_tf_plot_dict(self._current_plot_x, n_fit_grid_points) # Update both lines (the fits) and re-draw scatter tf_viewer.plot(new_plot_dict) def _on_tpr_click(trace, points, state): ind = points.point_inds[0] self._current_plot_x = self._tp_x[ind] _on_tf_value_change() for i in tpr_inputs.values(): i.observe(_on_tpr_value_change, "value") for i in tf_inputs.values(): i.observe(_on_tf_value_change, "value") # Run once to calculate TF fits and plot _on_tf_value_change() # Only with plotly backend: enable clicking the TPR plot if tpr_viewer.backend == "plotly": tpr_viewer.get_artist("Testpulses").on_click(_on_tpr_click) return widgets.HBox([ widgets.VBox([ widgets.HBox(list(tpr_inputs.values())), widgets.HBox([tpr_viewer.get_figure()]) ]), widgets.VBox([ widgets.HBox(list(tf_inputs.values())), widgets.HBox([tf_viewer.get_figure()]) ]), ])