import numpy as np
import numba as nb
[docs]
@nb.njit
def add_to_moments(x_new, mean, var, n):
"""
Update existing mean and variance values by adding a new data point.
:param x_new: The new data point.
:type x_new: float
:param mean: The old mean value.
:type mean: float
:param var: The old variance value.
:type var: float
:param n: The number of data points used to calculate the old mean and var.
:type n: int
:return: The new mean value, the new variance, the new number of data points.
:rtype: list
"""
mean_n_minus_one = mean
var_n_minus_one = var
n_minus_one = n
n += 1
mean = x_new / n + n_minus_one / n * mean_n_minus_one
var = 1 / n_minus_one * ((x_new - mean) * (x_new - mean_n_minus_one) + (n - 2) * var_n_minus_one)
return mean, var, n
[docs]
@nb.njit
def sub_from_moments(x_new, mean, var, n):
"""
Update existing mean and variance values by substracting a new data point.
:param x_new: The new data point.
:type x_new: float
:param mean: The old mean value.
:type mean: float
:param var: The old variance value.
:type var: float
:param n: The number of data points used to calculate the old mean and var.
:type n: int
:return: The new mean value, the new variance, the new number of data points.
:rtype: list
"""
n_minus_one = n - 1
mean_n = mean
mean_n_minus_one = n / n_minus_one * (mean - x_new / n)
var_n_minus_one = 1 / (n - 2) * (n_minus_one * var - (x_new - mean_n) * (x_new - mean_n_minus_one))
n -= 1
mean = mean_n_minus_one
var = var_n_minus_one
if var < 0: # in case of numerical issues
var = 1e-8
return mean, var, n
[docs]
@nb.njit
def find_peaks(array, lag, threshold, init_mean, init_var, fixed_var=0):
"""
Find the peaks in an array and return an array with 1/0/-1.
Based on https://stackoverflow.com/a/22640362/15216821.
:param array: The array in which we want to detect peaks.
:type array: list
:param lag: The size of the window which we use for calculating a mean and variance.
:type lag: int
:param threshold: The sigma value of the threshold
:type threshold: float
:param init_mean: Here we can provide an initial mean value, otherwise this is extracted from the data.
:type init_mean: float
:param init_var: Here we can provide an initial variance value, otherwise this is extracted from the data.
:type init_var: float
:param fixed_var: In case you don't want to calculate the variance with a moving window, you can provide a fixed
value.
:type fixed_var: float
:return: The array with 1/0/-1, corresponding to positive/negative peaks at the sample position. The means at all
sample positions. The variances at all sample positions.
:rtype: list
"""
n = lag
if init_mean is None:
mean = np.mean(array[:lag])
else:
mean = init_mean
if init_var is None:
variance = var(array[:lag])
else:
variance = init_var
signal = np.zeros(len(array))
all_means = np.full(len(array), mean)
all_vars = np.full(len(array), variance)
for i in range(array.shape[0]):
if fixed_var > 0:
variance = fixed_var
if np.abs(array[i] - mean) > threshold * np.sqrt(variance):
if array[i] > mean:
signal[i] = 1
else:
signal[i] = -1
if i >= lag:
mean, variance, n = add_to_moments(array[i], mean, variance, n)
mean, variance, n = sub_from_moments(array[i - lag], mean, variance, n)
all_means[i] = mean
all_vars[i] = variance
return signal, all_means, all_vars
[docs]
@nb.njit
def get_triggers(array, lag, threshold, init_mean, init_var, look_ahead, fixed_var=0):
"""
Trigger the peaks in an array and return their indices and properties.
Based on https://stackoverflow.com/a/22640362/15216821.
:param array: The array in which we want to detect peaks.
:type array: list
:param lag: The size of the window which we use for calculating a mean and variance.
:type lag: int
:param threshold: The sigma value of the threshold
:type threshold: float
:param init_mean: Here we can provide an initial mean value, otherwise this is extracted from the data.
:type init_mean: float
:param init_var: Here we can provide an initial variance value, otherwise this is extracted from the data.
:type init_var: float
:param look_ahead: Look this number of samples in the future if another peak is higher than this one and overwrite
if so.
:type look_ahead: int
:param fixed_var: In case you don't want to calculate the variance with a moving window, you can provide a fixed
value.
:type fixed_var: float
:return: The array with the triggered indices. The corresponding peak heights. The means at all
sample positions. The variances at all sample positions.
:rtype: list
"""
n = lag
if init_mean is None:
mean = np.mean(array[:lag])
else:
mean = init_mean
if init_var is None:
variance = var(array[:lag])
else:
variance = init_var
block = 0
signal = []
heights = []
all_means = []
all_vars = []
for i in range(array.shape[0]):
if fixed_var > 0:
variance = fixed_var
if array[i] - mean > threshold * np.sqrt(variance):
if block == 0:
height = np.max(array[i:i+look_ahead])
idx = i + np.argmax(array[i:i+look_ahead])
signal.append(idx)
heights.append(height - 2 * np.sqrt(variance) - mean)
all_means.append(mean)
all_vars.append(variance)
block += look_ahead
if i >= lag:
mean, variance, n = add_to_moments(array[i], mean, variance, n)
mean, variance, n = sub_from_moments(array[i - lag], mean, variance, n)
if block > 0:
block -= 1
return signal, heights, all_means, all_vars
@nb.njit
def var(x):
"""
Numba surrogate for calculating the sample variance.
:param x: The elements of which we want to calculate the variance.
:type x: list
:return: The variance of the array.
:rtype: float
"""
m = np.mean(x)
return 1 / (x.shape[0] - 1) * np.sum((x - m) ** 2)