import os
from typing import Union
import numpy as np
import cait as ai
from ....readers import BinaryFile
from ....serialize import SerializingMixin
from ...iterators.impl_rdt import RDTIterator
from ..datasourcebase import DataSourceBaseClass
from .par_file import PARFile
[docs]
class RDTFile(SerializingMixin):
"""
Class for interfacing hardware triggered files (file extension `.rdt`). This class automatically infers the available channels and the available correlated channels. Those can be retrieved by indexing the RDTFile object with channel indices/names or tuples thereof, the result of the indexing is a :class:`RDTChannel` object which provides testpulse amplitudes, timestamps, and event iterators for (the) selected channel(s) (see documentation for :class:`RDTChannel`).
:param path: The full path (including the file extension `.rdt`) to the file of interest.
:type path: str
:param path_par: The full path (including the file extension `.par`) to the file which contains the necessary parameters to read the `.rdt` file. If None is given, it is assumed that a `.par` file with identical name/path as `path` is available. Defaults to None.
:type path_par: str, optional
:return: Object interfacing an `.rdt` file.
:rtype: RDTFile
**Example:**
.. code-block:: python
import cait.versatile as vai
f = vai.RDTFile('path/to/file.rdt')
# Check available channels
print(f.keys)
# Choose channel(s) to iterate over, get testpulse amplitudes, ..., by slicing RDTFile
channels = f[(0,1)] # if interested in only one channel: channel0 = f[0]
it = channels.get_event_iterator()
# You can now further slice this iterator (like any other iterator in cait.versatile):
it_testpulses = it[:, channels.tpas > 0]
it_events = it[:, channels.tpas == 0]
it_noise = it[:, channels.tpas == -1]
# Have a look (after removing the baseline):
vai.Preview(it_testpulses.with_processing(vai.RemoveBaseline()))
"""
def __init__(self, path: str, path_par: str = None):
super().__init__(path=path, path_par=path_par)
if not path.endswith(".rdt"):
raise ValueError("Unrecognized file extension for 'path'. Please input an *.rdt file.")
if path_par is None:
path_par = os.path.splitext(path)[0] + '.par'
if not path_par.endswith(".par"):
raise ValueError("Unrecognized file extension 'path_par'. Please input a *.par file.")
self._par = PARFile(path_par)
self._dtype = np.dtype([
('detector_nmbr', 'i4'),
('coincide_pulses', 'i4'),
('trig_count', 'i4'),
('trig_delay', 'i4'),
('abs_time_s', 'i4'),
('abs_time_mus', 'i4'),
('delay_ch_tp', 'i4', (int(self._par.ints_in_header == 7),)),
('time_low', 'i4'),
('time_high', 'i4'),
('qcd_events', 'i4'),
('hours', 'f4'),
('dead_time', 'f4'),
('test_pulse_amplitude', 'f4'),
('dac_output', 'f4'),
('dvm_channels', 'f4', self._par.dvm_channels),
('samples', 'i2', self._par.record_length),
])
self._raw_file = BinaryFile(path=path, dtype=self._dtype)
#self._raw_file = np.memmap(path, dtype=self._dtype, mode='r')
# Copy the relevant data to memory so that we don't have to go through
# the memory mapped file all the time (this should only need a few MB of RAM)
with self._raw_file as f:
# iterating through the file like this is much faster than calling np.array(f[["detector_nmbr", "trig_count"]])
meta_copy = {"detector_nmbr": np.zeros(len(f), dtype=np.int32),
"trig_count": np.zeros(len(f), dtype=np.int32)}
for i in range(len(f)):
meta_copy["detector_nmbr"][i] = f[i, "detector_nmbr"]
meta_copy["trig_count"][i] = f[i, "trig_count"]
self._available_channels = np.unique(meta_copy["detector_nmbr"]).tolist()
# I'M STILL NOT SURE IF I WANT A DEFAULT CHANNELS BEHAVIOR OR NOT
# If no channels are requested by the user (through __getitem__), the default behavior is such
# that all quantities (iterators, timestamps, tpas) are returned for all channels.
# Notice that this might fail (when not all are correlated).
# In the case that, e.g., only two channels are present and correlated, this provides a shortcut
# when accessing the important stuff
# if len(self._available_channels) > 1:
# self._default_channels = tuple(self._available_channels)
# else:
# self._default_channels = int(self._available_channels[0])
self._inds = dict()
# DETERMINE INDICES FOR SINGLE CHANNELS:
for c in self._available_channels:
self._inds[c] = np.nonzero(meta_copy["detector_nmbr"] == c)[0]
# DETERMINE INDICES FOR CORRELATED CHANNELS (have same trig_count):
vals, idx_start, count = np.unique(meta_copy["trig_count"], return_counts=True, return_index=True)
# We are only interested in trigger counts that appear more than once
flag_corr = count > 1
if np.sum(flag_corr) > 0:
# Split trigger count array according to the first occurences
# (note: the first entry is discarded)
s = np.split(meta_copy["trig_count"], idx_start)[1:]
# Now restrict to those cases which include more than one occurrence
# (note that this has to be done AFTER the splitting above)
vals = vals[flag_corr]
idx_start = idx_start[flag_corr]
count = count[flag_corr]
# Furthermore, we are only interested in identical trigger counts that
# are adjacent in the file (ensure that only correctly written records
# are included)
# (The next line just checks if the pieces in s all include the same value, i.e. are consecutive)
flag_adj = [len(np.unique(x))==1 for x in s if len(x)>1]
# Restrict to adjacent
vals = vals[flag_adj]
idx_start = idx_start[flag_adj]
count = count[flag_adj]
# Create and populate a dictionary whose keys are the unique
# channel combinations and whose values are the corresponding
# data indices
unique_tuples = dict()
for n, (i, l) in enumerate(zip(idx_start, count)):
tup = tuple(meta_copy["detector_nmbr"][i:(i+l)].tolist())
if not (tup in unique_tuples.keys()):
unique_tuples[tup] = []
unique_tuples[tup].append(n)
# Add tuples and corresponding indices to self._inds dictionary
for k, v in unique_tuples.items():
tup_inds = np.array(v)
self._inds[k] = idx_start[tup_inds]
def __repr__(self):
k = self.keys.keys() if isinstance(self.keys, dict) else self.keys
return f'{self.__class__.__name__}(keys={k}, record_length={self.record_length}, dt_us={self.dt_us}, measuring_time_h={self.measuring_time_h:.2f})'
def __enter__(self):
self._file.__enter__()
return self
def __exit__(self, typ, val, tb):
self._file.__exit__(typ, val, tb)
def __getitem__(self, channels: Union[int, str, tuple, list]):
"""
Choose a single channel (integer) or correlated channels (tuple of integers) from this RDTFile for further investigations.
The available keys are found via `RDTFile.keys`. If the channels have names (successfully extracted from the `*.par` file), you can also use those for indexing.
:param channels: Channel index (potentially name) or tuple thereof.
:type channels: Union[int, str, tuple, list]
:return: RDTChannel instance corresponding to the choice of channels
:rtype: RDTChannel
"""
if isinstance(channels, str):
if not self._par.has_channel_names:
raise KeyError("Indexing with strings is not supported because no channel name information is available for this instance.")
# If channel names are available, we use the dictionary to find the corresponding keys
corresponding_key = [k for k,v in self.keys.items() if v == channels]
if not corresponding_key:
raise KeyError(f"Name '{channels}' is not a valid channel name. Did you mean {[v for k,v in self.keys.items()]}")
# If channel names are available and corresponding key could be retrieved, we switch to channel numbers
channels = corresponding_key[0]
elif isinstance(channels, (tuple, list)):
l = list(channels)
for i, c in enumerate(l):
if isinstance(c, str):
if not self._par.has_channel_names:
raise KeyError("Indexing with strings is not supported because no channel name information is available for this instance.")
corresponding_key = [k for k,v in self.keys.items() if v == c]
if not corresponding_key:
raise KeyError(f"Name '{c}' is not a valid channel name. Did you mean {[v for k,v in self.keys.items()]}")
l[i] = corresponding_key[0]
channels = tuple(l)
elif not isinstance(channels, int):
raise TypeError(f"Unsupported input type {type(channels)} for argument 'channels'.")
if not channels in self._inds.keys():
raise KeyError(f"'{channels}' is not available. Did you mean {', '.join([str(x) for x in self._inds.keys()])}?")
return RDTChannel(self, key=channels)
# used for TAB-completion in iPython/notebooks. Example: rdtf[<TAB> -> 0
def _ipython_key_completions_(self):
return self.keys
@property
def _file(self):
"""The `numpy.memmap` object to the underlying `*.rdt` file."""
return self._raw_file
@property
def record_length(self):
"""The record length (number of samples per event) of the events in the corresponding `*.rdt` file."""
return self._par.record_length
@property
def dt_us(self):
"""The time base in microseconds (time between two samples) of the events in the corresponding `*.rdt` file."""
return self._par.time_base_us
@property
def sample_frequency(self):
"""The sample frequency in Hz of the events in the corresponding `*.rdt` file."""
return int(1e6//self.dt_us)
@property
def measuring_time_h(self):
"""The total measuring time in hours of the corresponding `*.rdt` file."""
return self._par.measuring_time_h
@property
def keys(self):
"""The channel keys that can be used to index this RDTFile instance. If available, the channel names (corresponding to the indices) are shown as well."""
if self._par.has_channel_names:
d = dict()
for k in self._inds.keys():
if isinstance(k, tuple):
d[k] = tuple([self._par.channel_names[i] for i in k])
else: # then int
d[k] = self._par.channel_names[k]
return d
else:
return list(self._inds.keys())
[docs]
def get_trace(self, inds: Union[int, list], voltage: bool = True):
"""
Return the ADC traces of events in this RDTFile for given indices. If ``voltage==True``, the ADC value is converted to a voltage (V) fist.
:param inds: The indices for which to return the voltage traces.
:type inds: Union[int, list]
:param voltage: If True, voltage values are returned instead of ADC values.
:type voltage: bool, optional
:return: Array of as many ADC/voltage traces as given `inds`.
:rtype: numpy.array
"""
# index with ["key", inds] for best XRootD-protocol read performance
data = self._file["samples", inds]
return ai.data.convert_to_V(data, bits=16, min=-10, max=10) if voltage else data
# I'M STILL NOT SURE IF I WANT A DEFAULT CHANNELS BEHAVIOR OR NOT
# @property
# def default_channels(self):
# """
# This RDTFile's default channel(s).
# Those are used if `tpas`, `unique_tpas`, `timestamps` and `get_event_iterator` are called on this RDTFile instance.
# """
# return self._default_channels
# @property
# def timestamps(self):
# """The microsecond timestamps of the events in this RDTFile's default channel(s)."""
# # Create an RDTChannel instance for the default channels and return its timestamps
# return self[self._default_channels].timestamps
# @property
# def tpas(self):
# """The testpulse amplitudes of the events in this RDTFile's default channel(s)."""
# # Create an RDTChannel instance for the default channels and return its tpas
# return self[self._default_channels].tpas
# @property
# def unique_tpas(self):
# """The unique testpulse amplitudes of the events in this RDTFile's default channel(s)."""
# # Create an RDTChannel instance for the default channels and return its unique_tpas
# return self[self._default_channels].unique_tpas
# def get_event_iterator(self):
# """
# Get an iterator over the events of this RDTFile's default channel(s).
# Note that this is merely a shortcut to first choosing the channel of interest and then constructing the iterator, and that the recommended way is to NOT use this shortcut unless you are certain that you want to use the default channel.
# :return: Iterable object
# :rtype: RDTIterator
# >>> import cait.versatile as vai
# >>> f = vai.RDTFile('path/to/file.rdt')
# >>> # Check what default channel is:
# >>> print(f.default_channel)
# >>> # If you're happy with the default channel, use f.get_event_iterator()
# >>> # If you'd rather choose another channel, you can do so by slicing,
# >>> # e.g., f[0] for the channel number 0
# >>> it = f.get_event_iterator()
# >>> # You can now further slice this iterator (like any other iterator in cait.versatile):
# >>> it_testpulses = it[:, f.tpas > 0]
# >>> it_events = it[:, f.tpas == 0]
# >>> it_noise = it[:, f.tpas == -1]
# >>> # Remove baselines:
# >>> it_testpulses.add_processing(vai.RemoveBaseline())
# >>> # Have a look:
# >>> vai.Preview(it_testpulses)
# """
# # Create an RDTChannel instance for the default channels and return its event_iterator
# return self[self._default_channels].get_event_iterator()
[docs]
class RDTChannel(DataSourceBaseClass):
"""
Object representing a coherent part of an RDTFile (i.e. either a single channel or correlated channels). Usually this is not created as a standalone but the result of slicing an RDTFile.
:param rdt_file: An RDTFile instance.
:type rdt_file: RDTFile
:param key: The key which selects the single channel or correlated channels. Either of `rdt_file.keys`.
:type key: Union[int, tuple]
:return: Specified channels of an RDTFile
:rtype: RDTChannel
"""
def __init__(self, rdt_file: RDTFile, key: Union[int, tuple, list]):
super().__init__(rdt_file=rdt_file, key=key)
if isinstance(key, list):
# Ensure tuple, because list cannot be dictionary key!
key = tuple(key)
inds = rdt_file._inds[key]
self._n_channels = len(key) if isinstance(key, tuple) else 1
# Build index array for all channels, first dimension is channel index (also existent if only one
# channel is present). Slicing this array then corresponds to choosing channel(s)/event(s) in
# original file
self._inds = np.array([np.array(inds)+k for k in range(self._n_channels)])
self._key = key
self._rdt_file = rdt_file
def __repr__(self):
return f"{self.__class__.__name__}(key={self._key}, n_events={len(self)}, n_channels={self._n_channels}, unique_tpas={self.unique_tpas})"
def __len__(self):
return self._inds.shape[-1]
def __enter__(self):
self._rdt_file.__enter__()
return self
def __exit__(self, typ, val, tb):
self._rdt_file.__exit__(typ, val, tb)
def __getitem__(self, val):
"""Return voltage traces of events. Any numpy indexing can be used as if it was an array of shape (n_channels, n_events)."""
if isinstance(val, tuple):
if len(val) != 2:
raise ValueError("Indexing only supports up to two arguments.")
requested_events = self._inds[val[0]][...,val[1]].T
else:
requested_events = self._inds[..., val].T
return self._rdt_file.get_trace(requested_events, voltage=True)
@property
def key(self):
"""The RDTFile key that this RDTChannel corresponds to."""
return self._key
@property
def n_channels(self):
"""The number of channels this RDTChannel corresponds to."""
return self._n_channels
@property
def start_us(self):
s = self._rdt_file._par.start_s
mus = self._rdt_file._par.start_us
return s*int(1e6) + mus
@property
def dt_us(self):
return self._rdt_file.dt_us
@property
def timestamps(self):
"""The microsecond timestamps of the events in this RDTChannel."""
# index with ["key", inds] for best XRootD-protocol read performance
secs = np.array(self._rdt_file._file["abs_time_s", self._inds[0]], dtype=np.int64)
msecs = np.array(self._rdt_file._file["abs_time_mus", self._inds[0]], dtype=np.int64)
return secs*int(1e6) + msecs
@property
def tpas(self):
"""The testpulse amplitudes of the events in this RDTChannel."""
# index with ["key", inds] for best XRootD-protocol read performance
return self._rdt_file._file["test_pulse_amplitude", self._inds[0]]
@property
def unique_tpas(self):
"""The unique testpulse amplitudes of the events in this RDTChannel."""
# use numpy string representation to convert to python floats
return sorted([float(str(x)) for x in np.unique(self.tpas)])
[docs]
def get_event_iterator(self, batch_size: int = None):
"""
Get an iterator over the events present in this RDTChannel instance.
:param batch_size: The number of events to be returned at once (these are all read together). There will be a trade-off: large batch_sizes cause faster read speed but increase the memory usage.
:type batch_size: int
:return: Iterable object
:rtype: RDTIterator
**Example:**
.. code-block:: python
import cait.versatile as vai
f = vai.RDTFile('path/to/file.rdt')
# Choose channel(s) to iterate over by slicing RDTFile
channels = f[(0,1)]
it = channels.get_event_iterator()
# You can now further slice this iterator (like any other iterator in cait.versatile):
it_testpulses = it[:, channels.tpas > 0]
it_events = it[:, channels.tpas == 0]
it_noise = it[:, channels.tpas == -1]
# Remove baselines:
it_testpulses.add_processing(vai.RemoveBaseline())
# Have a look:
vai.Preview(it_testpulses)
"""
return RDTIterator(self, batch_size=batch_size)