# -----------------------------------------------------------
# IMPORTS
# -----------------------------------------------------------
import numpy as np
from ..fit._templates import baseline_template_quad, baseline_template_cubic
from scipy.optimize import curve_fit
from scipy.stats import norm, uniform
from tqdm.auto import tqdm
from scipy import signal
# -----------------------------------------------------------
# FUNCTIONS
# -----------------------------------------------------------
[docs]def get_nps(x):
"""
Calculates the Noise Power Spectrum (NPS) of a given array.
:param x: The time series.
:type x: 1D numpy array of size N
:return: The noise power spectrum.
:rtype: 1D numpy array of size N/2 + 1
"""
x = np.fft.rfft(x)
x = np.abs(x) ** 2
return x
[docs]def noise_function(nps):
"""
Simulates the function f from CC-Noise Algo
with a given Noise Power Spectrum (NPS)
see arXiv:1006.3289, eq (4) and (5)
:param f: The noise power spectrum.
:type f: real valued 1D array of size N/2 + 1
:return: Noise Baselines.
:rtype: real valued 1D array of size N
"""
f = np.sqrt(nps)
f = np.array(f, dtype='complex') # create array for frequencies
Np = (len(f) - 1) // 2
phases = np.random.rand(Np) * 2 * np.pi # create random phases
phases = np.cos(phases) + 1j * np.sin(phases)
f[1:Np + 1] *= phases
f[-1:-1 - Np:-1] = np.conj(f[1:Np + 1])
return np.fft.irfft(f)
[docs]def get_cc_noise(nmbr_noise,
nps,
lamb=0.01):
"""
Simulation of a noise baseline, according to Carretoni Cremonesi: arXiv:1006.3289
:param nmbr_noise: Number of noise baselines to simulate.
:type nmbr_noise: int > 0
:param nps: Noise power spectrum of the baselines,
e.g. generated with scipy.fft.rfft().
:type nps: 1D array of odd size
:param lamb: Parameter of the method (overlap between ).
:type lamb: integer > 0
:return: The simulated baselines.
:rtype: 2D array of size (nmbr_noise, 2*(len(nps)-1))
"""
T = (len(nps) - 1)*2
# alpha is fixed by the function we use to 1
a = np.sqrt(1/lamb/T)
noise = np.zeros((nmbr_noise, T))
for i in tqdm(range(nmbr_noise)):
t = 0
while t < T:
noise[i] += norm.rvs(scale=a) * np.roll(noise_function(nps), t)
t = int(t - np.log(1 - uniform.rvs())/lamb)
return noise
def calculate_mean_nps(baselines,
order_polynom=3,
clean=True,
percentile=50,
down=1,
sample_length = 0.00004,
rms_baselines=None,
rms_cutoff=None,
window=True):
"""
Calculates the mean Noise Power Spectrum (mNPS) of a set of baselines,
after cleaning them from artifacts with a polynomial fit.
:param baselines: The baselines for the mean NPS.
:type baselines: 2D array of size (nmbr_baselines, record_length)
:param order_polynom: 2 or 3, the order of the fitted polynomial.
:type order_polynom: int
:param clean: If True the baselines are cleaned from artifacts with a poly fit.
:type clean: boolean
:param percentile: The percentile of the Fit RMS that is still used
for the calculation of the mNPS.
:type percentile: float between 0 and 1
:param down: Downsample the baselines before the calculation of the NPS - must be 2^x.
:type down: int
:param sample_length: The length of one sample from the time series in seconds.
:type sample_length: float
:param rms_cutoff: Only baselines with a fit rms below this values are included in the NPS calculation. This
will overwrite the percentile argument, if it is not set to None.
:type rms_cutoff: float
:param window: If True, a window function is applied to the noise baselines before the calculation of the NPS.
:type window: bool
:return: Tuple of (the mean NPS, the cleaned baselines).
:rtype: (1D array of size (record_length/2 + 1), 2D array of size (percentile*nmbr_baselines, record_length))
"""
# downsample the baselines
if down > 1:
baselines = np.mean(baselines.reshape(len(baselines),
int(len(baselines[0])/down),
down), axis=2)
record_length = baselines.shape[1]
# substract offset
baselines -= np.mean(baselines[:, :int(record_length / 8)], axis=1, keepdims=True)
# clean baselines
if clean:
if rms_baselines is None:
print('Fitting baselines.')
# choose baseline template
if order_polynom == 2:
bl_temp = baseline_template_quad
elif order_polynom == 3:
bl_temp = baseline_template_cubic
else:
bl_temp = None
print('PLEASE USE POLYNOMIAL ORDER 2 OR 3!!')
# fit polynomial coefficients
coefficients = np.zeros([len(baselines), order_polynom + 1])
t = np.linspace(0, record_length * sample_length, record_length)
for i in range(len(baselines)):
coefficients[i], _ = curve_fit(bl_temp, t, baselines[i])
# throw high rms
rms_baselines = []
for bl, coeff in tqdm(zip(baselines, coefficients)):
baseline_fit = bl_temp(t, *coeff)
baseline_fit = np.array(baseline_fit)
rms_baselines.append(np.sum((bl - baseline_fit) ** 2))
# baselines_polynomials = np.array(baselines_polynomials)
rms_baselines = np.array(rms_baselines)
else:
print('Using fitted baselines.')
rms_means = np.array(np.percentile(rms_baselines, percentile))
if rms_cutoff is None:
baselines = baselines[rms_baselines < rms_means]
else:
baselines = baselines[rms_baselines < rms_cutoff]
if window:
baselines *= signal.windows.tukey(baselines.shape[1], alpha=0.25).reshape(1, -1)
mean_nps = np.mean(get_nps(baselines), axis=0)
return mean_nps, baselines