Source code for cait.versatile.datasources.stream.impl_vdaq2

from functools import partial

import numpy as np
import warnings

import cait as ai

from ....readers import BinaryFile
from ...eventfunctions.processing.removebaseline import RemoveBaseline
from ...functions.apply import apply
from ...functions.trigger.trigger_zscore import zscore_chunk
from ...functions.trigger.trigger_of import filter_chunk
from ...functions.trigger.triggerbase import trigger_base
from .streambase import StreamBaseClass


# Helper Function to get testpulse information from VDAQ2 files
def _square(x): 
    return x**2
    
def _max_and_argmax(x):
    ind = np.argmax(x)
    return ind, x[ind]
    
def vdaq2_dac_channel_trigger(stream, key, threshold, record_length):
    with stream:
        inds, _ =  trigger_base(stream=stream[key],
                                threshold=threshold,
                                filter_fnc=partial(zscore_chunk, record_length=record_length),
                                record_length=record_length)
    
    if not inds:
        out_timestamps, out_tpas = [], []

    else:
        out_timestamps = stream.time[inds]
        it = stream.get_event_iterator(keys=key,
                                       record_length=record_length,
                                       timestamps=out_timestamps,
                                       alignment=1/4)

        # we also want argmax to correct timestamps such that they mark the maximum
        argmaxs, out_tpas = apply(_max_and_argmax,
                             it.with_processing([_square, RemoveBaseline()]),
                             n_processes=ai._available_workers)

        out_timestamps = stream.time[inds + argmaxs - record_length//4] 

    return out_timestamps, out_tpas

def vdaq2_cal_channel_trigger(stream, key, threshold, record_length):
    
    # NUCLEUS-specific implementation (low noise stream): if a threshold is not passed explicitely, set it to 5 ADC counts.
    if threshold is None:
        single_adc_count_V = ai.data.convert_to_V(1, bits=stream._adc_bits, min=-20, max=20) 
        threshold = 5 * single_adc_count_V
    
    #Use a mock OF to remove the baseline and leave the stream unaltered.
    mock_of = np.r_[0.0, np.ones(record_length // 2)]

    with stream:
        inds, _ =  trigger_base(stream=stream[key],
                                threshold=threshold,
                                filter_fnc=partial(filter_chunk, of=mock_of, record_length=record_length),
                                record_length=record_length)
    
    if not inds:
        out_timestamps, out_calpas = [], []

    else:
        out_timestamps = stream.time[inds]

        it = stream.get_event_iterator(keys=key,
                                       record_length=record_length,
                                       timestamps=out_timestamps,
                                       alignment=1/4)

        # we also want argmax to correct timestamps such that they mark the maximum
        argmaxs, out_calpas = apply(_max_and_argmax,
                                    it.with_processing(RemoveBaseline()),
                                    n_processes=ai._available_workers)

        warnings.warn("The 'calpulseamplitude' computed here (with a mock OF) is not a reliable estimate of the actual injected energy of the calibration pulse.")

        out_timestamps = stream.time[inds + argmaxs - record_length//4] 

    return out_timestamps, out_calpas

def to_bin_string(s, nmbr_bits=None):
    """
    Returns a string of 0/1 values for any datatype.

    :param s: Any variable or object.
    :type s: any
    :return: The 0/1's of s' bits.
    :rtype: string
    """
    bit_list = str(s) if s <= 1 else bin(s >> 1) + str(s & 1)
    if nmbr_bits is not None:
        while len(bit_list) < nmbr_bits:
            bit_list = '0' + bit_list
    return bit_list

def read_header(path_bin):
    """
    Function that reads the header of a VDAQ2 `*.bin` file.

    :param path_bin: The path to the `*.bin` file.
    :type path_bin: string
    :return: list (dictionary with infos from header,
                    list of keys that are written in each sample,
                    bool True if adc is 16 bit,
                    bool True if dac is 16 bit)
    :rtype: list
    """

    keys = []

    dt_header = np.dtype([('ID', 'i4'),
                          ('numOfBytes', 'i4'),
                          ('downsamplingFactor', 'i4'),
                          ('channelsAndFormat', 'i4'),
                          ('timestamp', 'uint64'),
                          ])

    #header = np.fromfile(path_bin, dtype=dt_header, count=1)[0]
    header = BinaryFile(path=path_bin, dtype=dt_header, count=1)[0]

    channelsAndFormat = to_bin_string(header['channelsAndFormat'], 32)

    # print('channelsAndFormat: ', channelsAndFormat)

    # bit 0: Timestamp uint64
    if channelsAndFormat[-1] == '1': keys.append('Time')
    #bit 9 sample number, currently only used by NUCLEUS experiment
    if channelsAndFormat[-10] == '1': keys.append('SampleNr')
    # bit 1: settings uint32
    if channelsAndFormat[-2] == '1': keys.append('Settings')
    # bit 2-5: dac 1-4
    for c, b in enumerate([2, 3, 4, 5]):
        if channelsAndFormat[-int(b + 1)] == '1': keys.append('DAC' + str(c + 1))
    # bit 6-8: adc 1-3 
    for c, b in enumerate([6, 7, 8]):
        if channelsAndFormat[-int(b + 1)] == '1': keys.append('ADC' + str(c + 1))
    # bit 10-16: -
    # bit 16: 0...DAC 16 bit, 1...DAC 32 bit
    dac_short = not (channelsAndFormat[-17] == '1')
    # bit 17: 0...ADC 16 bit, 1...ADC 32 bit
    adc_short = not (channelsAndFormat[-18] == '1')
    # bit 18-31: -

    # construct data type

    dt_tcp = []

    for k in keys:
        if k.startswith('Time'): dt_tcp.append((k, 'uint64'))
        elif k.startswith('SampleNr'): dt_tcp.append((k, 'uint32'))
        elif k.startswith('Settings'): dt_tcp.append((k, 'i4'))
        elif k.startswith('DAC'): dt_tcp.append((k, 'i2' if dac_short else 'i4'))
        elif k.startswith('ADC'): dt_tcp.append((k, 'i2' if adc_short else 'i4'))

    dt_tcp = np.dtype(dt_tcp)

    adc_bits = 16 if adc_short else 24
    dac_bits = 16 if dac_short else 24

    return header, keys, adc_bits, dac_bits, dt_tcp

# TODO: test cases
[docs] class Stream_VDAQ2(StreamBaseClass): """ Implementation of StreamBaseClass for hardware 'vdaq2'. VDAQ2 data is stored in `*.bin` files. Its header contains instructions on how to read the data and all recorded channels are stored in the same file. :param file: The binary file with the stream data (including the file extension). :type file: str :param dac_trig_thr: Trigger threshold (in sigmas) to find testpulses from the DAC channels. Defaults to 5 sigmas :type dac_trig_thr: float, optional :param dac_trig_win_len_ms: Trigger window length (in ms) to find testpulses from the DAC channels. Defaults to 500 ms :type dac_trig_win_len_ms: int, optional :param cal_trig_thr: Trigger threshold (in V) to find calpulses from the calibration ADC channels. Default to None, in which case the threshold is taken to be 5 times a single ADC count. :type cal_trig_thr: float, optional :param cal_trig_win_len_ms: Trigger window length (in ms) to find calpulses from the calibration ADC channels. Defaults to 50 ms :type cal_trig_win_len_ms: int, optional """ def __init__(self, file: str, dac_trig_thr: float = 5., # sigmas dac_trig_win_len_ms: int = 500, # ms cal_trig_thr: float = None, # V cal_trig_win_len_ms: int = 50 # ms ): super().__init__(file=file, dac_trig_thr=dac_trig_thr, dac_trig_win_len_ms=dac_trig_win_len_ms, cal_trig_thr=cal_trig_thr, cal_trig_win_len_ms=cal_trig_win_len_ms) # Get relevant info about file from its header header, keys, self._adc_bits, self._dac_bits, dt_tcp = read_header(file) # Start timestamp of the file in us (header['timestamp'] is in ns) self._start = int(header['timestamp']/1000) # Temporal step size in us (= inverse sampling frequency) self._dt = int(header['downsamplingFactor']) # VDAQ2 format could contain keys 'Settings' and 'Time' which we do not want to have as available data channels self._keys = list(set(keys) - set(['Time', 'Settings', 'SampleNr'])) # Create memory map to binary file self._data = BinaryFile(path=file, dtype=dt_tcp, offset=header.nbytes) #self._data = np.memmap(file, dtype=dt_tcp, mode='r', offset=header.nbytes) self._dac_trig_thr = dac_trig_thr self._dac_trig_win_len_ms = dac_trig_win_len_ms self._dac_trig_win_len = int(1000*dac_trig_win_len_ms/self._dt) self._cal_trig_thr = cal_trig_thr self._cal_trig_win_len_ms = cal_trig_win_len_ms self._cal_trig_win_len = int(1000*cal_trig_win_len_ms/self._dt) # Create placeholders for testpulses and calibration pulses self._tp_timestamps = dict() self._tpas = dict() self._calp_timestamps = dict() self._calpas = dict() def __len__(self): return len(self._data) def __enter__(self): self._data.__enter__() return self def __exit__(self, typ, val, tb): self._data.__exit__(typ, val, tb) def get_trace(self, key: str, where: slice, voltage: bool = True): if key.lower().startswith('adc'): bits = self._adc_bits elif key.lower().startswith('dac'): bits = self._dac_bits else: raise ValueError(f'Unable to assign the correct itemsize to name "{key}" as it does not start with "ADC" or "DAC".') data = self._data[where][key] return ai.data.convert_to_V(data, bits=bits, min=-20, max=20) if voltage else data @property def start_us(self): return self._start @property def dt_us(self): return self._dt @property def keys(self): return self._keys @property def tp_keys(self): return [x for x in self.keys if x.startswith("DAC")] @property def tpas(self): return VDAQ2_TPAS(self) @property def tp_timestamps(self): return VDAQ2_TP_TS(self) @property def calp_keys(self): return [x for x in self.keys if x.startswith("ADC")] @property def calpas(self): return VDAQ2_CALPAS(self) @property def calp_timestamps(self): return VDAQ2_CALP_TS(self)
class VDAQ2_TPAS: """A helper class for accessing testpulse amplitudes of the VDAQ2 hardware (which requires triggering a DAC channel).""" def __init__(self, stream: Stream_VDAQ2): self._stream = stream def __repr__(self): return f'{self.__class__.__name__}(keys={self.keys()})' def __getitem__(self, key: str): if not key in self.keys(): raise KeyError(f"Invalid testpulse key '{key}'. Valid keys: {self.keys()}") if key not in self._stream._tpas.keys(): print(f"Triggering {key} to obtain testpulse timestamps and testpulse amplitudes ...") timestamps, tpas = vdaq2_dac_channel_trigger(self._stream, key, self._stream._dac_trig_thr, self._stream._dac_trig_win_len) self._stream._tpas[key] = tpas self._stream._tp_timestamps[key] = timestamps return self._stream._tpas[key] def keys(self): return self._stream.tp_keys class VDAQ2_TP_TS: """A helper class for accessing testpulse timestamps of the VDAQ2 hardware (which requires triggering a DAC channel).""" def __init__(self, stream: Stream_VDAQ2): self._stream = stream def __repr__(self): return f'{self.__class__.__name__}(keys={self.keys()})' def __getitem__(self, key: str): if not key in self.keys(): raise KeyError(f"Invalid testpulse key '{key}'. Valid keys: {self.keys()}") if key not in self._stream._tp_timestamps.keys(): print(f"Triggering {key} to obtain testpulse timestamps and testpulse amplitudes ...") timestamps, tpas = vdaq2_dac_channel_trigger(self._stream, key, self._stream._dac_trig_thr, self._stream._dac_trig_win_len) self._stream._tpas[key] = tpas self._stream._tp_timestamps[key] = timestamps return self._stream._tp_timestamps[key] def keys(self): return self._stream.tp_keys class VDAQ2_CALPAS: """A helper class for accessing calibration pulse amplitudes of the VDAQ2 hardware (which requires triggering the ADC calibration channel).""" def __init__(self, stream: Stream_VDAQ2): self._stream = stream def __repr__(self): return f'{self.__class__.__name__}(keys={self.keys()})' def __getitem__(self, key: str): if not key in self.keys(): raise KeyError(f"Invalid calibration pulses key '{key}'. Valid keys: {self.keys()}") if key not in self._stream._calpas.keys(): print(f"Triggering {key} to obtain calpulse timestamps and calpulse amplitudes ...") timestamps, calpas = vdaq2_cal_channel_trigger(self._stream, key, self._stream._cal_trig_thr, self._stream._cal_trig_win_len) self._stream._calpas[key] = calpas self._stream._calp_timestamps[key] = timestamps return self._stream._calpas[key] def keys(self): return self._stream.calp_keys class VDAQ2_CALP_TS: """A helper class for accessing calpulse timestamps of the VDAQ2 hardware (which requires triggering the ADC calibration channel).""" def __init__(self, stream: Stream_VDAQ2): self._stream = stream def __repr__(self): return f'{self.__class__.__name__}(keys={self.keys()})' def __getitem__(self, key: str): if not key in self.keys(): raise KeyError(f"Invalid calibration key '{key}'. Valid keys: {self.keys()}") if key not in self._stream._calp_timestamps.keys(): print(f"Triggering {key} to obtain calibration pulse timestamps and calibration pulse amplitudes ...") timestamps, calpas = vdaq2_cal_channel_trigger(self._stream, key, self._stream._cal_trig_thr, self._stream._cal_trig_win_len) self._stream._calpas[key] = calpas self._stream._calp_timestamps[key] = timestamps return self._stream._calp_timestamps[key] def keys(self): return self._stream.calp_keys