Source code for cait.versatile.datasources.stream.impl_vdaq3

import json
from typing import List, Union

import numpy as np

import cait as ai

from ....readers import BinaryFile
from .impl_vdaq2 import VDAQ2_TP_TS, VDAQ2_TPAS, VDAQ2_CALP_TS, VDAQ2_CALPAS
from .streambase import StreamBaseClass

UUID_SIZE = 37 # bytes (including the \x00 separator)
UUID_SINGLE_CH = "b0de7478-e909-4c2b-9415-e14792e0510d"
UUID_SINGLE_CH_WITH_TRAILER = "010cfb4a-71ae-4185-ab44-168c108623e0"

UINT16_SIZE = np.dtype(np.uint16).itemsize
UINT64_SIZE = np.dtype(np.uint64).itemsize


[docs] class Stream_VDAQ3(StreamBaseClass): """ Implementation of StreamBaseClass for hardware 'vdaq3'. VDAQ3 data is stored in .dat files. Its header contains instructions on how to read the data and all recorded channels are stored in a separate file. There are two variations of the files: One includes information about the testpulses/controlpulses. In this case, the TPAs and testpulse timestamps are readily available. Another format does **not** include testpulse information. In this case, you have to enter the TP channel also in the list of ``files``. It is then treated like the VDAQ2 variant, where the TP channels are triggered to obtain the TPAs and TP timestamps. To modify the trigger parameters, change the inputs ``dac_trig_thr`` and ``dac_trig_win_len_ms``. :param files: The binary files with the stream data (including the file extension). :type files: List[str] :param dac_trig_thr: **Only for VDAQ2-style TP-channel trigger:** Trigger threshold (in sigmas) to find testpulses from the DAC channels. Defaults to 5 sigmas :type dac_trig_thr: float, optional :param dac_trig_win_len_ms: **Only for VDAQ2-style TP-channel trigger:** Trigger window length (in ms) to find testpulses from the DAC channels. Defaults to 500 ms :type dac_trig_win_len_ms: int, optional :param cal_trig_thr: **Only for VDAQ2-style TP-channel trigger:** Trigger threshold (in sigmas) to find calpulses from the calibration ADC channels. Defaults to 5 sigmas :type cal_trig_thr: float, optional :param cal_trig_win_len_ms: **Only for VDAQ2-style TP-channel trigger:** Trigger window length (in ms) to find calpulses from the calibration ADC channels. Defaults to 50 ms :type cal_trig_win_len_ms: int, optional """ def __init__(self, files: Union[str, List[str]], dac_trig_thr: float = 5., # sigmas dac_trig_win_len_ms: int = 500, # ms cal_trig_thr: float = None, # V cal_trig_win_len_ms: int = 50 # ms ): super().__init__(files=files, dac_trig_thr=dac_trig_thr, dac_trig_win_len_ms=dac_trig_win_len_ms, cal_trig_thr=cal_trig_thr, cal_trig_win_len_ms=cal_trig_win_len_ms) if type(files) is str: files = [files] starts, dTs, lengths, uuids, precs = [], [], [], [], [] self._data, self._header, self._trailer = dict(), dict(), dict() self._tp_timestamps, self._tpas = dict(), dict() self._calp_timestamps, self._calpas = dict(), dict() # Data is 24 bits, i.e. 3 bytes long. Read 3 bytes at a time # Possibly also 32 bit # This is decided by 'sample_size_bytes' in the header which is either 3 or 4 data_dtype = { 3: np.dtype([('byte1', '<i1'), ('byte2', '<i1'), ('byte3', '<i1')]), 4: np.dtype([('byte1', '<i1'), ('byte2', '<i1'), ('byte3', '<i1'), ('byte4', '<i1')]) } # The data format is documented here: https://cryocluster-vccs.docs.cern.ch/data-formats/single-channel-file/ for f in files: # Starting from the second byte, we extract the UUID which defines the file format uuid = BinaryFile(path=f, dtype=np.dtype(f"V{UUID_SIZE}"), count=1, offset=2)[0].tobytes().decode("ascii").split("\x00")[0] if uuid not in [UUID_SINGLE_CH, UUID_SINGLE_CH_WITH_TRAILER]: raise NotImplementedError(f"uuid '{uuid}' is an unsupported file format for this stream object. Valid uuids: {[UUID_SINGLE_CH, UUID_SINGLE_CH_WITH_TRAILER]}") uuids.append(uuid) size_file = len(BinaryFile(path=f, dtype=np.dtype(np.int8))) if uuid == UUID_SINGLE_CH: # The first two bytes give the offset to the sample data offset_samples = int(BinaryFile(path=f, dtype=np.dtype(np.int16), count=1)[0]) offset_json = UINT16_SIZE + UUID_SIZE size_json = offset_samples - offset_json # This format doesn't save a trailer trailer = dict() size_trailer = 0 elif uuid == UUID_SINGLE_CH_WITH_TRAILER: # The first 8 bytes after the uuid give the offset to the sample data # The second 8 bytes after the uuid give the offset to the trailer offset_samples, offset_trailer = BinaryFile(path=f, dtype=np.dtype(np.uint64), count=2, offset=UINT16_SIZE+UUID_SIZE) offset_json = UINT16_SIZE + UUID_SIZE + 2*UINT64_SIZE size_json = offset_samples - offset_json size_trailer = size_file - offset_trailer trailer = json.loads( BinaryFile( path=f, dtype=np.dtype(f"V{size_trailer}"), offset=offset_trailer, count=1 )[0].tobytes().decode("ascii").split("\x00")[0] ) # This format stores testpulse/controlpulse information in the trailer tp_ts = list() tpas = list() for d in trailer["trigger_parameter"]: if "injected_pulse" in d.keys(): ip = d["injected_pulse"] if all([x in ip.keys() for x in ["energy", "ts_start_ns", "type"]]): if ip["type"] in ["test_pulse", "control_pulse"]: # Don't forget to convert ns to us tp_ts.append(ip["ts_start_ns"]//1000) tpas.append(ip["energy"]) # This format stores calpulse information in the trailer # NOTE: THESE LINES BELOW MUST BE REVISED ONCE THE LED PULSE INFORMATION WILL BE INCLUDED IN VDAQ3 TRAILER. # FOR NOW, THEY ARE JUST A COPY-PASTE OF WHAT IS DONE WITH TESTPULSES calp_ts = list() calpas = list() for d in trailer["trigger_parameter"]: if "injected_pulse" in d.keys(): ip = d["injected_pulse"] if all([x in ip.keys() for x in ["energy", "ts_start_ns", "type"]]): if ip["type"] in ["cal_pulse"]: # Don't forget to convert ns to us calp_ts.append(ip["ts_start_ns"]//1000) calpas.append(ip["energy"]) size_data = size_file - size_trailer - offset_samples header = json.loads( BinaryFile( path=f, dtype=np.dtype(f"V{size_json}"), offset=offset_json, count=1 )[0].tobytes().decode("ascii").split("\x00")[0] ) dTs.append(header["timestep_ns"]) # There is also a 'ts_utc_ns' which gives the UTC time but it is not # synchronized with e.g. muon veto starts.append(header["ts_ns"]) channel_name = header["channel_id"] prec = header["sample_size_bytes"] if prec not in [3, 4]: raise NotImplementedError(f"Only 3 and 4 byte data precision are supported. Got {prec}.") precs.append(prec) self._data[f"Ch{channel_name}"] = BinaryFile(path=f, dtype=data_dtype[prec], count=size_data//prec, offset=offset_samples) if uuid == UUID_SINGLE_CH_WITH_TRAILER and len(tp_ts)>0: self._tp_timestamps[f"TP_Ch{channel_name}"] = tp_ts self._tpas[f"TP_Ch{channel_name}"] = tpas # NOTE: THESE LINES BELOW MUST BE REVISED ONCE THE LED PULSE INFORMATION WILL BE INCLUDED IN VDAQ3 TRAILER. # FOR NOW, THEY ARE JUST A COPY-PASTE OF WHAT IS DONE WITH TESTPULSES if uuid == UUID_SINGLE_CH_WITH_TRAILER and len(calp_ts)>0: self._calp_timestamps[f"CALP_Ch{channel_name}"] = calp_ts self._calpas[f"CALP_Ch{channel_name}"] = calpas # HERE WE COULD PROBABLY USE THE INFO IN THE HEADER AT SOME POINT lengths.append(len(self._data[f"Ch{channel_name}"])) # Save header and trailer self._header[f"Ch{channel_name}"] = header self._trailer[f"Ch{channel_name}"] = trailer if len(np.unique(starts)) > 1: raise ValueError(f'Files have to start at the same time to be treated together. Got {starts}.') if len(np.unique(dTs)) > 1: raise ValueError(f'Files have to have the same time-delta to be treated together. Got {dTs}.') if len(np.unique(lengths)) > 1: raise ValueError(f'Files have to have the same length to be treated together. Got {lengths}.') if len(np.unique(uuids)) > 1: raise ValueError(f'Files have to have the same uuid string to be treated together. Got {uuids}.') if len(np.unique(precs)) > 1: raise ValueError(f'Files have to be recorded with identical byte precision to be treated together. Got {precs}.') # Number of data points in stream self._len = lengths[0] # Start timestamp of the file in us (header value is in ns) self._start = int(starts[0]//1000) # Temporal step size in us (= inverse sampling frequency) self._dt = dTs[0]//1000 # The byte precision self._prec = precs[0] # The file type self._uuid = uuids[0] if self._uuid == UUID_SINGLE_CH: # These attributes are needed for triggering the TP channels self._dac_trig_thr = dac_trig_thr self._dac_trig_win_len_ms = dac_trig_win_len_ms self._dac_trig_win_len = int(1000*dac_trig_win_len_ms/self._dt) # These attributes are needed for triggering the calibration channels # NOTE: The lines below are not part of the 'if' clause above, as calibration pulses information is not yet included into the VDAQ3 file trailer. # Once it will be implemented we will have to modify them self._cal_trig_thr = cal_trig_thr self._cal_trig_win_len_ms = cal_trig_win_len_ms self._cal_trig_win_len = int(1000*cal_trig_win_len_ms/self._dt) def __len__(self): return self._len def __enter__(self): for bin_file in self._data.values(): bin_file.__enter__() return self def __exit__(self, typ, val, tb): for bin_file in self._data.values(): bin_file.__exit__(typ, val, tb) def get_trace(self, key: str, where: slice, voltage: bool = True): data = self._data[key][where] out_shape = data.shape if self._prec == 3: # If written as 24bit values, here, we convert them to 32 bits such that numpy can handle them adc_32bit = np.reshape( np.ravel( np.vstack( [ np.ravel(np.zeros_like(data["byte1"]), "C"), np.ravel(data["byte1"], "C"), np.ravel(data["byte2"], "C"), np.ravel(data["byte3"], "C"), ] ), "F", ).view("<i4"), out_shape, ) return ai.data.convert_to_V(adc_32bit, bits=32, min=-20, max=20) if voltage else adc_32bit elif self._prec == 4: adc_32bit = np.reshape( np.ravel( np.vstack( [ np.ravel(data["byte4"], "C"), np.ravel(data["byte1"], "C"), np.ravel(data["byte2"], "C"), np.ravel(data["byte3"], "C"), ] ), "F" ).view("<i4"), out_shape, ) return ai.data.convert_to_V(adc_32bit, bits=32, min=-20, max=20) if voltage else adc_32bit @property def start_us(self): return self._start @property def dt_us(self): return self._dt @property def keys(self): return list(self._data.keys()) @property def tp_keys(self): if self._uuid == UUID_SINGLE_CH: # Allow all 'regular' channels to be testpulse channels (like in VDAQ2) return self.keys elif self._uuid == UUID_SINGLE_CH_WITH_TRAILER: # Return TP information from trailer return list(self._tp_timestamps.keys()) else: return [] @property def tpas(self): if self._uuid == UUID_SINGLE_CH: return VDAQ2_TPAS(self) elif self._uuid == UUID_SINGLE_CH_WITH_TRAILER: return self._tpas else: return {} @property def tp_timestamps(self): if self._uuid == UUID_SINGLE_CH: return VDAQ2_TP_TS(self) elif self._uuid == UUID_SINGLE_CH_WITH_TRAILER: return self._tp_timestamps else: return {} @property def calp_keys(self): if self._uuid == UUID_SINGLE_CH: # Allow all 'regular' channels to be calibration channels (like in VDAQ2) return self.keys elif self._uuid == UUID_SINGLE_CH_WITH_TRAILER: # NOTE: As long as the calibration pulses information in the trailer is not implemented for VDAQ3, we always retrieve it by triggering the ADC stream. # Once it will be implemented in the trailer, we will have to modify the line below to 'return list(self._calp_timestamps.keys())' return self.keys else: return [] @property def calpas(self): if self._uuid == UUID_SINGLE_CH: return VDAQ2_CALPAS(self) elif self._uuid == UUID_SINGLE_CH_WITH_TRAILER: # NOTE: As long as the calibration pulses information in the trailer is not implemented for VDAQ3, we always retrieve it by triggering the ADC stream. # Once it will be implemented in the trailer, we will have to modify the line below to 'return self._calpas' return VDAQ2_CALPAS(self) else: return {} @property def calp_timestamps(self): if self._uuid == UUID_SINGLE_CH: return VDAQ2_CALP_TS(self) elif self._uuid == UUID_SINGLE_CH_WITH_TRAILER: # NOTE: As long as the calibration pulses information in the trailer is not implemented for VDAQ3, we always retrieve it by triggering the ADC stream. # Once it will be implemented in the trailer, we will have to modify the line below to 'return self._calp_timestamps' return VDAQ2_CALP_TS(self) else: return {}