Source code for cait.versatile.functions.trigger.trigger_of

from functools import partial
from typing import List, Union

import numpy as np
import scipy as sp
from numpy.typing import ArrayLike

from .triggerbase import trigger_base

####################################################
### FUNCTIONS IN THIS FILE HAVE NO TESTCASES YET ###
####################################################


def filter_chunk(data: np.ndarray, of: np.ndarray, record_length: int):
    """
    Filters 'data' with an optimum filter 'of' according to the overlap-add-algorithm.

    :param data: The data to filter.
    :type data: np.ndarray
    :param of: The filter to use.
    :type of: np.ndarray
    :param record_length: The record length as determined by the filter length (this determines, how many samples are discarded in the beginning and end of the data to avoid edge effects).
    :type record_length: int

    :return: Filtered chunk
    :rtype: np.ndarray
    """
    # The scipy function 'oaconvolve' does NOT assume the response function to be in wrap-around order.
    # Therefore, we have to shift it such that the maximum position is again at 1/4 of the record window
    omega = 2 * np.pi * np.fft.rfftfreq(record_length)
    phase = np.exp(1j * record_length/4 * omega)
    return sp.signal.oaconvolve(np.fft.irfft(of*phase), data, mode="valid")[record_length - record_length//4 + 1: -record_length//4]


def filter_chunk_2d(data: np.ndarray, of: np.ndarray, record_length: int):
    """
    Filters multi-channel 'data' with a 2D optimum filter 'of' according to the overlap-add-algorithm and sums them up to obtain a single, filtered trace, which is ready for triggering/maximum search.

    :param data: The data to filter.
    :type data: np.ndarray
    :param of: The filter to use.
    :type of: np.ndarray
    :param record_length: The record length as determined by the filter length (this determines, how many samples are discarded in the beginning and end of the data to avoid edge effects).
    :type record_length: int

    :return: Filtered chunk
    :rtype: np.ndarray
    """
    # The scipy function 'oaconvolve' does NOT assume the response function to be in wrap-around order.
    # Therefore, we have to shift it such that the maximum position is again at 1/4 of the record window
    offset = record_length // 4
    return np.sum(
        np.atleast_2d(
            sp.signal.oaconvolve(
                np.roll(np.fft.irfft(of), offset, axis=-1), data, mode="full"
            )
        ),
        axis=0,
    )[record_length + offset : -2 * record_length + offset + 1]


[docs] def trigger_of( stream: ArrayLike, threshold: float, of: np.ndarray, n_triggers: int = None, chunk_size: int = 100, apply_first: Union[callable, List[callable]] = None, n_processes: int = None, ): """ Trigger a single channel of a stream using the optimum filter triggering algorithm described in https://edoc.ub.uni-muenchen.de/23762/. See :func:`cait.versatile.functions.trigger.triggerbase.trigger_base` for details on the implementation. :param stream: The stream channel to trigger. :type stream: ArrayLike :param threshold: The threshold (in Volts) above which events should be triggered. :type threshold: float :param of: The optimum filter to be used for filtering (it is assumed that the filter's first entry is set to zero to correctly remove the offset). :type of: np.ndarray :param n_triggers: The number of events to trigger (might be more, depending on 'chunk_size'). E.g. useful to look at the first 100 triggered events. Defaults to None, i.e. all events in the stream are triggered :type n_triggers: int :param chunk_size: The number of record windows that are processed (i.e. filter + peak search) at a time. :type chunk_size: int :param apply_first: A function or list of functions to be applied to the stream data BEFORE the filter function is applied. E.g. ``lambda x: -x`` to trigger on the inverted stream. :type apply_first: Union[callable, List[callable]], optional :param n_processes: The number of processes to use to process chunks. If None, all available cores are utilized. Defaults to None :type n_processes: int, optional :return: Tuple of trigger indices and trigger heights. :rtype: Tuple[List[int], List[float]] **Example:** .. code-block:: python import cait.versatile as vai # Construct stream object stream = vai.Stream(hardware="vdaq2", src="path/to/stream_file.bin") # Get an optimum filter from somewhere (here, we get one from mock data but # this will not work for your stream data) of = vai.MockData().of # Perform triggering (use context to keep stream file opened) with stream: trigger_inds, amplitudes = vai.trigger_of(stream["ADC1"], 0.1, of) # Get trigger timestamps from trigger indices timestamps = stream.time[trigger_inds] # Plot trigger amplitude spectrum vai.Histogram(amplitudes) """ # size of record window (as determined by the size of the filter) record_length = 2 * (of.shape[-1] - 1) # before samples exceeding threshold are searched, the chunks are filtered filter_fnc = partial(filter_chunk, of=of, record_length=record_length) return trigger_base( stream=stream, threshold=threshold, filter_fnc=filter_fnc, record_length=record_length, n_triggers=n_triggers, chunk_size=chunk_size, apply_first=apply_first, n_processes=n_processes, )
[docs] def trigger_of2d( stream: ArrayLike, threshold: float, of: np.ndarray, n_triggers: int = None, chunk_size: int = 100, apply_first: Union[callable, List[callable]] = None, n_processes: int = None, ): """ Trigger multiple channels of a stream at the same time using a 2D optimum filter and the optimum filter triggering algorithm described in https://edoc.ub.uni-muenchen.de/23762/. See :func:`cait.versatile.functions.trigger.triggerbase.trigger_base` for details on the implementation. The correlated filtering returns A SINGLE voltage trace (even for multiple input channels), hence, only a combined trigger threshold is required. Apart from the fact that multi-channel 'streams' and 'of2d' are required here, the usage is identical to :func:`cait.versatile.trigger_of`. :param stream: The stream channels to trigger (The 'channel' returns 2d-arrays. Usually, one would get this using ``stream[('Ch0', 'Ch1')]``). :type stream: ArrayLike :param threshold: The threshold (in Volts) above which events should be triggered. :type threshold: float :param of: The correlated (2d) optimum filter to be used for filtering (it is assumed that the filter's first entry is set to zero to correctly remove the offset). :type of: np.ndarray :param n_triggers: The number of events to trigger (might be more, depending on 'chunk_size'). E.g. useful to look at the first 100 triggered events. Defaults to None, i.e. all events in the stream are triggered :type n_triggers: int :param chunk_size: The number of record windows that are processed (i.e. filter + peak search) at a time. :type chunk_size: int :param apply_first: A function or list of functions to be applied to the stream data BEFORE the filter function is applied. E.g. ``lambda x: -x`` to trigger on the inverted stream. :type apply_first: Union[callable, List[callable]], optional :param n_processes: The number of processes to use to process chunks. If None, all available cores are utilized. Defaults to None :type n_processes: int, optional :return: Tuple of trigger indices and trigger heights. :rtype: Tuple[List[int], List[float]] """ # size of record window (as determined by the size of the filter) record_length = 2 * (of.shape[-1] - 1) # before samples exceeding threshold are searched, the chunks are filtered filter_fnc = partial(filter_chunk_2d, of=of, record_length=record_length) return trigger_base( stream=stream, threshold=threshold, filter_fnc=filter_fnc, record_length=record_length, n_triggers=n_triggers, chunk_size=chunk_size, apply_first=apply_first, n_processes=n_processes, )