Source code for cait.event_interface

import math

import numpy as np
import h5py
import matplotlib.pyplot as plt

from .evaluation._color import console_colors
from .features._fem import get_elements, plot_S1
from .filter._of import filter_event
from .fit._templates import sev_fit_template
from .styles._plt_styles import use_cait_style, make_grid
from .fit._saturation import scaled_logistic_curve
from .fit._numerical_fit import template

# -----------------------------------------------------------
# CLASS
# -----------------------------------------------------------


[docs] class EventInterface: """ A class for the viewing and labeling of Events from HDF5 data sets. The Event interface is one of the core classes of the cait package. It is used to view raw data events and label them in order to train supervised machine learning models. The interface is with an interactive menu optimized for an efficient workflow. :param record_length: The number of samples in a record window. :type record_length: int :param sample_frequency: The record frequency of the measurement. :type sample_frequency: int :param nmbr_channels: The number of channels of the detector modules, typically 1, 2 (phonon/light) or 3 (including I-sticks or ring). :type nmbr_channels: int :param down: The downsample rate for viewing the events. This can later be adapted in the interactive menu. :type down: int :param dpi: Dots per inch for the plots. :type dpi: int :param run: The number of the measurement run. This is a optional argument, to identify a measurement with a given module uniquely. Providing this argument has no effect, but might be useful in case you start multiple DataHandlers at once, to stay organized. :type run: string or None :param module: The naming of the detector module. Optional argument, for unique identification of the physics data. Providing this argument has no effect, but might be useful in case you start multiple DataHandlers at once, to stay organized. :type module: string or None >>> ei = ai.EventInterface() Event Interface Instance created. >>> ei.load_h5(path='./', fname='test_001', channels=[0,1]) Nmbr triggered events: 4 Nmbr testpulses: 11 Nmbr noise: 4 HDF5 File loaded. >>> ei.create_labels_csv(path='./') """ def __init__(self, record_length: int = 16384, sample_frequency: int = 25000, nmbr_channels: int = 2, down: int = 1, dpi: int = None, run: str = None, module: str = None, pre_trigger_region: float = 1 / 8, ): self.nmbr_channels = nmbr_channels self.module = module self.run = run self.nmbr_channels = nmbr_channels self.record_length = record_length self.down = down self.sample_frequency = sample_frequency self.window_size = int(record_length / down) self.show_mp = False self.show_derivative = False self.show_triangulation = False self.std_thres = [] for i in range(self.nmbr_channels): self.std_thres.append(0.001) self.only_wrong = False self.show_filtered = False self.sev = False self.arr = False self.saturation = False self.fit_models = None self.stdevents = None self.saturation_pars = None self.of = None self.subtract_offset = False self.labels = {} self.predictions = {} self.model_names = {} self.valid_types = ['events', 'testpulses', 'noise'] if self.nmbr_channels == 2: self.channel_names = ['Phonon', 'Light'] else: self.channel_names = [ 'Channel {}'.format(i) for i in range(nmbr_channels) ] self.xlim = None self.ylim = None self.dpi = dpi self.show_time = False self.window = False self.threshold = None self.show_threshold = False self.pre_trigger_region = pre_trigger_region print('Event Interface Instance created.') # ------------------------------------------------------------ # INCLUDE THE DATA # ------------------------------------------------------------ # Load in the hdf5 dataset
[docs] def load_h5(self, path: str, fname: str, channels: list = None, appendix=True, which_to_label=['events']): """ Load a hdf5 dataset to the event interface instance. This is typically done right after the declaration of a new instance. :param path: Path to the file folder. E.g. "data/" --> filepath "data/fname-[appendix].h5". :type path: string :param channels: The numbers of the channels that are included in the HDF5 file. The should be consistent with the appendix of the file name. If the file has no appendix, the numbering can be done arbitrarily, e.g. with [0, 1] for a two-channel detector module. :type channels: list of string :param fname: The file name without suffix, e.g. "test_001.h5" --> "test_001". :type fname: string :param appendix: If True the appendix generated from the gen_h5_from_rdt function is automatically appended to the fname string. Use this argument, if your HDF5 file has such an appendix. Do not put the appendix in the fname string then, e.g. "test_001-P_Ch1-L_Ch2.h5" --> fname="test_001", appendix=True :type appendix: bool :param which_to_label: Specify which groups from the HDF5 set should be labeled. Possible list members are 'events', 'testpulses' and 'noise'. In most use cases you will just want the standard argument ['events']. :type which_to_label: list of strings """ assert not appendix or channels is not None, 'If you want an automatically appendix you must hand the channels!' if appendix: if self.nmbr_channels == 2: app = '-P_Ch{}-L_Ch{}'.format(*channels) else: app = '' for i, c in enumerate(channels): app += '-{}_Ch{}'.format(i + 1, c) else: app = '' if all([type in self.valid_types for type in which_to_label]): self.which_to_label = which_to_label else: raise ValueError( 'which_to_label must be a list and contain at least one of events, testpulses, noise.' ) self.fname = fname if channels is not None: if not len(channels) == self.nmbr_channels: raise ValueError('List of channels must vale length {}.'.format( self.nmbr_channels)) if path == '': path = './' if path[-1] != '/': path = path + '/' path_h5 = path + '{}{}.h5'.format(fname, app) self.path_h5 = path_h5 with h5py.File(path_h5, 'r') as f: if channels is not None: self.channels = channels self.nmbrs = {} try: self.nmbrs['events'] = f['events']['event'].shape[1] print('Nmbr triggered events: ', self.nmbrs['events']) except KeyError: print('No triggered events in h5 file.') try: self.nmbrs['testpulses'] = f['testpulses']['event'].shape[1] print('Nmbr testpulses: ', self.nmbrs['testpulses']) except KeyError: print('No Testpulses in h5 file.') try: self.nmbrs['noise'] = f['noise']['event'].shape[1] print('Nmbr noise: ', self.nmbrs['noise']) except KeyError: print('No noise in h5 file.') print('HDF5 File loaded.')
# ------------------------------------------------------------ # LABELS HANDLING # ------------------------------------------------------------ # Create CSV file for labeling
[docs] def create_labels_csv(self, path: str): """ Create a new CSV file to store the labels. The labels are intentionally not included in the HDF5 set right away, to provide a fail-save mechanism in case the HDF5 file is re-converted or the this method is accidentally called, overwriting existing labels. Labels are usually assigned per hand, making the labels the most time-costly values in your dataset. :param path: The path to the file folder where the labels CSV is to be created. A unique naming is automatically assigned, e.g. "data/" --> file name "data/labels_bck_001_type.csv". :type path: string """ if path == '': path = './' if path[-1] != '/': path = path + '/' self.path_csv_labels = path + \ 'labels_{}_'.format(self.fname) try: for type in self.which_to_label: self.labels[type] = np.zeros( [self.nmbr_channels, self.nmbrs[type]]) np.savetxt(self.path_csv_labels + type + '.csv', self.labels[type], fmt='%i', delimiter='\n') except NameError: print('Error! Load a h5 file first.')
# Load CSV file for labeling
[docs] def load_labels_csv(self, path: str, type: str = 'events'): """ Load a CSV file with labels for the given detector module. :param path: the path to the file folder containing the labels CSV file, e.g. "data/" --> file name "data/labels_bck_001_type.csv". :type path: string :param type: The group in the HDF5 corresponding to the labels, either 'events', 'testpulses' or 'noise'. :type type: string >>> ei.load_labels_csv(path='./') Loading Labels from ./labels_test_001_events.csv. """ if not type in self.valid_types: raise ValueError('Type should be events, testpulses or noise.') if path == '': path = './' if path[-1] != '/': path = path + '/' self.path_csv_labels = path + \ 'labels_{}_'.format(self.fname) filename = self.path_csv_labels + type + '.csv' print('Loading Labels from {}.'.format(filename)) labels = np.loadtxt(filename, delimiter='\n') labels.resize((self.nmbr_channels, self.nmbrs[type])) self.labels[type] = labels
# Export labels from hdf5 file to CSV file
[docs] def export_labels(self, path: str, type: str = 'events'): """ Save the labels included in the HDF5 file as CSV file. You will usually need this option if you have an HDF5 file with included labels, but lost the corresponding CSV file. Also, it is recommended to export and store the labels as CSV in on a safe place, e.g. in a Wiki. :param path: The path to the file folder containing the labels CSV, e.g. "data/" --> file name "data/labels_bck_001_type.csv". :type path: string :param type: The group in the HDF5 file corresponding to the labels, either 'events', 'testpulses' or 'noise'. :type type: string >>> ei.export_labels(path='./') Labels from HDF5 exported to ./labels_test_001_. """ with h5py.File(self.path_h5, 'r+') as f: if not type in self.valid_types: raise ValueError('Type should be events, testpulses or noise.') if path == '': path = './' if path[-1] != '/': path = path + '/' self.path_csv_labels = path + \ 'labels_{}_'.format(self.fname) # check if hdf5 file has labels if not f[type]['labels']: print('Load HDF5 File with labels first!') else: np.savetxt(self.path_csv_labels + type + '.csv', np.array(f[type]['labels']), fmt='%i', delimiter='\n') print('Labels from HDF5 exported to {}{}.'.format( self.path_csv_labels, type))
# ------------------------------------------------------------ # PREDICTIONS HANDLING # ------------------------------------------------------------
[docs] def load_predictions_csv(self, path: str, model: str, type: str = 'events'): """ Load a CSV file with predictions from a machine learning model for the given HDF5 dataset. :param path: The path to the file folder containing the CSV file with the predictions, e.g. "data/" --> file name "data/<model>_predictions_bck_001_type.csv". :type path: string :param type: The group in the HDF5 file corresponding to the predictions, either 'events', 'testpulses' or 'noise'. :type type: string :param model: The name of the model that made the predictions, e.g. "RF" --> Random Forest. :type model: string >>> ei.load_predictions_csv(path='./', model='RF') Loading Predictions from ./RF_predictions_test_001_events.csv. """ if not type in self.valid_types: raise ValueError('Type should be events, testpulses or noise.') if path == '': path = './' if path[-1] != '/': path = path + '/' self.path_csv_predictions = path + \ '{}_predictions_{}_'.format(model, self.fname) filename = self.path_csv_predictions + type + '.csv' print('Loading Predictions from {}.'.format(filename)) predictions = np.loadtxt(filename, delimiter='\n') predictions.resize((self.nmbr_channels, self.nmbrs[type])) # append the predictions if type not in self.predictions.keys(): self.predictions[type] = [] self.predictions[type].append(predictions) # append the model name if type not in self.model_names.keys(): self.model_names[type] = [] self.model_names[type].append(model)
[docs] def export_predictions(self, path: str, model: str, type: str = 'events'): """ Save the predictions from a machine learning model included in the HDF5 file as CSV file. :param path: The path to the file folder containing the predictions CSV, e.g. "data/" --> file name "data/<model>_predictions_bck_001_type.csv". :type path: string :param type: The name of the group in the HDF5 file, either 'events' or 'testpulses' or 'noise'. :type type: string :param model: The name of the model that made the predictions, e.g. "RF" --> Random Forest. :type model: string >>> ei.export_predictions(path='./', model='RF') RF Predictions from HDF5 exported to RF_predictions_test_001_events. """ with h5py.File(self.path_h5, 'r+') as f: if not type in self.valid_types: raise ValueError('Type should be events, testpulses or noise.') if path == '': path = './' if path[-1] != '/': path = path + '/' self.path_csv_predictions = path + \ '{}_predictions_{}_'.format(model, self.fname) # check if hdf5 file has labels if not f[type]['{}_predictions'.format(model)]: print('Load HDF5 File with labels first!') else: np.savetxt(self.path_csv_predictions + type + '.csv', np.array(f[type]['{}_predictions'.format(model)]), fmt='%i', delimiter='\n') print('{} Predictions from HDF5 exported to {}{}.'.format( model, self.path_csv_predictions, type))
# ------------------------------------------------------------ # FEATURE HANDLING # ------------------------------------------------------------ # Load OF
[docs] def load_of(self, down: int = 1, group_name_appendix: str = ''): """ Add the optimal transfer function from the HDF5 file. :param down: The downsample factor of the optimal transfer function. The data set of the optimumfilter in the HDF5 set has a consistent name appendix _downX. :type down: int :param group_name_appendix: A string that is appended to the group name optimumfilter in the HDF5 file. Typically this could be _tp for a test pulse optimum filter. :type group_name_appendix: str This is needed in order to view the filtered event in the labeling or viewing process. For this to work, the optimal transfer function must be included in the HDF5 file, e.g. calculated with an instance of the DataHandler before (dh.calc_of()). >>> ei.load_of() Added the optimal transfer function. """ with h5py.File(self.path_h5, 'r') as f: if down != 1: try: of_real = np.array( f['optimumfilter' + group_name_appendix]['optimumfilter_real_down{}'.format(down)]) of_imag = np.array( f['optimumfilter' + group_name_appendix]['optimumfilter_imag_down{}'.format(down)]) except KeyError: raise KeyError( 'Please calculate the OF with according downsampling rate.') else: of_real = np.array(f['optimumfilter' + group_name_appendix]['optimumfilter_real']) of_imag = np.array(f['optimumfilter' + group_name_appendix]['optimumfilter_imag']) self.of = of_real + 1j * of_imag print('Added the optimal transfer function.')
[docs] def load_sev_par(self, name_appendix='', sample_length=0.04, group_name_appendix: str = ''): """ Add the sev fit parameters from the HDF5 file. This is needed in order to view the fitted event in the labeling or viewing process. For this to work, the sev fit parameters for every event must be included in the HDF5 file, e.g. calculated with an instance of the DataHandler before (dh.apply_sev_fit()). :param name_appendix: An appendix to the data set sev_fit_par in the HDF5 set. Typically this is _downX in case a downsampling was used for the fit. :type name_appendix: string :param sample_length: The length of a sample in milliseconds, i.e. 1/sample_frequency. :type sample_length: float :param group_name_appendix: A string that is appended to the group name stdevent in the HDF5 file. Typically this could be _tp for a test pulse standard event. :type group_name_appendix: string >>> ei.load_sev_par() Added the sev fit parameters. """ # save this for loading of the parameters when viewing self.name_appendix = name_appendix with h5py.File(self.path_h5, 'r') as f: sev_par = np.array(f['stdevent' + group_name_appendix]['fitpar']) t = (np.arange(0, self.record_length, dtype=float) - self.record_length / 4) * sample_length self.fit_models = [] for c in range(self.nmbr_channels): self.fit_models.append(sev_fit_template(pm_par=sev_par[c], t=t)) print('Added the sev fit parameters.')
[docs] def load_arr_par(self, name_appendix='', group_name_appendix: str = '', use_this_array=None): """ Add the array fit parameters from the HDF5 file. :param name_appendix: An appendix to the data set arr_fit_par in the HDF5 set. :type name_appendix: string :param group_name_appendix: A string that is appended to the group name stdevent in the HDF5 file. Typically this could be _tp for a test pulse standard event. :type group_name_appendix: string :param use_this_array: :type use_this_array: """ # save this for loading of the parameters when viewing self.arr_name_appendix = name_appendix if use_this_array is not None: self.stdevents = np.array(use_this_array) else: with h5py.File(self.path_h5, 'r') as f: self.stdevents = np.array(f['stdevent' + group_name_appendix]['event']) print('Added the array.')
[docs] def load_saturation_par(self): """ Add the saturation fit parameters from the HDF5 file. This is needed to show the saturated, fitted events. """ with h5py.File(self.path_h5, 'r') as f: self.saturation_pars = np.array(f['saturation']['fitpar']) print('Added the saturation fit parameters.')
[docs] def set_threshold(self, threshold: list): """ Set a threshold to show for all channels. :param thresholds: The thresholds for all channels. :type thresholds: list of floats """ assert len(threshold) == self.nmbr_channels, 'You need to define one threshold for each channel!' self.threshold = threshold print('Set thresholds to: ', self.threshold)
# ------------------------------------------------------------ # LABEL AND VIEWER INTERFACE # ------------------------------------------------------------ def _plot_mp(self, main_par, down: int = 1, color: str = 'r', offset_in_samples: int = 0, xlim: tuple = None, offset_sub: float = 0): """ Function to plot the main parameters, typically accessed by the labeling tool internally. :param main_par: list of the 10 main parameters :param down: int, the downsample rate :param color: string, the color in which the mp are plotted :param offset_in_samples: int, an offset parameter from the beginning of the file :param xlim: tuple, the lower and upper limit of the x axis in the plot, in sample numbers :offset_sub: float, a value that is additionally substracted from the y values """ pulse_height = main_par[0] t_zero = main_par[1] t_rise = main_par[2] t_max = main_par[3] t_decaystart = main_par[4] t_half = main_par[5] t_end = main_par[6] offset = main_par[7] x_values = [(t_zero - offset_in_samples) / down, (t_rise - offset_in_samples) / down, (t_max - offset_in_samples) / down, (t_decaystart - offset_in_samples) / down, (t_half - offset_in_samples) / down, (t_end - offset_in_samples) / down] y_values = [ offset + 0.1 * pulse_height, offset + 0.8 * pulse_height, offset + pulse_height, offset + 0.9 * pulse_height, offset + 0.736 * pulse_height, offset + 0.368 * pulse_height ] x_values = np.array(x_values) y_values = np.array(y_values) - offset_sub if xlim is not None: mask = (x_values >= np.array(xlim[0])) & (x_values <= np.array(xlim[1])) plt.scatter(x_values[mask], y_values[mask], color=color, zorder=15) else: plt.scatter(x_values, y_values, color=color, zorder=15) # Access options of label interface def _viewer_options(self): """ Prints out all the options that are available in the event viewer/labeling tool """ print('---------- OPTIONS: ----------') print('down ... downsample') print('der ... show derivative of event') print('mp ... show main parameters') print('triang ... show triangulation') print('of ... show filtered event') print('sev ... show fitted standardevent') print('arr ... show fitted array') print('sat ... show fitted event with saturation') print('threshold ... show the trigger threshold') print('xlim ... set the x limit') print('ylim ... set the y limit') print('sub ... subtract offset') print('time ... plot time instead of sample index') print('window ... include window in of filtering') print('q ... quit options menu') while True: user_input = input('Choose option or q(uit): ') # downsample if user_input == 'down': user_input2 = input('Enter downsample factor (power of 2): ') try: down = int(user_input2) if math.log2(down).is_integer(): self.down = down self.window_size = int(self.record_length / down) print('Downsample rate set to {}.'.format(self.down)) else: print( 'Downsample rate has to be integer (power of 2).') except ValueError: print('Downsample rate has to be integer (power of 2).') # derivative elif user_input == 'der': self.show_derivative = not self.show_derivative self.show_filtered = False print('Show derivative set to: ', self.show_derivative) # optimum filter elif user_input == 'of': assert self.of is not None, 'You need to load an optimal filter first!' self.show_filtered = not self.show_filtered self.show_derivative = False print('Show filtered set to: ', self.show_filtered) # triangulation elif user_input == 'triang': self.show_triangulation = not self.show_triangulation print('Show triangulation set to: ', self.show_triangulation) # main parameters elif user_input == 'mp': self.show_mp = not self.show_mp print('Show Main Parameters set to: ', self.show_mp) # sev fit elif user_input == 'sev': assert self.fit_models is not None, 'You need to load standard event fit parameters first!' self.sev = not self.sev print('Show SEV fit set to: ', self.sev) # arr fit elif user_input == 'arr': assert self.stdevents is not None, 'You need to load standard event fit parameters first!' self.arr = not self.arr print('Show SEV fit set to: ', self.arr) # saturation elif user_input == 'sat': assert self.saturation_pars is not None, 'You need to load saturation fit parameters first!' self.saturation = not self.saturation print('Show saturated fit set to: ', self.sev) # elif user_input == 'threshold': assert self.threshold is not None, 'You need to define the threshold first!' self.show_threshold = not self.show_threshold print('Show threshold set to: ', self.show_threshold) # xlim elif user_input == 'xlim': user_input2 = input('Set x lower limit: ') lb = float(user_input2) user_input2 = input('Set x upper limit: ') ub = float(user_input2) self.xlim = (lb, ub) # ylim elif user_input == 'ylim': user_input2 = input('Set y lower limit: ') lb = float(user_input2) user_input2 = input('Set y upper limit: ') ub = float(user_input2) self.ylim = (lb, ub) elif user_input == 'sub': self.subtract_offset = not self.subtract_offset print('Subtract offset set to: ', self.subtract_offset) elif user_input == 'time': self.show_time = not self.show_time print('Plot time instead of index set to: ', self.show_time) elif user_input == 'window': self.window = not self.window print('Include window in OF plot set to: ', self.window) # quit elif user_input == 'q': print('Quit options menu.') break else: print('Please enter a valid option or q to end.') # Show specific sample idx from the dataset
[docs] def show(self, idx: int, type: str = 'events'): """ Plots an event of the dataset. :param idx: The index of the event that is to show in the hdf5 file. :type idx: int :param type: The containing group in the HDF5 data set, either 'events', 'testpulses' or 'noise'. :type type: string >>> ei.show(idx=0) Label Phonon: 0.0 Label Light: 0.0 .. image:: pics/event.png """ with h5py.File(self.path_h5, 'r+') as f: if not type in self.valid_types: raise ValueError('Type should be events, testpulses or noise.') # get event event = np.array(f[type]['event'][:, idx, :]) appendix = '' # downsample first if not self.down == 1: event = event.reshape((self.nmbr_channels, self.window_size, self.down)) event = np.mean(event, axis=2) # threshold if self.show_threshold: threshold = self.threshold + np.mean( event[:, :int(len(event[0]) * self.pre_trigger_region / self.down)], axis=1) # optimum filter if self.show_filtered: try: for c in range(self.nmbr_channels): offset = np.mean(event[c, :int(len(event[c]) * self.pre_trigger_region / self.down)]) event[c] = filter_event(event[c] - offset, self.of[c], window=self.window) + offset except ValueError: raise ValueError(console_colors.FAIL + 'ERROR: ' + console_colors.ENDC + 'The downsampling rate of the loaded OF and the Events needs to be the same!') appendix = 'Filtered' # derivative if self.show_derivative: event = self.down * \ np.diff(event, axis=1, prepend=event[:, 0, np.newaxis]) appendix = 'Derivative' # triangulation if self.show_triangulation: elements = [] for i in range(self.nmbr_channels): elements.append( get_elements(event[i], std_thres=self.std_thres[i])) # mp if self.show_mp: main_par = np.array(f[type]['mainpar'][:, idx]) # sev if self.sev: try: sev_fit = [] fp = f[type]['sev_fit_par{}'.format(self.name_appendix)][:, idx, :] for c in range(self.nmbr_channels): sev_fit.append(self.fit_models[c]._wrap_sec(*fp[c])) if self.saturation: offset = fp[c][2] sev_fit[-1] = scaled_logistic_curve(sev_fit[-1] - offset, *self.saturation_pars[c]) + offset except AttributeError: raise AttributeError('No name_appendix attribute, did you load the SEV fit parameters?') # arr if self.arr: try: arr_fit = [] fp = f[type]['arr_fit_par{}'.format(self.arr_name_appendix)][:, idx, :] for c in range(self.nmbr_channels): t = (np.arange(0, event.shape[1], 1) - event.shape[1] / 4) * self.down / self.sample_frequency * 1000 sev = self.stdevents[c] timebase_ms = 1000 / self.sample_frequency max_shift = np.abs(fp[c, 1]) + 1 sample_bounds = int(max_shift / 1000 * self.sample_frequency) arr_fit.append(np.pad(array=template(*fp[c], t[sample_bounds:-sample_bounds], sev, timebase_ms, max_shift), pad_width=sample_bounds, mode='edge')) if self.saturation: offset = fp[c][2] arr_fit[-1] = scaled_logistic_curve(arr_fit[-1] - offset, *self.saturation_pars[c]) + offset except AttributeError: raise AttributeError('No arr_name_appendix attribute, did you load the arr fit parameters?') # def colors if self.nmbr_channels == 1: colors = ['blue'] anti_colors = ['red'] else: colors = ['red' for i in range(self.nmbr_channels - 1)] colors.append('blue') anti_colors = ['blue' for i in range(self.nmbr_channels - 1)] anti_colors.append('red') # -------- START PLOTTING -------- use_cait_style(dpi=self.dpi) plt.close() for i in range(self.nmbr_channels): if self.subtract_offset: offset = np.mean(event[i, :int(len(event[i]) * self.pre_trigger_region / self.down)]) else: offset = 0 x = np.arange(0, len(event[i]), 1) if self.xlim is not None: mask = x[np.logical_and(x >= np.array(self.xlim[0]), x <= np.array(self.xlim[1]))] else: mask = x if self.show_time: x = (np.arange(0, len(event[i]), 1) - len(event[i]) / 4) * self.down / self.sample_frequency plt.subplot(self.nmbr_channels, 1, i + 1) if not self.show_time: plt.axvline(x=self.window_size / 4, color='grey', alpha=0.6) else: plt.axvline(x=0, color='grey', alpha=0.6) plt.plot(x[mask], event[i][mask] - offset, label=self.channel_names[i], color=colors[i], zorder=10) if self.show_threshold: plt.axhline(y=threshold[i], color='black', alpha=0.8, linewidth=2.5, linestyle='dotted', zorder=30) plt.title('Index {}, {} {}'.format(idx, self.channel_names[i], appendix)) # triangulation if self.show_triangulation: if not self.show_time: plot_S1(event[i], elements[i], color=anti_colors[i], xlim=self.xlim, offset=offset) else: print('Cannot show time and mp.') # main parameters if self.show_mp: if not self.show_time: self._plot_mp(main_par[i], color=anti_colors[i], down=self.down, xlim=self.xlim, offset_sub=offset) else: print('Cannot show time and mp.') # sev if self.sev: plt.plot(x[mask], sev_fit[i][mask] - offset, color='orange', zorder=15) # arr if self.arr: plt.plot(x[mask], arr_fit[i][mask] - offset, color='c', zorder=15) make_grid() plt.ylim(self.ylim) plt.show(block=False) # -------- END PLOTTING -------- # labels if len(self.labels) > 0: try: label = self.labels[type][:, idx] for i, nm in enumerate(self.channel_names): print('Label {}: {}'.format(nm, label[i])) except NameError: print('No or incorrect Labels.') # predictions if len(self.predictions) > 0: for p_arr in self.predictions[type]: pred = p_arr[:, idx] for i, nm in enumerate(self.channel_names): print('Prediction {}: {}'.format(nm, pred[i])) # TPA if type == 'testpulses': tpa = f['testpulses']['testpulseamplitude'] if len(tpa.shape) > 1: tpa = f['testpulses']['testpulseamplitude'][:, idx] else: tpa = f['testpulses']['testpulseamplitude'][idx] print('TPA: {}'.format(tpa))
def _print_labels(self): """ Prints the labels that are available. The list can be expanded for any given use case. """ print('---------- LABELS: ----------') print('0 ... unlabeled') print('1 ... Event Pulse') print('2 ... Test/Control Pulse') print('3 ... Noise') print('4 ... Squid Jump') print('5 ... Spike') print('6 ... Early or late Trigger') print('7 ... Pile Up') print('8 ... Carrier Event') print('9 ... Strongly Saturated Event Pulse') print('10 ... Strongly Saturated Test/Control Pulse') print('11 ... Decaying Baseline') print('12 ... Temperature Rise') print('13 ... Stick Event') print('14 ... Square Waves') print('15 ... Human Disturbance') print('16 ... Large Sawtooth') print('17 ... Cosinus Tail') print('18 ... Light only Event') print('19 ... Ring & Light Event') print('20 ... Sharp Light Event') print('99 ... unknown/other') def _ask_for_options(self, user_input): if user_input == 'q': return -1 elif user_input == 'b': return -2 elif user_input == 'n': return -3 elif user_input == 'o': return -4 elif user_input == 'i': return -5 elif user_input == 'p': return -6 else: print( 'Enter q end, b back, n next, o options, i idx, p for (de)activate label list\n') def _ask_for_label(self, idx: int, which: str = 'phonon'): """ Takes and processes an user input to the viewer/labeling tool. :param idx: int, the index of the event that is to label in the h5 file :param which: string, the naming of the channel, e.g. phonon/light :return: int > 0 or option code (int < 0) if the user input was one of the option flag """ print( 'Assign label for event idx: {} {} (q end, b back, n next, o options, i idx, p for (de)activate label list)\n'.format( idx, which)) while True: user_input = input('{}: '.format(which)) try: label = int(user_input) if label > 0: return label else: print( 'Enter Integer > 0 or q end, b back, n next, o options, i idx, p for (de)activate label list') except ValueError: return self._ask_for_options(user_input) def _ask_for_idx(self, length: int): """ Gets an index from the user to which we want to jump. :param length: int, maximal index the user may put :return: int, the index the used put :raises ValueError if the user input was not a valid index """ while True: user_input = input('Jump to which index? ') try: idx = int(user_input) if (idx < 0) or (idx >= length): raise ValueError('This is not a valid index!') else: print('Jumping to index ', idx) return idx except ValueError: print('Enter valid index!')
[docs] def start(self, start_from_idx: int = 0, print_label_list: bool = True, label_only_class: int = None, label_only_prediction: int = None, model: str = None, viewer_mode: bool = False): """ Starts the label/view interface. The user gets the events shown and is asked for labels. There are viewer options available: - n … next sample - b … previous sample - idx ... user is asked for an index to jump to - q … quit - o … show set options and ask for changes, options are down - der - mp - predRF - predLSTM - triang - of - … - q The options in the options menu change the displayed event: - down ... Event is downsampled before plotting, this smoothes the noise. - der ... The derivative of the event is shown. - mp ... The main parameters are visualized as scatter points in the plot. - triang ... A triangulation of the event is shown. This feature is experimental and not supported any longer. - of ... The filtered event is shown. - q ... Quit the menu and go back to the labeling interface. There are more options, explore them in the options menu! :param start_from_idx: An event index to start labeling from. :type start_from_idx: int :param print_label_list: If set to true, the list of the labels is printed together when the user is asked for labels. :type print_label_list: bool :param label_only_class: If set only events of this class will be shown in the labeling/viewing process. :type label_only_class: int :param label_only_prediction: If set only events of this prediction will be shown in the labeling/viewing process. :type label_only_prediction: int :param model: The naming of the model that made the predictions, e.g. 'RF' for Random Forest :type model: string :param viewer_mode: Activates viewer mode. Labelling is not possible while in viewer mode. :type viewer_mode: bool """ if label_only_class: print('Start labeling from idx {}, label only class {}.'.format( start_from_idx, label_only_class)) # label_all_classes = False elif label_only_prediction: print('Start labeling from idx {}, label only prediction {}.'.format( start_from_idx, label_only_prediction)) # label_all_classes = False else: print('Start labeling from idx {}.'.format(start_from_idx)) # label_all_classes = True if not viewer_mode: try: print('Labels autosave to {}.'.format(self.path_csv_labels)) except: while True: print("No Labels file! Do you want to start in viewer mode (y/n)?") viewer_mode = input() if viewer_mode.lower() == 'y': viewer_mode = True print('You have selected viewer mode!') print( 'Navigate through events by pressing b back or n next. All other options are also available.') break elif viewer_mode.lower() == 'n': raise AttributeError('Load or create labels file first!') else: print('Please enter a valid input! Either y or n') for type in self.which_to_label: idx = np.copy(start_from_idx) while idx < self.nmbrs[type]: if label_only_class is not None: class_condition = (label_only_class == self.labels[type][:, idx]).any() else: class_condition = True if label_only_prediction is not None: preds = self.predictions[type][self.model_names[type].index(model)][:, idx] prediction_condition = (label_only_prediction == preds).any() del preds else: prediction_condition = True if class_condition and prediction_condition: # or label_all_classes: if print_label_list and not viewer_mode: self._print_labels() self.show(idx, type) for i, channel in enumerate(self.channel_names): if not viewer_mode: user_input = self._ask_for_label(idx, channel) else: user_input = input( 'Enter q end, b back, n next, o options, i idx, p for (de)activate label list') user_input = self._ask_for_options(user_input) if user_input == -1: print('End labeling.') idx = self.nmbrs[type] break elif user_input == -2: print('Rolling back to previous.') idx -= 2 break elif user_input == -3: print('Skipping this label.') break elif user_input == -4: self._viewer_options() idx -= 1 break elif user_input == -5: idx = self._ask_for_idx(self.nmbrs[type]) - 1 break elif user_input == -6: print_label_list = not print_label_list idx -= 1 break elif not viewer_mode: self.labels[type][i, idx] = user_input np.savetxt(self.path_csv_labels + type + '.csv', self.labels[type], fmt='%i', delimiter='\n') else: print('Only option keys are valid!') pass idx += 1