Source code for cait.versatile.datasources.stream.impl_csmpl

import json
import os
from typing import List

import numpy as np

import cait as ai

from ....readers import BinaryFile
from ..hardwaretriggered.par_file import PARFile
from .streambase import StreamBaseClass


def get_offset(path_dig_stamps):
    """
    Get the offset between start of the continuous DAQ and start of the CCS time recording.

    :param path_dig_stamps: The full path to the `*.dig` file.
    :type path_dig_stamps: str
    :return: The offset that needs to be subtracted from all CCS time stamps, to get the time stamps w.r.t. the start of the CSMPL file.
    :rtype: int
    """

    dig = np.dtype([
        ('stamp', np.uint64),
        ('bank', np.uint32),
        ('bank2', np.uint32),
    ])

    #diq_stamps = np.fromfile(path_dig_stamps, dtype=dig)
    diq_stamps = BinaryFile(path=path_dig_stamps, dtype=dig)
    dig_samples = diq_stamps['stamp']
    offset_clock = (dig_samples[1] - 2 * dig_samples[0])

    return offset_clock

def get_test_stamps(path,
                    channels=None,
                    control_pulses=None,
                    clock=10000000,
                    min_cpa=10.1):
    """
    Load the test pulse time stamps from a ``*.test_stamps`` file.

    :param path: The path to the ``*.test_stamps`` file.
    :type path: string
    :param channels: The test pulse channels we want to read out.
    :type channels: list
    :param control_pulses: If set to True, only control pulses are returned. If False, only test pulses are returned. If None, all are returned.
    :type control_pulses: bool or None
    :param clock: The Frequency of the time clock, in Hz. Standard for CRESST is 10MHz.
    :type clock: int
    :return: (the test pulse hours time stamps, the test pulse amplitudes, the channels of the test pulses)
    :rtype: 3-tuple of 1D arrays
    """

    teststamp = np.dtype([
        ('stamp', np.uint64),
        ('tpa', np.float32),
        ('tpch', np.uint32),
    ])

    #stamps = np.fromfile(path, dtype=teststamp)
    stamps = BinaryFile(path=path, dtype=teststamp)

    hours = stamps['stamp'] / clock / 3600
    tpas = stamps['tpa']
    testpulse_channels = stamps['tpch']

    # take only the channels we want
    if channels is not None:
        # Deprecated since numpy 2.0
        # cond = np.in1d(testpulse_channels, channels)
        cond = np.isin(testpulse_channels, channels)
        hours = hours[cond]
        tpas = tpas[cond]
        testpulse_channels = testpulse_channels[cond]

    # take only control or no control pulses
    if control_pulses is not None:
        if control_pulses:
            cond = tpas > min_cpa
        else:
            cond = tpas < min_cpa
        hours = hours[cond]
        tpas = tpas[cond]
        testpulse_channels = testpulse_channels[cond]

    return hours, tpas, testpulse_channels

[docs] class Stream_CSMPL(StreamBaseClass): """ Implementation of StreamBaseClass for hardware 'CSMPL'. The data is stored in `*.csmpl` files (for each channel separately). Additionally, we need a `*.par` file to read the start timestamp of the stream data from. """ def __init__(self, files: List[str]): super().__init__(files=files) if not any([x.endswith('.par') or x.endswith('.json') for x in files]): raise ValueError("You have to provide either a '.par' or '.json' file to construct this class.") if any([x.endswith('.par') for x in files]) and any([x.endswith('.json') for x in files]): raise ValueError("You may only provide one of a '.par' and a '.json' file, not both.") if not any([x.endswith('.csmpl') for x in files]): raise ValueError("You have to provide at least one '.csmpl' file to construct this class.") if any([os.path.splitext(x)[-1] not in [".csmpl", ".par", ".json", ".test_stamps", ".dig_stamps"] for x in files]): raise ValueError("Only file extensions ['.csmpl', '.par'] are supported.") csmpl_paths = [x for x in files if x.endswith('.csmpl')] test_path = [x for x in files if x.endswith('.test_stamps')] dig_path = [x for x in files if x.endswith('.dig_stamps')] # Offset from the dig_stamps file (assuming a 10 MHz clock) offset = 0 if not dig_path else int(get_offset(dig_path[0])/10) if any([x.endswith('.par') for x in files]): par_path = [x for x in files if x.endswith('.par')][0] self._par_file = PARFile(par_path) self._start = int(1e6*self._par_file.start_s + self._par_file.start_us - offset) self._dt = self._par_file.time_base_us elif any([x.endswith('.json') for x in files]): json_path = [x for x in files if x.endswith('.json')][0] with open(json_path, 'r') as f: self._config = json.load(f) if "start_ts" in self._config: # Start time is Unix timestamp self._start = int(1e6*self._config["start_ts"] - offset) elif "start_s" in self._config and "start_us" in self._config: # Timestamp is separated into seconds and microseconds, a la .par files self._start = int(1e6*self._config["start_s"] + self._config["start_us"] - offset) else: # Improperly formatted config file raise ValueError( "Config (.json) file must contain either " "\"start_ts\", or \"start_s\" and \"start_us\"." ) if "time_base_us" not in self._config: raise ValueError("Config (.json) file must contain \"time_base_us\".") self._dt = self._config["time_base_us"] self._data = dict() for fname in csmpl_paths: name = os.path.splitext(os.path.basename(fname))[0] if name.split("_")[-1].lower().startswith("ch"): name = name.split("_")[-1] self._data[name] = BinaryFile(path=fname, dtype=np.dtype(np.int16)) if test_path: if not dig_path: raise Exception("When including testpulse information using a '.test_stamps' file, you also have to provide the corresponding '.dig_stamps' file.") test_path = test_path[0] test_h, tpas, test_chs = get_test_stamps(test_path) self._tpas = dict() self._tp_timestamps = dict() for k in list(set(test_chs)): mask = test_chs == k self._tpas[str(k)] = tpas[mask] # assuming 10 MHz clock self._tp_timestamps[str(k)] = self.start_us + np.array(test_h[mask]*3600*1e6, dtype=np.int64) + offset self._keys = list(self._data.keys()) def __len__(self): return len(self._data[self.keys[0]]) def __enter__(self): for bin_file in self._data.values(): bin_file.__enter__() return self def __exit__(self, typ, val, tb): for bin_file in self._data.values(): bin_file.__exit__(typ, val, tb) def get_trace(self, key: str, where: slice, voltage: bool = True): data = self._data[key][where] return ai.data.convert_to_V(data, bits=16, min=-10, max=10) if voltage else data @property def start_us(self): return self._start @property def dt_us(self): return self._dt @property def keys(self): return self._keys @property def tp_keys(self): if not hasattr(self, '_tpas'): return [] else: return list(self._tpas.keys()) @property def tpas(self): if not hasattr(self, '_tpas'): raise KeyError("Testpulse amplitudes not available. Include a '.test_stamps' and a '.dig_stamps' file when constructing this class to use this feature.") return self._tpas @property def tp_timestamps(self): if not hasattr(self, '_tp_timestamps'): raise KeyError("Testpulse timestamps not available. Include a '.test_stamps' and a '.dig_stamps' file when constructing this class to use this feature.") return self._tp_timestamps @property def calp_keys(self): return [] @property def calpas(self): raise NotImplementedError("Calibration channels treatment is only implemented for 'vdaq2' and 'vdaq3' hardware.") @property def calp_timestamps(self): raise NotImplementedError("Calibration channels treatment is only implemented for 'vdaq2' and 'vdaq3' hardware.")