from typing import Union, List
from functools import partial
import numpy as np
from numpy.typing import ArrayLike
import pandas as pd
from tqdm.auto import tqdm
import cait.versatile as vai
from .triggerbase import trigger_base
####################################################
### FUNCTIONS IN THIS FILE HAVE NO TESTCASES YET ###
####################################################
def zscore_chunk(data: np.ndarray, record_length: int):
"""
Calculates the moving z-score of 'data'.
Found here: https://stackoverflow.com/a/47165379
:param data: The data for which the z-score has to be calculated.
:type data: np.ndarray
:param record_length: The record length for the triggering (this determines the length of the running average/variance for the z-score).
:type record_length: int
:return: Moving z-score of data
:rtype: np.ndarray
"""
r = pd.Series(data).rolling(window=record_length)
m = r.mean().shift(1)
s = r.std(ddof=0).shift(1)
return np.array((data-m)/s)[record_length:-record_length]
[docs]
def trigger_zscore(stream: ArrayLike,
record_length: int,
threshold: float = 5,
n_triggers: int = None,
chunk_size: int = 100,
apply_first: Union[callable, List[callable]] = None,
n_processes: int = None):
"""
Trigger a single channel of a stream using a moving z-score. See :func:`cait.versatile.functions.trigger.triggerbase.trigger_base` for details on the implementation.
:param stream: The stream channel to trigger.
:type stream: ArrayLike
:param threshold: The threshold (in z-scores) above which events should be triggered.
:type threshold: float
:param n_triggers: The number of events to trigger (might be more, depending on 'chunk_size'). E.g. useful to look at the first 100 triggered events. Defaults to None, i.e. all events in the stream are triggered
:type n_triggers: int
:param chunk_size: The number of record windows that are processed (i.e. z-score calculation + peak search) at a time.
:type chunk_size: int
:param apply_first: A function or list of functions to be applied to the stream data BEFORE the filter function is applied. E.g. ``lambda x: -x`` to trigger on the inverted stream.
:type apply_first: Union[callable, List[callable]], optional
:param n_processes: The number of processes to use to process chunks. If None, all available cores are utilized. Defaults to None
:type n_processes: int, optional
:return: Tuple of trigger indices and trigger heights.
:rtype: Tuple[List[int], List[float]]
**Example:**
.. code-block:: python
import cait.versatile as vai
# Construct stream object
stream = vai.Stream(hardware="vdaq2", src="path/to/stream_file.bin")
# Perform triggering (use context to keep stream file opened)
with stream:
trigger_inds, amplitudes = vai.trigger_zscore(stream["ADC1"], 2**14)
# Get trigger timestamps from trigger indices
timestamps = stream.time[trigger_inds]
# Plot trigger amplitude spectrum
vai.Histogram(amplitudes)
"""
# Before samples exceeding threshold are searched, the z-scores of the chunks are calculated
filter_fnc = partial(zscore_chunk, record_length=record_length)
# Trigger to get the trigger indices
inds, _ = trigger_base(stream=stream,
threshold=threshold,
filter_fnc=filter_fnc,
record_length=record_length,
n_triggers=n_triggers,
chunk_size=chunk_size,
apply_first=apply_first,
n_processes=n_processes)
if not inds: return [], []
# The trigger_vals are given in z-scores.
# Therefore, we also calculate a naïve pulse height after subtracting a constant baseline
# Also apply the function for the pulse height calculation
if apply_first is not None:
if callable(apply_first): apply_first = [apply_first]
else:
apply_first = []
# Record windows centered at 1/4*record_length
before = int(record_length/4)
after = int(3*record_length/4)
# Slice to search peak in the interval (1/5, 2/5) of the record window
a, b = int(record_length/5), int(2*record_length/5)
sl = slice(a, b)
phs = np.zeros(len(inds), dtype=np.float32)
corrected_inds = np.zeros(len(inds), dtype=np.int64)
processing = apply_first + [vai.BoxCarSmoothing(), vai.RemoveBaseline()]
for i, ind in enumerate(pbar := tqdm(inds, desc="Calculating pulse heights", disable=len(stream)-3*record_length<chunk_size*record_length)):
trace = stream[ind-before:ind+after]
for p in processing: trace = p(trace)
# re-calculate maximum and peak position
peak_pos = np.argmax(trace[sl])
corrected_inds[i] = ind - before + a + peak_pos
phs[i] = trace[sl][peak_pos]
# By correcting the trigger indices using the maximum search, it is possible to find triggers that happened
# before 'record_length' samples into the stream. We want to explicitly discard those.
if len(inds)>0 and corrected_inds[0]<record_length:
corrected_inds, phs = corrected_inds[1:], phs[1:]
return corrected_inds, phs