Source code for cait.data._gen_h5

import os
from multiprocessing import get_context  # Pool

import numpy as np
import h5py

from ..data._raw import read_rdt_file
from ..features._mp import calc_main_parameters
from ..fit._pm_fit import fit_pulse_shape
from ._baselines import calculate_mean_nps
from ..fit._sev import generate_standard_event
from ..filter._of import optimal_transfer_function

# ------------------------------------------------------------
# FUNCTION
# ------------------------------------------------------------


[docs] def gen_dataset_from_rdt(path_rdt, fname, path_h5, channels, tpa_list=[0.0], calc_mp=True, calc_fit=False, calc_sev=False, calc_nps=True, processes=4, event_dtype='float32', ints_in_header=7, sample_frequency=25000, lazy_loading=True, ): """ Generates a HDF5 File from an RDT File, optionally MP, Fit, SEV Calculation. :param path_rdt: Path to the rdt file e.g. "data/bcks/". :type path_rdt: string :param fname: Name of the file e.g. "bck_001". :type fname: string :param path_h5: Path where the h5 file is saved e.g. "data/hdf5s%". :type path_h5: string :param channels: the numbers of the channels in the hdf5 file that we want to include in rdt :type channels: list :param tpa_list: The test pulse amplitudes to save, if 1 is in the list, all positive values are included. :type tpa_list: list :param calc_mp: If True the main parameters for all events are calculated and stored. :type calc_mp: bool :param calc_fit: Not recommended! If True the parametric fit for all events is calculated and stored. :type calc_fit: bool :param calc_sev: Not recommended! If True the standard event for all event channels is calculated. :type calc_sev: bool :param calc_nps: If True the main parameters for all events are calculated and stored. :type calc_nps: bool :param processes: The number of processes that is used for the code execution. :type processes: int :param event_dtype: Datatype to save the events with. :type event_dtype: string :param ints_in_header: The number of ints in the header of the events in the RDF file. This should be either 7 or 6! :type ints_in_header: int :param sample_frequency: The sample frequency of the records. :type sample_frequency: int :param lazy_loading: Recommended! If true, the data is loaded with memory mapping to avoid memory overflows. :type lazy_loading: bool """ nmbr_channels = len(channels) if not os.path.exists(path_h5): os.makedirs(path_h5) print('READ EVENTS FROM RDT FILE.') metainfo, pulse = \ read_rdt_file(fname=fname, path=path_rdt, channels=channels, store_as_int=False, ints_in_header=ints_in_header, lazy_loading=lazy_loading, ) # ipdb.set_trace() if nmbr_channels == 2: path = "{}{}-P_Ch{}-L_Ch{}.h5".format(path_h5, fname, channels[0], channels[1]) else: path = "{}{}".format(path_h5, fname) for i, c in enumerate(channels): path += '-{}_Ch{}'.format(i + 1, c) path += ".h5" with h5py.File(path, 'w') as h5f: for i, c in enumerate(channels): h5f.attrs.create('Ch_{}'.format(i + 1), data=c) # ################# PROCESS EVENTS ################# # if we filtered for events if 0.0 in tpa_list: print('WORKING ON EVENTS WITH TPA = 0.') metainfo_event = metainfo[:, metainfo[0, :, 12] == 0, :] pulse_event = pulse[:, metainfo[0, :, 12] == 0, :] nmbr_events = len(metainfo_event[0]) events = h5f.create_group('events') events.create_dataset('event', data=np.array(pulse_event, dtype=event_dtype)) events.create_dataset('hours', data=np.array(metainfo_event[0, :, 10])) events.create_dataset('time_s', data=np.array(metainfo_event[0, :, 4]), dtype='int32') events.create_dataset('time_mus', data=np.array(metainfo_event[0, :, 5]), dtype='int32') print('CREATE DATASET WITH EVENTS.') # for small numbers of events only additional overhead is introduced processes = 1 if pulse_event.shape[1] < 5 else processes if calc_mp: print('CALCULATE MAIN PARAMETERS.') # 10 is number main parameters mainpar_event = np.empty( [nmbr_channels, nmbr_events, 10], dtype=float) # basically a for loop running on multiple processes with get_context("spawn").Pool(processes) as p: for c in range(nmbr_channels): mainpar_list_event = p.map( calc_main_parameters, pulse_event[c, :, :]) mainpar_event[c, :, :] = np.array( [o.getArray() for o in mainpar_list_event]) events.create_dataset('mainpar', data=np.array(mainpar_event)) # description of the mainpar (data=col_in_mainpar) events['mainpar'].attrs.create(name='pulse_height', data=0) events['mainpar'].attrs.create(name='t_zero', data=1) events['mainpar'].attrs.create(name='t_rise', data=2) events['mainpar'].attrs.create(name='t_max', data=3) events['mainpar'].attrs.create(name='t_decaystart', data=4) events['mainpar'].attrs.create(name='t_half', data=5) events['mainpar'].attrs.create(name='t_end', data=6) events['mainpar'].attrs.create(name='offset', data=7) events['mainpar'].attrs.create(name='linear_drift', data=8) events['mainpar'].attrs.create(name='quadratic_drift', data=9) if calc_fit: print('CALCULATE FIT.') # 6 is number fit parameters fitpar_event = np.empty( [nmbr_channels, nmbr_events, 6], dtype=float) with get_context("spawn").Pool(processes) as p: for c in range(nmbr_channels): fitpar_event[c] = np.array( p.map(fit_pulse_shape, pulse_event[c, :, :])) events.create_dataset('fitpar', data=np.array(fitpar_event)) # description of the fitparameters (data=column_in_fitpar) events['fitpar'].attrs.create(name='t_0', data=0) events['fitpar'].attrs.create(name='A_n', data=1) events['fitpar'].attrs.create(name='A_t', data=2) events['fitpar'].attrs.create(name='tau_n', data=3) events['fitpar'].attrs.create(name='tau_in', data=4) events['fitpar'].attrs.create(name='tau_t', data=5) if calc_sev and calc_nps: # ################# STD EVENT ################# # [pulse_height, t_zero, t_rise, t_max, t_decaystart, t_half, t_end, offset, linear_drift, quadratic_drift] sev_pulse_list = [] sev_fitpar_list = [] for c in range(nmbr_channels): stdevent_pulse, stdevent_fitpar = generate_standard_event(pulse_event[c, :, :], mainpar_event[c, :, :], pulse_height_interval=[ 0.05, 1.5], left_right_cutoff=0.1, rise_time_interval=[ 5, 100], decay_time_interval=[ 50, 2500], onset_interval=[ 1500, 3000], verb=True) sev_pulse_list.append(stdevent_pulse) sev_fitpar_list.append(stdevent_fitpar) stdevent = h5f.create_group('stdevent') stdevent.create_dataset('event', data=np.array(sev_pulse_list, dtype=event_dtype)) stdevent.create_dataset('fitpar', data=np.array(sev_fitpar_list)) # description of the fitparameters (data=column_in_fitpar) stdevent['fitpar'].attrs.create(name='t_0', data=0) stdevent['fitpar'].attrs.create(name='A_n', data=1) stdevent['fitpar'].attrs.create(name='A_t', data=2) stdevent['fitpar'].attrs.create(name='tau_n', data=3) stdevent['fitpar'].attrs.create(name='tau_in', data=4) stdevent['fitpar'].attrs.create(name='tau_t', data=5) stdevent.create_dataset('mainpar', data=np.array([calc_main_parameters(x).getArray() for x in sev_pulse_list])) # description of the mainpar (data=col_in_mainpar) stdevent['mainpar'].attrs.create(name='pulse_height', data=0) stdevent['mainpar'].attrs.create(name='t_zero', data=1) stdevent['mainpar'].attrs.create(name='t_rise', data=2) stdevent['mainpar'].attrs.create(name='t_max', data=3) stdevent['mainpar'].attrs.create(name='t_decaystart', data=4) stdevent['mainpar'].attrs.create(name='t_half', data=5) stdevent['mainpar'].attrs.create(name='t_end', data=6) stdevent['mainpar'].attrs.create(name='offset', data=7) stdevent['mainpar'].attrs.create(name='linear_drift', data=8) stdevent['mainpar'].attrs.create(name='quadratic_drift', data=9) # ################# PROCESS NOISE ################# # if we filtered for noise if -1.0 in tpa_list: print('WORKING ON EVENTS WITH TPA = -1.') metainfo_noise = metainfo[:, metainfo[0, :, 12] == -1.0, :] pulse_noise = pulse[:, metainfo[0, :, 12] == -1.0, :] print('CREATE DATASET WITH NOISE.') noise = h5f.create_group('noise') noise.create_dataset('event', data=np.array(pulse_noise, dtype=event_dtype)) noise.create_dataset('hours', data=np.array(metainfo_noise[0, :, 10])) noise.create_dataset('time_s', data=np.array(metainfo_noise[0, :, 4]), dtype='int32') noise.create_dataset('time_mus', data=np.array(metainfo_noise[0, :, 5]), dtype='int32') if calc_nps: if np.shape(pulse_noise)[1] != 0: mean_nps_all = [] for c in range(nmbr_channels): mean_nps, _ = calculate_mean_nps(pulse_noise[c, :, :]) mean_nps_all.append(mean_nps) frequencies = np.fft.rfftfreq(len(pulse_noise[0, 0]), d=1. / sample_frequency) noise.create_dataset('nps', data=np.array(mean_nps_all)) noise.create_dataset('freq', data=frequencies) else: print("DataError: No existing noise data for this channel") if (-1.0 in tpa_list) and (0 in tpa_list) and calc_sev and calc_nps: # ################# OPTIMUMFILTER ################# # H = optimal_transfer_function(standardevent, mean_nps) print('CREATE OPTIMUM FILTER.') of = np.array([optimal_transfer_function(sev, nps) for sev, nps in zip(sev_pulse_list, mean_nps_all)]) optimumfilter = h5f.create_group('optimumfilter') optimumfilter.create_dataset('optimumfilter_real', data=of.real) optimumfilter.create_dataset('optimumfilter_imag', data=of.imag) # ################# PROCESS TESTPULSES ################# # if we filtered for testpulses if any(el > 0 for el in tpa_list): print('WORKING ON EVENTS WITH TPA > 0.') tp_list = np.logical_and( metainfo[0, :, 12] != -1.0, metainfo[0, :, 12] != 0.0) metainfo_tp = metainfo[:, tp_list, :] pulse_tp = pulse[:, tp_list, :] nmbr_tp = len(metainfo_tp[0]) print('CREATE DATASET WITH TESTPULSES.') testpulses = h5f.create_group('testpulses') testpulses.create_dataset('event', data=np.array(pulse_tp, dtype=event_dtype)) testpulses.create_dataset( 'hours', data=np.array(metainfo_tp[0, :, 10])) testpulses.create_dataset( 'testpulseamplitude', data=np.array(metainfo_tp[0, :, 12])) testpulses.create_dataset('time_s', data=np.array(metainfo_tp[0, :, 4]), dtype='int32') testpulses.create_dataset('time_mus', data=np.array(metainfo_tp[0, :, 5]), dtype='int32') if calc_mp: print('CALCULATE MP.') # basically a for loop running on 4 processes mainpar_tp = np.empty([nmbr_channels, nmbr_tp, 10], dtype=float) with get_context("spawn").Pool(processes) as p: for c in range(nmbr_channels): mainpar_list_tp = p.map( calc_main_parameters, pulse_tp[c, :, :]) mainpar_tp[c] = np.array( [o.getArray() for o in mainpar_list_tp]) testpulses.create_dataset('mainpar', data=np.array(mainpar_tp)) # description of the mainpar (data=col_in_mainpar) testpulses['mainpar'].attrs.create(name='pulse_height', data=0) testpulses['mainpar'].attrs.create(name='t_zero', data=1) testpulses['mainpar'].attrs.create(name='t_rise', data=2) testpulses['mainpar'].attrs.create(name='t_max', data=3) testpulses['mainpar'].attrs.create(name='t_decaystart', data=4) testpulses['mainpar'].attrs.create(name='t_half', data=5) testpulses['mainpar'].attrs.create(name='t_end', data=6) testpulses['mainpar'].attrs.create(name='offset', data=7) testpulses['mainpar'].attrs.create(name='linear_drift', data=8) testpulses['mainpar'].attrs.create(name='quadratic_drift', data=9) if calc_fit: print('CALCULATE FIT.') fitpar_tp = [] with get_context("spawn").Pool(processes) as p: for c in range(nmbr_channels): fitpar_tp.append(np.array( p.map(fit_pulse_shape, pulse_tp[c, :, :]))) testpulses.create_dataset('fitpar', data=np.array(fitpar_tp)) # description of the fitparameters (data=column_in_fitpar) testpulses['fitpar'].attrs.create(name='t_0', data=0) testpulses['fitpar'].attrs.create(name='A_n', data=1) testpulses['fitpar'].attrs.create(name='A_t', data=2) testpulses['fitpar'].attrs.create(name='tau_n', data=3) testpulses['fitpar'].attrs.create(name='tau_in', data=4) testpulses['fitpar'].attrs.create(name='tau_t', data=5)