from abc import ABC, abstractmethod
from typing import Union, List, Tuple
import numpy as np
from ..datasourcebase import DataSourceBaseClass
from ...iterators.impl_stream import StreamIterator
[docs]class StreamBaseClass(DataSourceBaseClass):
@abstractmethod
def __len__(self):
...
@abstractmethod
def get_channel(self, key: str):
...
[docs] @abstractmethod
def get_voltage_trace(self, key: str, where: slice):
"""
Get the voltage trace for a given channel 'key' and slice 'where'.
:return: Voltage trace.
:rtype: np.ndarray
"""
...
@property
@abstractmethod
def keys(self):
"""
Available keys (channel names) in the stream.
:return: List of keys.
:rtype: list
"""
...
@property
@abstractmethod
def start_us(self):
"""
The microsecond timestamp at which the stream starts.
:return: Microsecond timestamp
:rtype: int
"""
...
@property
@abstractmethod
def dt_us(self):
"""
The length of a sample in the stream in microseconds.
:return: Microsecond time-delta
:rtype: int
"""
...
@property
@abstractmethod
def tpas(self):
"""
Dictionary of testpulse amplitudes in the stream. For hardware 'cresst' this is read from a '.test_stamps' file. For hardware 'vdaq2' this is obtained from triggering the DAC channels first.
:return: Testpulse amplitudes
:rtype: dict of `np.ndarray`
"""
...
@property
@abstractmethod
def tp_timestamps(self):
"""
Dictionary of testpulse timestamps (microseconds) in the stream. For hardware 'cresst' this is read from a '.test_stamps' file. For hardware 'vdaq2' this is obtained from triggering the DAC channels first.
:return: Testpulse microsecond timestamps.
:rtype: dict of `np.ndarray`
"""
...
def __repr__(self):
return f'{self.__class__.__name__}(start_us={self.start_us}, dt_us={self.dt_us}, length={self.__len__()}, keys={self.keys}, measuring_time_h={self.__len__()*self.dt_us/1e6/3600:.2f})'
def __getitem__(self, val: Union[str, Tuple[str, Union[int, slice, list, np.ndarray]], Tuple[str, Union[int, slice, list, np.ndarray], str]]):
# Only names and tuples are supported for slicing (no int)
if type(val) not in [str, tuple]:
raise TypeError(f'Unsupported type {type(val)} for slicing.')
# If only a name is given, the channel with the respective name is returned
if type(val) is str:
if val in self.keys:
return self.get_channel(val)
else:
raise KeyError(f'{val} not in stream. Available names: {self.keys}')
# For tuples of length 2 or 3 there are specific behaviors
else:
if len(val) in [0, 1]:
raise TypeError('No slicing support for tuples of length 0 or 1.')
# Return the integer values for the stream 'name' if everything else is fine
elif len(val) == 2:
if type(val[0]) is str and type(val[1]) in [int, slice, list, np.ndarray]:
if val[0] in self.keys:
return self.get_channel(val[0])[val[1]]
else:
raise KeyError(f'{val[0]} not in stream. Available names: {self.keys}')
else:
raise TypeError('When slicing with two arguments, the first and second one have to be of type string and int/slice, respectively.')
# Return the voltage values for the stream 'name' if everything else is fine
elif len(val) == 3:
if type(val[0]) is str and type(val[1]) in [int, slice, list, np.ndarray] and type(val[2]) is str:
if val[0] not in self.keys:
raise KeyError(f'{val[0]} not in stream. Available names: {self.keys}')
if val[2] != 'as_voltage':
raise ValueError(f'Unrecognized string "{val[2]}". Did you mean "as_voltage"?')
where = slice(val[1], val[1]+1) if type(val[1]) is int else val[1]
return self.get_voltage_trace(key=val[0], where=where)
else:
raise TypeError('When slicing with three arguments, the first, second and third one have to be of type string, int/slice and string, respectively.')
else:
raise NotImplementedError(f'Tuples of length {len(val)} are not supported for slicing')
@property
def time(self):
"""
Instance of `StreamTime`, which can be sliced to convert stream indices into microsecond timestamps and implements utility functions for the conversion to datetime for example.
:return: StreamTime instance
:rtype: `StreamTime`
"""
if not hasattr(self, "_t"):
self._t = StreamTime(self.start_us, self.dt_us, len(self))
return self._t
[docs] def get_event_iterator(self,
keys: Union[str, List[str]],
record_length: int,
inds: Union[int, List[int]] = None,
timestamps: Union[int, List[int]] = None,
alignment: float = 1/4,
batch_size: int = None):
"""
Returns an iterator object over voltage traces for given trigger indices or timestamps of a stream file.
:param keys: The keys (channel names) of the stream object to be iterated over.
:type keys: Union[str, List[str]]
:param record_length: The number of samples to be returned for each index. Usually, those are powers of 2, e.g. 16384
:type record_length: int
:param inds: The stream indices for which we want to read the voltage traces. This index is aligned at 1/4th of the record window. Either `inds` or `timestamps` has to be set.
:type inds: Union[int, List[int]]
:param timestamps: The stream timestamps for which we want to read the voltage traces. This timestamp is aligned at 1/4th of the record window. Either `inds` or `timestamps` has to be set.
:type timestamps: Union[int, List[int]]
:param alignment: A number in the interval [0,1] which determines the alignment of the record window (of length `record_length`) relative to the specified index. E.g. if `alignment=1/2`, the record window is centered around the index. Defaults to 1/4.
:type alignment: float
:param batch_size: The number of events to be returned at once (these are all read together). There will be a trade-off: large batch_sizes cause faster read speed but increase the memory usage.
:type batch_size: int
:return: Iterable object
:rtype: StreamIterator
"""
if ~np.logical_xor(inds is None, timestamps is None):
raise ValueError("You have to specify EITHER 'inds' OR 'timestamps'.")
if inds is None: inds = self.time.timestamp_to_ind(timestamps)
return StreamIterator(self,
keys=keys,
inds=inds,
record_length=record_length,
alignment=alignment,
batch_size=batch_size)
class StreamTime:
"""
An object that encapsulates time data for a given Stream object. Not intended to be created by user.
:param start_us: The first timestamp of the stream data in microseconds.
:type start_us: int
:param dt_us: The length of one sample in the stream data in microseconds.
:type dt_us: int
:param n: The number of datapoints in the stream data.
:type n: int
"""
def __init__(self, start_us: int, dt_us: int, n: int):
self._start = start_us
self._dt = dt_us
self._n = n
def __repr__(self):
return f"{self.__class__.__name__}(timestamps=[{self._start}-{self._start+self._n*self._dt}], interval={self._dt}us)"
def __getitem__(self, val):
if type(val) is int:
val = slice(val, val+1)
if type(val) is slice:
# Allows slicing [:n]
if val.start is None:
start = 0
else:
start = val.start
# Allows slicing [n:]
if val.stop is None:
stop = self._n
else:
stop = val.stop
# Allows slicing [:-n]
if start < 0: start = self._n + start
if stop <= 0: stop = self._n + stop
if start > self._n or start < 0:
raise IndexError(f'time index {start} out of range [0,{self._n})')
if stop > self._n or stop < 0:
raise IndexError(f'time index {stop} out of range [0,{self._n})')
indices = np.arange(start=start, stop=stop, step=val.step)
elif type(val) in [list, np.ndarray]:
indices = np.array(val)
else:
raise NotImplementedError("val has to be either integer, slice, or list/np.ndarray.")
return self._start + indices*self._dt
def timestamp_to_datetime(self, timestamps: Union[int, List[int]]):
"""Function to convert timestamps to numpy.datetime objects."""
# Call to check for out of range
self.timestamp_to_ind(timestamps)
return np.array(timestamps, dtype="datetime64[us]")
def datetime_to_timestamp(self, datetimes: Union[np.datetime64, List[np.datetime64]]):
"""Function to convert numpy.datetime objects to timestamps."""
out = np.array(datetimes, dtype=int)
# Call to check for out of range
self.timestamp_to_ind(out)
return out
def timestamp_to_ind(self, timestamps: Union[int, List[int]]):
"""Function to convert timestamps to indices."""
out = (np.array(timestamps)-self._start)//self._dt
if np.min(out) < 0 or np.max(out) >= self._n:
raise IndexError("Requested timestamp is out of range.")
return out