Event Iterators

Iterators are usually not created directly by the user but are provided by the respective data source through its get_event_iterator method. For example, Stream provides a StreamIterator and RDTChannel provides an RDTIterator. Irrespective of the underlying data source, all iterators inherit from IteratorBaseClass and share its methods and properties, which are documented below.

In particular, you can view the events of an iterator it by calling vai.Preview(it) and add processing (functions that are applied to each event in the iterator before it is returned) through it.with_processing(vai.RemoveBaseline()) or it.with_processing([vai.RemoveBaseline(), lambda x: x**2]). You can also request the iterator’s properties, e.g. it.t (the time array of the events), it.timestamps (the timestamps of the events in the iterator), or it.dt_us (the microsecond time base used to record the events in the iterator).

As mentioned, iterators are usually obtained using the get_event_iterator method (e.g. dh.get_event_iterator('events')). One iterator that is useful to explicitly create yourself, however, is the cait.versatile.iterators.PulseSimIterator which can be used to simulate pulses given some SEV (or template parameters), pulse heights and shifts (see below).

Furthermore, the cait.versatile.iterators.StreamIterator has additional functionality (it.with_extended_window), which is why it is explicitly documented below.

class cait.versatile.iterators.iteratorbase.IteratorBaseClass(inds: List[int], batch_size: int = None, **kwargs)[source]

Baseclass for all iterators. Defines behavior shared among all event iterators.

__len__()[source]

Return the number of events in the iterator.

__add__(other)[source]

Add two iterators sequentially. E.g. given two iterators it1 and it2, the sum it1 + it2 returns an iterator that first iterates through it1, and then through it2, once it1 is consumed.

Example:

# Given two iterators 'it1' and 'it2', they can be sequentially combined into
# a single iterator by
combined_it = it1 + it2
__getitem__(val)[source]

Slice iterator as if it was laid out as a numpy.ndarray and return a new iterator. The first argument slices the channel, the second slices the list of events in the iterator.

Example:

# Starting from an iterator 'it' of multiple channels, you can
# - access only the first channel
it[0]

# - access the last 1000 events of the first channel
it[0, -1000:]

# - access every second event from all channels
it[:, ::2]

# ... etc.
add_processing(f: Callable | List[Callable])[source]

Add functions to be applied to each event before returning it. Batches are supported, i.e. if the iterator returns events in batches, the specified functions are applied to each event in a batch separately. However, the user is responsible for handling multiple channels correctly: events are passed to the functions directly, even if they include multiple channels.

Parameters:

f (Union[Callable, List[Callable]]) – Function(s) to be applied. Function signature: f(event: np.ndarray) -> np.ndarray

Example:

import cait.versatile as vai

def f1(event): return event + 1
def f2(event): return event*2

it = vai.MockData().get_event_iterator()
it.add_processing([f1, f2])
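
Because events are handed to processing functions as they are, a function that should work for both single- and multi-channel iterators can operate along the last axis. The following is a minimal sketch, assuming that single-channel events arrive as 1D arrays of shape (record_length,) and multi-channel events as 2D arrays of shape (n_channels, record_length). The helper subtract_offset and its default of 100 samples are hypothetical and not part of cait.

```python
import numpy as np

def subtract_offset(event: np.ndarray, n_samples: int = 100) -> np.ndarray:
    """Subtract the mean of the first n_samples from each channel.

    Hypothetical helper for illustration, not a cait function.
    """
    # Averaging along the last axis works for 1D (single channel)
    # and 2D (channels x samples) events alike.
    offset = np.mean(event[..., :n_samples], axis=-1, keepdims=True)
    return event - offset

# Stand-ins for events (assumed shapes, not read from a real data source)
single = np.arange(1000.0) + 5.0         # shape (1000,)
multi = np.stack([single, 2 * single])   # shape (2, 1000)

print(subtract_offset(single).shape)  # (1000,)
print(subtract_offset(multi).shape)   # (2, 1000)
```

A function written this way could be passed to it.add_processing(subtract_offset) without special-casing the number of channels.
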
with_processing(f: Callable | List[Callable])[source]

Same as add_processing but it returns a new iterator instead of modifying the original one.

Parameters:

f (Union[Callable, List[Callable]]) – Function(s) to be applied. Function signature: f(event: np.ndarray) -> np.ndarray

Example:

import cait.versatile as vai

def f1(event): return event + 1
def f2(event): return event*2

it = vai.MockData().get_event_iterator()
new_it = it.with_processing([f1, f2])
pop_processing()[source]

Removes all processing functions from the iterator and returns them as a list.

with_batchsize(batch_size: int)[source]

Returns an identical iterator but with a different batch size.

Parameters:

batch_size (int) – The new batch size.

flatten()[source]

Returns an identical iterator but without batches. Has no effect if iterator didn’t use batches before.
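
How batching groups events can be pictured with plain Python lists. This sketch only assumes that batches are consecutive groups of at most batch_size events and that the last batch may be smaller. The helper split_into_batches is hypothetical and is not cait's implementation.

```python
import math
from typing import List

def split_into_batches(inds: List[int], batch_size: int) -> List[List[int]]:
    """Group consecutive event indices into batches (hypothetical helper)."""
    return [inds[i:i + batch_size] for i in range(0, len(inds), batch_size)]

inds = list(range(10))
batches = split_into_batches(inds, batch_size=4)

print(batches)                               # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(len(batches), math.ceil(len(inds) / 4))  # 3 3

# Flattening the batches restores the original event-by-event order.
flat = [i for batch in batches for i in batch]
print(flat == inds)                          # True
```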

grab(which: int | list)[source]

Grab the specified event(s) and return it/them as a numpy array.

Parameters:

which (Union[int, list]) – Events of interest.

Example:

import cait.versatile as vai

it = vai.MockData().get_event_iterator() # Get events from mock data
selected_event = it.grab(-1)             # Get the last event in the iterator
selected_events = it.grab([1,7,9])       # Get events with indices 1, 7, 9
property t

Return the time axis (record window) of the events in the iterator. It is a millisecond array with 0 being at 1/4th of the window.

property uses_batches

Returns True if the iterator returns batches.

property n_batches

Returns the number of batches in the iterator.

property has_processing

Returns True if one or more processing functions have been added to the iterator.

property hours

Returns the times (in hours) of the events in this iterator since the start of the underlying datasource.

abstract property record_length

Returns the record length (in samples) of the events in the iterator.

abstract property dt_us

Returns the time base (in microseconds) of the events in the iterator.

property sample_frequency

Returns the sampling frequency (in Hz) of the events in the iterator.

Returns:

Sampling frequency (Hz)

Return type:

int

abstract property ds_start_us

The microsecond timestamp of the start of the recording for the datasource underlying this iterator object.

abstract property timestamps

Returns microsecond timestamps corresponding to the trigger times of the events in the iterator.

abstract property n_channels

Returns the number of channels in the iterator.
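
The time-related properties above are connected by simple unit conversions. The sketch below uses made-up numbers and assumes that hours is measured relative to ds_start_us. It illustrates the arithmetic only and is not taken from cait's source.

```python
# Example values (made up for illustration)
dt_us = 40                                   # time base in microseconds
record_length = 16384                        # samples per event
ds_start_us = 1_600_000_000_000_000          # start of the recording in microseconds
timestamps = [ds_start_us + 1_800_000_000,   # 0.5 h after start
              ds_start_us + 7_200_000_000]   # 2 h after start

# One sample every dt_us microseconds
sample_frequency = int(1e6 / dt_us)          # in Hz

# Duration of one record window
window_ms = record_length * dt_us / 1000     # in milliseconds

# One hour corresponds to 3.6e9 microseconds
hours = [(ts - ds_start_us) / 3.6e9 for ts in timestamps]

print(sample_frequency)  # 25000
print(window_ms)         # 655.36
print(hours)             # [0.5, 2.0]
```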

class cait.versatile.iterators.PulseSimIterator(iterator: IteratorBaseClass, pulse_heights: List[List[float]], shift_samples: List[List[int]] = None, shift_subsamples: List[List[float]] = None, sev: ndarray = None, sev_fitpars: List[List[float]] = None, channels: int | List[int] = None, inds: List[int] = None, batch_size: int = None)[source]

Iterator object that returns voltage traces superimposed with a SEV. The SEV can EITHER be specified by a template array OR by fit parameters [t0, An, At, tau_n, tau_in, tau_t] where the time constants are given in ms. The n-component pulse shape model with parameters [t0, A1, A2, …, Ak, tau_in, tau_2, …, tau_k, tau_n] is also supported.

Parameters:
  • iterator (IteratorBaseClass) – An iterator (of baselines, stream_chunks, etc.) that you want to superimpose the SEV on.

  • pulse_heights (List[List[float]]) – The pulse heights to scale the SEV. One for each event in ‘iterator’ and each channel, i.e. with shape (iterator.n_channels, len(iterator)).

  • shift_samples (List[List[int]]) – If specified for all elements in pulse_heights (i.e. also for all channels), the respective x-shift (in samples) is applied to the superimposed pulse. If you use a sev (see below), the edges of the shifted array are padded with the average of the first/last 10 samples. If you use sev_fitpars (see below), no padding is required because the fit can just be extrapolated.

  • shift_subsamples (List[List[float]]) – If specified for all elements in pulse_heights (i.e. also for all channels), the respective x-shift (in fractional samples) is applied to the superimposed pulse. If you use a sev (see below), the array is linearly interpolated between samples. If you use sev_fitpars (see below), the model is evaluated at the intermediate points. Note that all values have to be in the interval [0, 1), corresponding to shifts between zero and one sample.

  • sev (np.ndarray) – The SEV to superimpose. Has to match the number of channels of iterator and its record length, i.e. requires shape (iterator.n_channels, iterator.record_length). Cannot be specified together with sev_fitpars.

  • sev_fitpars (List[List[float]]) – The fit parameters for the SEV to superimpose. Has to match the number of channels of iterator. Cannot be specified together with sev.

  • channels (Union[int, List[int]]) – The channels that we are interested in. Has to be a subset of iterator’s channels. If None, all channels are considered. Defaults to None.

  • inds (Union[int, List[int]]) – The indices of ‘iterator’ that we want to iterate over. If None, all indices are considered. Defaults to None.

  • batch_size (int) – The number of events to be returned at once (these are all read together). There will be a trade-off: large batch_sizes cause faster read speed but increase the memory usage.
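
What the shift and padding rules above mean for a template array can be sketched with NumPy. This is an illustration under assumptions, not cait's implementation: the helpers shift_template and shift_subsample are hypothetical, and positive shifts are assumed to move the pulse to later samples.

```python
import numpy as np

def shift_template(sev: np.ndarray, shift: int, n_pad: int = 10) -> np.ndarray:
    """Shift a 1D template by whole samples (hypothetical helper).

    The freed edge is padded with the mean of the first/last n_pad samples.
    """
    if shift == 0:
        return sev.copy()
    shifted = np.empty_like(sev)
    if shift > 0:   # move pulse to later samples, pad the left edge
        shifted[shift:] = sev[:-shift]
        shifted[:shift] = np.mean(sev[:n_pad])
    else:           # move pulse to earlier samples, pad the right edge
        shifted[:shift] = sev[-shift:]
        shifted[shift:] = np.mean(sev[-n_pad:])
    return shifted

def shift_subsample(sev: np.ndarray, frac: float) -> np.ndarray:
    """Shift by a fraction of a sample in [0, 1) via linear interpolation."""
    return (1 - frac) * sev + frac * shift_template(sev, 1)

# Toy template: flat baseline followed by an exponential decay
t = np.arange(64)
sev = np.where(t >= 16, np.exp(-(t - 16) / 8.0), 0.0)

# One simulated event: noise + pulse_height * shifted template
rng = np.random.default_rng(0)
noise = rng.normal(0.0, 0.01, size=sev.shape)
event = noise + 0.5 * shift_subsample(shift_template(sev, 4), 0.3)

print(np.argmax(sev), np.argmax(shift_template(sev, 4)))  # 16 20
```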

import numpy as np
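import scipy as sp
import scipy.stats  # explicit import so that sp.stats resolves on SciPy versions without lazy submodule loading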

import cait.versatile as vai
from cait.versatile.iterators import PulseSimIterator

# Use mock data. You will have a more meaningful iterator.
md = vai.MockData()
sev = md.sev[0]

# This is just a cheeky way to get an iterator of random noise.
# You will have actual noise.
noise_it = md.get_event_iterator()[0].with_processing(lambda x: sp.stats.norm.rvs(loc=0, scale=0.1, size=md.record_length))

# Define random pulse heights
sim_phs = sp.stats.uniform.rvs(size=len(noise_it))

# Set up the iterator containing simulated pulses on top of
# the noise traces.
# Check out the docstring to learn about advanced ways to
# simulate events, e.g. by adding template shifts.
pulse_sim_it = PulseSimIterator(
    iterator=noise_it,
    pulse_heights=sim_phs,
    sev=sev,
)

# Preview your pulses.
vai.Preview(pulse_sim_it)

# In a next step, you could for example filter the simulated
# events, determine their pulse heights, and estimate the baseline
# resolution (the code below is a minimal example and definitely not
# perfect!). In this case, it makes more sense to simulate a fixed
# pulse height:
pulse_sim_it_fixed_ph = PulseSimIterator(
    iterator=noise_it,
    pulse_heights=0.5*np.ones_like(sim_phs),
    sev=sev,
)

reconstructed_phs = vai.apply(
    np.max,
    pulse_sim_it_fixed_ph.with_processing(vai.OptimumFiltering(md.of[0]))
)

# Plot a histogram of reconstructed pulse heights.
vai.Histogram(reconstructed_phs)
class cait.versatile.iterators.StreamIterator(stream, keys: str | List[str], inds: int | List[int], record_length: int, alignment: float = 0.25, batch_size: int = None)[source]

Iterator object that returns voltage traces for given trigger indices of a stream file.

Parameters:
  • stream (StreamBaseClass) – The stream object to read the voltage traces from.

  • keys (Union[str, List[str]]) – The keys (channel names) of the stream object to be iterated over.

  • inds (Union[int, List[int]]) – The stream indices for which we want to read the voltage traces. This index is aligned according to ‘alignment’ (default: at 1/4th of the record window).

  • record_length (int) – The number of samples to be returned for each index. Usually, this is a power of 2, e.g. 16384.

  • alignment (float) – A number in the interval [0,1] which determines the alignment of the record window (of length record_length) relative to the specified index. E.g. if alignment=1/2, the record window is centered around the index. Defaults to 1/4.

  • batch_size (int) – The number of events to be returned at once (these are all read together). There will be a trade-off: large batch_sizes cause faster read speed but increase the memory usage.

Returns:

Iterable object

Return type:

StreamIterator

property alignment

The time axis alignment of the iterator.

For most event iterators, this is 1/4, i.e. the timestamp of an event corresponds to the sample at 1/4th of the record window. However, when constructing a StreamIterator, you may choose the alignment. Therefore, for StreamIterators, this value may be anything in the interval [0, 1].

property t

Return the time axis (record window) of the events in the iterator.

It is a millisecond array with 0 aligned according to the ‘alignment’ argument used when constructing the StreamIterator.
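
The relation between alignment and the time axis can be illustrated as follows. The sketch assumes that the zero of the time axis falls on sample int(alignment * record_length); the exact rounding used by cait may differ, and the helper time_axis_ms is hypothetical.

```python
from typing import List

def time_axis_ms(record_length: int, dt_us: float, alignment: float = 0.25) -> List[float]:
    """Millisecond time axis with 0 at the aligned sample (hypothetical helper)."""
    zero_sample = int(alignment * record_length)
    return [(i - zero_sample) * dt_us / 1000 for i in range(record_length)]

# Tiny record window of 16 samples with a 40 microsecond time base
t = time_axis_ms(record_length=16, dt_us=40, alignment=0.25)
print(t[0], t[4], t[-1])  # -0.16 0.0 0.44

# With alignment=1/2 the zero moves to the center of the window
t_centered = time_axis_ms(record_length=16, dt_us=40, alignment=0.5)
print(t_centered[8])      # 0.0
```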

with_alignment(alignment: float)[source]

Return an iterator for identical timestamps but with different alignment.

Parameters:

alignment (float) – A number in the interval [0,1] which determines the alignment of the record window (of length record_length) relative to the specified index. E.g. if alignment=1/2, the record window is centered around the index.

Warning

Requires that all event traces are still within the stream boundaries after changing alignment.

with_extended_window()[source]

Return an iterator for identical timestamps but with the window size increased to include one additional record length before and after the previous window.
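
The effect on record length and on the relative position of the trigger sample follows from simple arithmetic. A sketch, assuming the trigger sample sits alignment * record_length samples into the original window (the helper extended_window is hypothetical and not cait's code):

```python
from typing import Tuple

def extended_window(record_length: int, alignment: float) -> Tuple[int, float]:
    """Record length and relative trigger position after adding one
    record length before and after the original window."""
    new_length = 3 * record_length
    # The trigger sample moves one full record length further into the window.
    new_alignment = (alignment * record_length + record_length) / new_length
    return new_length, new_alignment

new_length, new_alignment = extended_window(16384, 0.25)
print(new_length)     # 49152
print(new_alignment)  # approximately 0.4167, i.e. (0.25 + 1) / 3 = 5/12
```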

with_record_length(record_length: int)[source]

Return an iterator for identical timestamps but with different record length.

Parameters:

record_length (int) – The number of samples to be returned for each event. Usually, this is a power of 2, e.g. 16384.

Warning

Requires that all event traces are still within the stream boundaries after changing record length.
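
The warnings for with_alignment and with_record_length come down to the same condition: every record window has to lie inside the stream. A minimal sketch of such a check, assuming the window starts int(alignment * record_length) samples before the index and that the stream length in samples is known (the helper windows_fit and its arguments are hypothetical):

```python
from typing import List

def windows_fit(inds: List[int], record_length: int,
                alignment: float, stream_length: int) -> bool:
    """Check that all record windows lie within [0, stream_length)."""
    offset = int(alignment * record_length)
    return all(
        i - offset >= 0 and i - offset + record_length <= stream_length
        for i in inds
    )

inds = [5_000, 50_000, 99_000]   # example trigger indices (made up)

# The window of the last index would end at sample 102072 > 100000
print(windows_fit(inds, 4096, 0.25, stream_length=100_000))  # False

# A shorter record length keeps all windows inside the stream
print(windows_fit(inds, 1024, 0.25, stream_length=100_000))  # True
```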