Source code for cait.features._mp

import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as sci

from ..fit._bl_fit import fit_quadratic_baseline
from ..filter._ma import box_car_smoothing
from ..filter._of import filter_event

# ------------------------------------------------------------
# MAIN PARAMETERS CLASS
# ------------------------------------------------------------

[docs]class MainParameters(): """ Class to contain the main parameters. :param pulse_height: float, the height of the event :param t_zero: int, the sample index where the rise starts :param t_rise: int, the sample index where the rise reaches 80% :param t_max: int, the sample index of the max event :param t_decaystart: int, the sample index where the peak falls down to 90% :param t_half: int, the sample index where the peak falls down to 73% :param t_end: int, the sample index where the peak falls down to 36% :param offset: float, the mean of the first 1/8 of the record length :param linear_drift: float, the linear slope of the event baseline :param quadratic_drift: float, the quadratic slope of the event baseline """ def __init__(self, pulse_height=0, t_zero=0, t_rise=0, t_max=0, t_decaystart=0, t_half=0, t_end=0, offset=0, linear_drift=0, quadratic_drift=0): self.pulse_height = pulse_height self.t_zero = t_zero self.t_rise = t_rise self.t_max = t_max self.t_decaystart = t_decaystart self.t_half = t_half self.t_end = t_end self.offset = offset self.linear_drift = linear_drift self.quadratic_drift = quadratic_drift
[docs] def print_all(self): """ Method to print all the stored main parameters """ print('Pulse height: ', self.pulse_height) print('Index of Rise Start: ', self.t_zero) print('Index of Rise End: ', self.t_rise) print('Index of Maximum: ', self.t_max) print('Index of Decay Start: ', self.t_decaystart) print('Index of Half Decay: ', self.t_half) print('Index of End Decay: ', self.t_end) print('Offset: ', self.offset) print('Linear Drift: ', self.linear_drift) print('Quadratic drift: ', self.quadratic_drift)
[docs] def compare(self, other): """ Method to compare the main parameters with those of another instance. :param other: the other instance of main parameters :return: bool, states if the main parameters are the same """ if self.pulse_height != other.pulse_height: return False if self.t_zero != other.t_zero: return False if self.t_rise != other.t_rise: return False if self.t_max != other.t_max: return False if self.t_decaystart != other.t_decaystart: return False if self.t_half != other.t_half: return False if self.t_end != other.t_end: return False if self.offset != other.offset: return False if self.linear_drift != other.linear_drift: return False if self.quadratic_drift != other.quadratic_drift: return False return True
[docs] def getArray(self): """ Returns an array with the main parameters that are stored :return: 1D array length 10, the main parameters """ return np.array([self.pulse_height, self.t_zero, self.t_rise, self.t_max, self.t_decaystart, self.t_half, self.t_end, self.offset, self.linear_drift, self.quadratic_drift])
[docs] def plotParameters(self, down=1, offset_in_samples=0, color = 'r', zorder=10, fig=None): """ Plots the main parameters on overlaid to an event :param down: int, the downsample rate of the event that should be overlaid :param offset_in_samples: int, set if the x axis does not start at zero :param color: string, the color of the main parameters in the scatter plot :param zorder: int, the plot with the highest zorder is plot on top of the others this should be choosen high, such that the main parameters are visible :param fig: object, the pyplot figure to which we want to plot the parameters """ # t = np.linspace(0, nmbr_samples-1, nmbr_samples) - nmbr_samples/4 # start rise, top rise, max, start decay, half decay, end decay x_values = [(self.t_zero - offset_in_samples) / down, (self.t_rise - offset_in_samples) / down, (self.t_max - offset_in_samples) / down, (self.t_decaystart - offset_in_samples) / down, (self.t_half - offset_in_samples) / down, (self.t_end - offset_in_samples) / down] y_values = [ self.offset + 0.2 * self.pulse_height, self.offset + 0.8 * self.pulse_height, self.offset + self.pulse_height, self.offset + 0.9 * self.pulse_height, self.offset + 0.736 * self.pulse_height, self.offset + 0.368 * self.pulse_height] if zorder == 0: if fig is None: fig.scatter(x_values, y_values, color=color) else: plt.scatter(x_values, y_values, color=color) else: if fig is None: fig.scatter(x_values, y_values, color=color, zorder=zorder) else: plt.scatter(x_values, y_values, color=color, zorder=zorder)
[docs] def get_differences(self): """ Return the differences in the samples times :return: 1D array with length 2: (length_rise, length_peak, length_decay) """ length_rise = self.t_rise - self.t_zero length_peak = self.t_decaystart - self.t_rise length_decay = self.t_end - self.t_decaystart return length_rise, length_peak, length_decay
# ------------------------------------------------------------ # CALCULATE MAIN PARAMETERS FUNCTION # ------------------------------------------------------------ def get_times(t_zero, t_rise, t_decaystart, t_half, t_end): """ Calculate the Rise Length, Peak Length, Length of First and Seconf Half of Decay :param t_zero: integer, time of the pulse onset :param t_rise: integer, time when rise is finished :param t_decaystart: integer, end of peak :param t_half: integer, half of decay :param t_end: integer, decay over time :return: (int: length of the rise, int: length of peak, int: length of first decay half, int: length of second decay half) """ length_rise = t_rise - t_zero length_peak = t_decaystart - t_rise length_firsthalfdecay = t_half - t_decaystart length_secondhalfdecay = t_end - t_half return length_rise, length_peak, length_firsthalfdecay, length_secondhalfdecay
[docs]def calc_main_parameters(event, down=1, max_bounds=None, quad_drift=False): """ Calculates the Main Parameters for an Event. Optional, the event can be downsampled by a given factor befor the calculation :param event: 1D array, the event :param down: integer, power of 2, the factor for downsampling :param max_bounds: tuple, the lower and upper index of the interval within which the maximum is searched :param quad_drift: bool, include quadratic drift in the calculation :return: instance of MainParameters, see :class:`~simulate.MainParameters` """ length_event = len(event) if max_bounds is None: max_bounds = [0, length_event] offset = np.mean(event[:int(length_event/8)]) # smoothing or downsampling if down == 1: event_smoothed = box_car_smoothing(event - offset) else: event_smoothed = event.reshape(int(length_event / down), down) event_smoothed = np.mean(event_smoothed, axis=1) event_smoothed = event_smoothed - offset max_bounds[0] = int(max_bounds[0]/down) max_bounds[1] = int(max_bounds[1] / down) length_event_smoothed = len(event_smoothed) # get the maximal pulse height and the time of the maximum maximum_pulse_height = np.max(event_smoothed[max_bounds[0]:max_bounds[1]]) # [idx_lower_region : idx_upper_region] # maximum_index = np.argmax(event_smoothed) # [idx_lower_region : idx_upper_region] + idx_lower_region if maximum_pulse_height > np.std(event_smoothed) or not quad_drift: # typically this will be the case # get baseline offset and linear drift # offset = event_smoothed[0] linear_drift = (np.mean(event_smoothed[-int(500/down):]) - np.mean(event_smoothed[:int(500/down)])) / length_event quadratic_drift = 0 else: # if the pulse is very small then fit a quadratic baseline offset_fit, linear_drift, quadratic_drift = fit_quadratic_baseline(event_smoothed) offset = offset + offset_fit # get rid of the offset and linear drift event_nodrift = event_smoothed - linear_drift * np.linspace(0, length_event_smoothed - 1, length_event_smoothed) if quadratic_drift != 0: event_nodrift = event_nodrift - quadratic_drift * np.linspace(0, length_event_smoothed - 1, length_event_smoothed) ** 2 maximum_index = int(np.argmax(event_nodrift[max_bounds[0]:max_bounds[1]]) + max_bounds[0]) # maximum_pulse_height = event_smoothed[maximum_index] maximum_pulse_height_condition = event_smoothed[maximum_index] # get the times t_zero = np.where(event_smoothed[:maximum_index] < 0.2 * maximum_pulse_height_condition) if t_zero[0].size > 0: t_zero = t_zero[0][-1] else: t_zero = 0 t_rise = np.where(event_smoothed[t_zero:] > 0.8 * maximum_pulse_height_condition) if t_rise[0].size > 0: t_rise = t_rise[0][0] + t_zero else: t_rise = t_zero t_end = np.where(event_smoothed[maximum_index:] < 0.368 * maximum_pulse_height_condition) if t_end[0].size > 0: t_end = t_end[0][0] + maximum_index else: t_end = length_event_smoothed - 1 t_half = np.where(event_smoothed[:t_end+1] > 0.736 * maximum_pulse_height_condition) if t_half[0].size > 0: t_half = t_half[0][-1] else: t_half = t_end t_decaystart = np.where(event_smoothed[:t_half+1] > 0.9 * maximum_pulse_height_condition) if t_decaystart[0].size > 0: t_decaystart = t_decaystart[0][-1] else: t_decaystart = t_half # return all parameters main_parameters = MainParameters(pulse_height=maximum_pulse_height, t_zero=t_zero * down, t_rise=t_rise * down, t_max=maximum_index * down, t_decaystart=t_decaystart * down, t_half=t_half * down, t_end=t_end * down, offset=offset, linear_drift=linear_drift, quadratic_drift=quadratic_drift) return main_parameters
[docs]def expectation(x, dist): """ Calculate the expectation value of a random value function :param x: 1D array, the function of the random vaiable, applied to the x value array :param dist: 1D array of same length as the other array, the values of the distribution """ return (1/len(dist))*np.dot(x, dist)
[docs]def distribution_skewness(density): """ Calculate the skewness of a distribution, given the density as array :param density: 1D array, the density of the distribution we want the skewness from :return: float, the skewness of the given distribution """ density = density/np.sum(density) x = np.arange(len(density)) mu = expectation(x, density) sigma = expectation((x-mu)**2, density) std_scores = (x-mu)/sigma skew = expectation(std_scores**3, density) return skew
[docs]def calc_additional_parameters(event, optimal_transfer_function, down=1): """ Calculate parameters additionally to the main parameters :param event: array, the event of which we want to calculate the additional main parameters :param optimal_transfer_function: array, the optimum filter transfer function; if None, the of values are filled with zeros instead :param down: int, if we want to downsample the event by a factor, this can be done here :return: List of float values parameters (maximum of array, minimum of array, variance of first 1/8 or array, mean of first 1/8 or array, variance of last 1/8 or array, mean of last 1/8 or array, variance of whole array, mean of whole array, skewness of whole array, maximum of the derivative of the array, index of maximum of the derivative of the array, minimum of the derivative of the array, index of minimum of the derivative of the array, maximum of the filtered array, index of the maximum of the filtered array, distribution skewness of the filtered samples around the max of filtered array) """ length_event = len(event) event = event - np.mean(event[:int(length_event/8)]) # smoothing if down == 1: event_smoothed = box_car_smoothing(event) else: event_smoothed = event.reshape(int(length_event / down), down) event_smoothed = np.mean(event_smoothed, axis=1) length_event_smoothed = len(event_smoothed) # Max and Min max = np.max(event_smoothed) min = np.min(event_smoothed) # Variance and Mean of first 1 / 8 and last 1 / 8 var_start = np.var(event_smoothed[:int(length_event_smoothed/8)]) mean_start = np.mean(event_smoothed[:int(length_event_smoothed/8)]) var_end = np.var(event_smoothed[-int(length_event_smoothed/ 8):]) mean_end = np.mean(event_smoothed[-int(length_event_smoothed/ 8):]) # Variance, Mean and Skewness of whole array var = np.var(event_smoothed) mean = np.mean(event_smoothed) skew = sci.skew(event_smoothed) # Max, Min Derivative and position der = np.diff(event_smoothed) der_max = np.max(der) der_maxind = np.argmax(der) der_min = np.min(der) der_minind = np.argmin(der) # Min Value OF: Position maximum and skewness of samples around if optimal_transfer_function is not None: filtered = filter_event(event, transfer_function=optimal_transfer_function) filtered_max = np.max(filtered[int(length_event/8):-int(length_event/8)]) filtered_maxind = np.argmax(filtered[int(length_event/8):-int(length_event/8)]) + int(length_event/8) filtered_skew = distribution_skewness(filtered[filtered_maxind-int(length_event/32):filtered_maxind+int(length_event/32)]) else: filtered_max = 0 filtered_maxind = 0 filtered_skew = 0 return np.array([max, # 0 min, # 1 var_start, # 2 mean_start, # 3 var_end, # 4 mean_end, # 5 var, # 6 mean, # 7 skew, # 8 der_max, # 9 der_maxind, # 10 der_min, # 11 der_minind, # 12 filtered_max, # 13 filtered_maxind, # 14 filtered_skew], # 15 )