Event Iterators
In general, iterators are never created directly by the user but provided by the respective data source through the get_event_iterator method. E.g. Stream provides a StreamIterator and RDTChannel provides an RDTIterator. Irrespective of what the underlying data source is, all iterators inherit from IteratorBaseClass and share its methods and properties which are documented below.
In particular, given an iterator it, you can view its events by calling vai.Preview(it), add processing (functions that are applied to each event before it is returned) through it.with_processing(vai.RemoveBaseline()) or it.with_processing([vai.RemoveBaseline(), lambda x: x**2]), and request the iterator’s properties, e.g. it.t (the time array of the events), it.timestamps (the timestamps of the events), it.dt_us (the microsecond time base used to record the events), …
As mentioned, iterators are usually obtained using the get_event_iterator method (e.g. dh.get_event_iterator('events')). One iterator that is useful to explicitly create yourself, however, is the cait.versatile.iterators.PulseSimIterator which can be used to simulate pulses given some SEV (or template parameters), pulse heights and shifts (see below).
Furthermore, the cait.versatile.iterators.StreamIterator has additional functionality (it.with_extended_window), which is why it is explicitly documented below.
- class cait.versatile.iterators.iteratorbase.IteratorBaseClass(inds: List[int], batch_size: int = None, **kwargs)
Baseclass for all iterators. Defines behavior shared among all event iterators.
- __add__(other)
Add two iterators sequentially. E.g. given two iterators it1 and it2, the sum it1 + it2 returns an iterator that first iterates through it1 and then, once it1 is consumed, through it2.
Example:
    # Given two iterators 'it1' and 'it2', they can be sequentially combined into
    # a single iterator by
    combined_it = it1 + it2
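The sequential behaviour described above can be illustrated without cait. The following is a minimal sketch, not the library's implementation: ToyIterator is a hypothetical stand-in class that only mimics the + operator by chaining two sequences of events.

```python
from itertools import chain


class ToyIterator:
    """Hypothetical stand-in for an event iterator (not part of cait)."""

    def __init__(self, events):
        self._events = list(events)

    def __iter__(self):
        return iter(self._events)

    def __len__(self):
        return len(self._events)

    def __add__(self, other):
        # Yield everything from self first, then everything from other.
        return ToyIterator(chain(self, other))


it1 = ToyIterator(["a0", "a1", "a2"])
it2 = ToyIterator(["b0", "b1"])
combined_it = it1 + it2
```

In this sketch the operands are left untouched, and the combined iterator has as many events as both operands together.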
- __getitem__(val)
Slice the iterator as if it were laid out as a numpy.ndarray and return a new iterator. The first argument slices the channels, the second slices the list of events in the iterator.
Example:
    # Starting from an iterator 'it' of multiple channels, you can
    # - access only the first channel
    it[0]
    # - access the last 1000 events of the first channel
    it[0, -1000:]
    # - access every second event from all channels
    it[:, ::2]
    # ... etc.
- add_processing(f: Callable | List[Callable])
Add functions to be applied to each event before returning it. Batches are supported, i.e. if the iterator returns events in batches, the specified functions are applied to all events in a batch separately. However, the user is responsible for handling multiple channels correctly: events are passed to the functions directly, even if they include multiple channels.
- Parameters:
f (Union[Callable, List[Callable]]) – Function(s) to be applied. Function signature: f(event: np.ndarray) -> np.ndarray
Example:
    import cait.versatile as vai

    def f1(event): return event + 1
    def f2(event): return event*2

    it = vai.MockData().get_event_iterator()
    it.add_processing([f1, f2])
- with_processing(f: Callable | List[Callable])
Same as add_processing but it returns a new iterator instead of modifying the original one.
- Parameters:
f (Union[Callable, List[Callable]]) – Function(s) to be applied. Function signature: f(event: np.ndarray) -> np.ndarray
Example:
    import cait.versatile as vai

    def f1(event): return event + 1
    def f2(event): return event*2

    it = vai.MockData().get_event_iterator()
    new_it = it.with_processing([f1, f2])
- pop_processing()
Removes all processing functions from the iterator and returns them as a list.
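To make the difference between add_processing, with_processing and pop_processing concrete, here is a small self-contained sketch. ToyProcessingIterator is a hypothetical class written for illustration only. It assumes that the functions are applied in the order in which they were added, which the documentation above does not state explicitly.

```python
import copy


class ToyProcessingIterator:
    """Hypothetical stand-in; only mimics the processing-related methods."""

    def __init__(self, events):
        self._events = list(events)
        self._fncs = []

    def add_processing(self, f):
        # Accept a single callable or a list of callables; modifies self.
        self._fncs += f if isinstance(f, list) else [f]

    def with_processing(self, f):
        # Leave self untouched and return a modified copy instead.
        new = copy.copy(self)
        new._fncs = list(self._fncs)
        new.add_processing(f)
        return new

    def pop_processing(self):
        # Hand back the stored functions and leave the iterator without any.
        fncs, self._fncs = self._fncs, []
        return fncs

    @property
    def has_processing(self):
        return len(self._fncs) > 0

    def __iter__(self):
        for event in self._events:
            for f in self._fncs:  # assumed: applied in the order given
                event = f(event)
            yield event


it = ToyProcessingIterator([1.0, 2.0])
new_it = it.with_processing([lambda x: x + 1, lambda x: x * 2])
```

Here it still yields the raw events, while new_it yields (x + 1) * 2 for each event x.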
- with_batchsize(batch_size: int)
Returns an identical iterator but with a different batch size.
- Parameters:
batch_size (int) – The new batch size.
- flatten()
Returns an identical iterator but without batches. Has no effect if iterator didn’t use batches before.
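The relation between batch size, number of batches and flattening can be sketched in a few lines of plain Python. This is an illustration under the assumption that the last batch is simply shorter when the number of events is not a multiple of the batch size; make_batches is a hypothetical helper, not a cait function.

```python
import math


def make_batches(inds, batch_size=None):
    """Split a list of event indices into consecutive batches."""
    if batch_size is None:
        # No batching: events are returned one by one.
        return list(inds)
    return [inds[i:i + batch_size] for i in range(0, len(inds), batch_size)]


inds = list(range(10))
batches = make_batches(inds, batch_size=4)

# Number of batches under the assumption stated above.
n_batches = math.ceil(len(inds) / 4)

# Flattening removes the batch structure but keeps the event order.
flat = [i for batch in batches for i in batch]
```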
- grab(which: int | list)
Grab specified event(s) and return it/them as numpy array.
- Parameters:
which (Union[int, list]) – Events of interest.
Example:
    import cait.versatile as vai

    it = vai.MockData().get_event_iterator()  # Get events from mock data
    selected_event = it.grab(-1)              # Get the last event in the iterator
    selected_events = it.grab([1,7,9])        # Get events with indices 1, 7, 9
- property t
Return the time axis (record window) of the events in the iterator. It is a millisecond array with 0 being at 1/4th of the window.
- property uses_batches
Returns True if the iterator returns batches.
- property n_batches
Returns the number of batches in the iterator.
- property has_processing
Returns True if one or more processing functions have been added to the iterator.
- property hours
Returns the times (in hours) of the events in this iterator since the start of the underlying data source.
- abstract property record_length
Returns the record length (in samples) of the events in the iterator.
- abstract property dt_us
Returns the time base (in microseconds) of the events in the iterator.
- property sample_frequency
Returns the sampling frequency (in Hz) of the events in the iterator.
- Returns:
Sampling frequency (Hz)
- Return type:
int
- abstract property ds_start_us
The microsecond timestamp of the start of the recording for the datasource underlying this iterator object.
- abstract property timestamps
Returns microsecond timestamps corresponding to the trigger times of the events in the iterator.
- abstract property n_channels
Returns the number of channels in the iterator.
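The timing-related properties above are connected by simple unit conversions. The sketch below spells them out under the assumption that sample_frequency is the inverse of dt_us and that hours is measured relative to ds_start_us; the function names and example values are hypothetical and only serve as illustration.

```python
def sample_frequency_hz(dt_us):
    """Sampling frequency in Hz for a time base given in microseconds."""
    return int(round(1e6 / dt_us))


def window_duration_ms(record_length, dt_us):
    """Duration of one record window in milliseconds."""
    return record_length * dt_us / 1000


def hours_since_start(timestamps_us, ds_start_us):
    """Convert microsecond timestamps to hours since the start of the recording."""
    return [(ts - ds_start_us) / 3.6e9 for ts in timestamps_us]


# Example values (made up for illustration).
dt_us = 40
record_length = 16384
ds_start_us = 1_600_000_000_000_000
timestamps = [ds_start_us + 1_800_000_000, ds_start_us + 7_200_000_000]

frequency = sample_frequency_hz(dt_us)  # 25000 Hz for a 40 us time base
duration = window_duration_ms(record_length, dt_us)
hours = hours_since_start(timestamps, ds_start_us)
```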
- class cait.versatile.iterators.PulseSimIterator(iterator: IteratorBaseClass, pulse_heights: List[List[float]], shift_samples: List[List[int]] = None, shift_subsamples: List[List[float]] = None, sev: ndarray = None, sev_fitpars: List[List[float]] = None, channels: int | List[int] = None, inds: List[int] = None, batch_size: int = None)
Iterator object that returns voltage traces superimposed with a SEV. The SEV can EITHER be specified by a template array OR by fit parameters [t0, An, At, tau_n, tau_in, tau_t] where the time constants are given in ms. The n-component pulse shape model with parameters [t0, A1, A2, …, Ak, tau_in, tau_2, …, tau_k, tau_n] is also supported.
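For orientation, the six fit parameters [t0, An, At, tau_n, tau_in, tau_t] match the two-component (non-thermal plus thermal) pulse shape commonly used for cryogenic detectors. The sketch below evaluates that parametrization in plain Python. That the library evaluates exactly this expression is an assumption, and pulse_model as well as the parameter values are hypothetical names and numbers chosen for illustration.

```python
import math


def pulse_model(t_ms, t0, An, At, tau_n, tau_in, tau_t):
    """Two-component pulse shape evaluated at time t_ms (all times in ms)."""
    if t_ms <= t0:
        # The pulse starts at t0; before that the model is flat zero.
        return 0.0
    x = t_ms - t0
    nonthermal = An * (math.exp(-x / tau_n) - math.exp(-x / tau_in))
    thermal = At * (math.exp(-x / tau_t) - math.exp(-x / tau_n))
    return nonthermal + thermal


# Made-up parameters with tau_in < tau_n < tau_t (rise, fast decay, slow decay).
pars = dict(t0=0.0, An=1.0, At=0.3, tau_n=5.0, tau_in=1.0, tau_t=50.0)

# Evaluate on a coarse time grid from -10 ms to 99 ms.
trace = [pulse_model(t, **pars) for t in range(-10, 100)]
```

Because the model is an analytic function of time, it can be evaluated at shifted or fractional sample positions without any padding or interpolation of a template array.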
- Parameters:
iterator (IteratorBaseClass) – An iterator (of baselines, stream_chunks, etc.) that you want to superimpose the SEV on.
pulse_heights (List[List[float]]) – The pulse heights to scale the SEV. One for each event in ‘iterator’ and each channel, i.e. with shape (iterator.n_channels, len(iterator)).
shift_samples (List[List[int]]) – If specified, it has to be given for all elements in pulse_heights (i.e. also for all channels). The respective x-shift (in samples) is applied to the superimposed pulse. If you use a sev (see below), the edges of the shifted array are padded with the average of the first/last 10 samples. If you use sev_fitpars (see below), no padding is required because the fit can just be extrapolated.
shift_subsamples (List[List[float]]) – If specified, it has to be given for all elements in pulse_heights (i.e. also for all channels). The respective x-shift (in fractional samples) is applied to the superimposed pulse. If you use a sev (see below), the array is linearly interpolated between samples. If you use sev_fitpars (see below), the model is evaluated at the intermediate points. Note that all values have to be in the interval [0, 1), corresponding to shifts between zero and one sample.
sev (np.ndarray) – The SEV to superimpose. Has to match the number of channels of iterator and its record length, i.e. requires shape (iterator.n_channels, iterator.record_length). Cannot be specified together with sev_fitpars.
sev_fitpars (List[List[float]]) – The fit parameters for the SEV to superimpose. Has to match the number of channels of iterator. Cannot be specified together with sev.
channels (Union[int, List[int]]) – The channels that we are interested in. Has to be a subset of iterator’s channels. If None, all channels are considered. Defaults to None.
inds (Union[int, List[int]]) – The indices of ‘iterator’ that we want to iterate over. If None, all indices are considered. Defaults to None
batch_size (int) – The number of events to be returned at once (these are all read together). There will be a trade-off: large batch_sizes cause faster read speed but increase the memory usage.
    import numpy as np
    import scipy as sp
    import cait.versatile as vai
    from cait.versatile.iterators import PulseSimIterator

    # Use mock data. You will have a more meaningful iterator.
    md = vai.MockData()
    sev = md.sev[0]

    # This is just a cheeky way to get an iterator of random noise.
    # You will have actual noise.
    noise_it = md.get_event_iterator()[0].with_processing(
        lambda x: sp.stats.norm.rvs(loc=0, scale=0.1, size=md.record_length)
    )

    # Define random pulse heights
    sim_phs = sp.stats.uniform.rvs(size=len(noise_it))

    # Set up the iterator containing simulated pulses on top of
    # the noise traces.
    # Check out the docstring to learn about advanced ways to
    # simulate events, e.g. by adding template shifts.
    pulse_sim_it = PulseSimIterator(
        iterator=noise_it,
        pulse_heights=sim_phs,
        sev=sev,
    )

    # Preview your pulses.
    vai.Preview(pulse_sim_it)

    # In a next step, you could for example filter the simulated
    # events, determine their pulse heights, and estimate the baseline
    # resolution (the code below is a minimal example and definitely not
    # perfect!). In this case, it makes more sense to simulate a fixed
    # pulse height:
    pulse_sim_it_fixed_ph = PulseSimIterator(
        iterator=noise_it,
        pulse_heights=0.5*np.ones_like(sim_phs),
        sev=sev,
    )
    reconstructed_phs = vai.apply(
        np.max,
        pulse_sim_it_fixed_ph.with_processing(vai.OptimumFiltering(md.of[0]))
    )

    # Plot a histogram of reconstructed pulse heights.
    vai.Histogram(reconstructed_phs)
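The two kinds of template shifts described for shift_samples and shift_subsamples can be sketched in plain Python. This is an illustration of the described padding and interpolation, not the library's code: the function names are hypothetical, a positive shift is assumed to move the pulse towards later samples, and the integer shift is assumed to be smaller in magnitude than the template length.

```python
def shift_samples(template, shift, n_pad=10):
    """Shift a template by an integer number of samples, padding the edges."""
    n = len(template)
    # Padding values: average of the first/last n_pad samples.
    head = sum(template[:n_pad]) / len(template[:n_pad])
    tail = sum(template[-n_pad:]) / len(template[-n_pad:])
    if shift >= 0:
        # Move right: pad on the left, drop samples on the right.
        return [head] * shift + template[:n - shift]
    # Move left: drop samples on the left, pad on the right.
    return template[-shift:] + [tail] * (-shift)


def shift_subsamples(template, frac):
    """Shift a template right by a fraction of a sample (0 <= frac < 1)."""
    if not 0 <= frac < 1:
        raise ValueError("frac has to be in the interval [0, 1)")
    out = [template[0]]  # the first sample has no left neighbour
    for i in range(1, len(template)):
        # Linear interpolation between neighbouring samples.
        out.append((1 - frac) * template[i] + frac * template[i - 1])
    return out


ramp = [float(i) for i in range(20)]
shifted_right = shift_samples(ramp, 2)
shifted_left = shift_samples(ramp, -3)
shifted_frac = shift_subsamples(ramp, 0.25)
```

Both helpers keep the length of the template unchanged, so the shifted template can still be added sample by sample to a noise trace of the same record length.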
- class cait.versatile.iterators.StreamIterator(stream, keys: str | List[str], inds: int | List[int], record_length: int, alignment: float = 0.25, batch_size: int = None)
Iterator object that returns voltage traces for given trigger indices of a stream file.
- Parameters:
stream (StreamBaseClass) – The stream object to read the voltage traces from.
keys (Union[str, List[str]]) – The keys (channel names) of the stream object to be iterated over.
inds (Union[int, List[int]]) – The stream indices for which we want to read the voltage traces. This index is aligned according to ‘alignment’ (default: at 1/4th of the record window).
record_length (int) – The number of samples to be returned for each index. Usually, those are powers of 2, e.g. 16384
alignment (float) – A number in the interval [0,1] which determines the alignment of the record window (of length record_length) relative to the specified index. E.g. if alignment=1/2, the record window is centered around the index. Defaults to 1/4.
batch_size (int) – The number of events to be returned at once (these are all read together). There will be a trade-off: large batch_sizes cause faster read speed but increase the memory usage.
- Returns:
Iterable object
- Return type:
- property alignment
The time axis alignment of the iterator.
For most event iterators, this is 1/4, i.e. the timestamp of an event corresponds to the sample at 1/4th of the record window. However, when constructing a StreamIterator, you may choose the alignment. Therefore, for StreamIterators, this value may be anything in the interval [0, 1].
- property t
Return the time axis (record window) of the events in the iterator.
It is a millisecond array with 0 aligned according to the ‘alignment’ argument used when constructing the StreamIterator.
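How such a time axis follows from record_length, dt_us and alignment can be sketched as follows. This is a minimal illustration, assuming that the zero of the axis sits at the sample index int(alignment * record_length); time_axis_ms is a hypothetical helper and the exact rounding used by the library is not stated here.

```python
def time_axis_ms(record_length, dt_us, alignment=0.25):
    """Millisecond time axis with 0 at the aligned sample of the record window."""
    zero_index = int(alignment * record_length)
    return [(i - zero_index) * dt_us / 1000 for i in range(record_length)]


# Default alignment: zero at 1/4 of the window.
t_quarter = time_axis_ms(record_length=16, dt_us=40)

# Centered window: zero in the middle.
t_center = time_axis_ms(record_length=16, dt_us=40, alignment=0.5)
```

With alignment=0.25 this reproduces the convention of the base class, where 0 lies at 1/4th of the record window.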
- with_alignment(alignment: float)
Return an iterator for identical timestamps but with different alignment.
- Parameters:
alignment (float) – A number in the interval [0,1] which determines the alignment of the record window (of length record_length) relative to the specified index. E.g. if alignment=1/2, the record window is centered around the index.
Warning
Requires that all event traces are still within the stream boundaries after changing alignment.
- with_extended_window()
Return an iterator for identical timestamps but with the window size increased to include one additional record length before and after the previous window.
- with_record_length(record_length: int)
Return an iterator for identical timestamps but with different record length.
- Parameters:
record_length (int) – The number of samples to be returned for each event. Usually, those are powers of 2, e.g. 16384
Warning
Requires that all event traces are still within the stream boundaries after changing record length.
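The warnings above concern whether a record window still fits into the stream after it has been changed. The following sketch shows how the sample range of a window could be derived from a stream index and why a changed alignment, record length or extended window can run past the stream boundaries. All function names are hypothetical, and the rounding of the aligned start sample is an assumption.

```python
def window_bounds(index, record_length, alignment=0.25):
    """Return (start, stop) sample indices of the record window around index."""
    start = index - int(alignment * record_length)
    return start, start + record_length


def extended_window_bounds(index, record_length, alignment=0.25):
    """Add one record length before and one after the original window."""
    start, stop = window_bounds(index, record_length, alignment)
    return start - record_length, stop + record_length


def fits_in_stream(bounds, stream_length):
    """Check that a window lies completely inside the stream."""
    start, stop = bounds
    return 0 <= start and stop <= stream_length


original = window_bounds(1000, 400)           # window around stream index 1000
centered = window_bounds(1000, 400, 0.5)      # same index, centered window
extended = extended_window_bounds(1000, 400)  # three record lengths in total
```

In this example the original window fits into a stream of 1500 samples, while the extended window does not, which is the situation the warnings describe.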