import itertools
import json
import os
import re
from typing import Any
import numpy as np
import cait as ai
from ..eventfunctions.processing.optimumfiltering import OptimumFiltering
from ..plot.basic.line import Line
from .arraywithbenefits import ArrayWithBenefits
from .helper import is_array_like
from .nps import NCM, NPS
from .sev import SEV
[docs]
class OF(ArrayWithBenefits):
"""
Object representing an Optimum Filter (OF). It can either be created from a Standard Event (SEV) and a Noise Power Spectrum (NPS), a SEV and a Noise Covariance Matrix (NCM), from an `np.ndarray` or read from a DataHandler or xy-file.
:param args: The data to use for the OF. If None, an empty OF is created. If `np.ndarray`, each row in the array is interpreted as an OF for separate channels. If instances of :class:`SEV` and :class:`NPS`, the OF is calculated from them. If instances of :class:`SEV` and :class:`NCM`, the 2d optimum filter (a correlated version of an optimum filter with multiple channels) is calculated. Defaults to None.
:type data: Any
:param dt_us: The microsecond timebase used in the recording. Only necessary if the input is an array (because it can be automatically inferred if constructed from NPS and SEV, a file, or a DataHandler). Defaults to None (i.e. automatic detection if possible).
:type dt_us: int
.. code-block:: python
sev = vai.SEV.from_dh(dh)
nps = vai.NPS.from_dh(dh)
of = vai.OF(sev, nps)
"""
def __init__(self, *args: Any, dt_us: int = None):
if len(args) == 1 and (
isinstance(args[0], np.ndarray) or is_array_like(args[0])
):
if dt_us is None:
raise ValueError(
"If OF is constructed from array(-like) input, the microsecond timebase of the recording (dt_us) has to be specified."
)
self._dt_us = dt_us
self._of = np.array(args[0])
if self._of.ndim > 1:
self._n_ch = self._of.shape[0]
if self._n_ch == 1:
self._of = self._of.flatten()
else:
self._n_ch = 1
elif len(args) == 2:
bool_sev = [isinstance(k, SEV) for k in args]
bool_nps = [isinstance(k, (NPS, NCM)) for k in args]
if not any(bool_sev) or not any(bool_nps):
raise TypeError(
f"If 2 arguments are parsed, one of them has to be of class 'NPS' (or 'NCM') and one of class 'SEV', not {type(args[0])} and {type(args[1])}."
)
sev, nps = args[bool_sev.index(True)], args[bool_nps.index(True)]
if sev._n_channels != nps._n_channels:
raise Exception(
f"SEV and NPS/NCM must have the same number of channels. Numbers received: ({sev._n_channels},{nps._n_channels})"
)
if sev.dt_us != nps.dt_us:
raise Exception(
f"SEV and NPS/NCM must have the same timebase. Numbers received: ({sev.dt_us}, {nps.dt_us})"
)
self._dt_us = sev.dt_us
self._n_ch = sev._n_channels
# Maximum time in samples
t_m = np.argmax(np.array(sev), axis=-1)
# Cast t_m into a column vector such that vectorization works
if t_m.ndim > 0:
t_m = t_m[:, None]
# np.fft.rfftfreq is [0, 1/n, 2/n, ... 1/2-1/n, 1/2], i.e. in units of cycles/sample
# But the definition of the inverse Fourier transform in GATTI and MANFREDI reveals
# that they are using an omega given in rad/s.
# To match this convention, we have to multiply by 2*pi to go from
# frequency to angular frequency
omega = 2 * np.pi * np.fft.rfftfreq(sev.shape[-1])
# NPS version (regular OF)
if isinstance(nps, NPS):
# Cast to numpy array here (this will get rid of the SEV and NPS character)
sev, nps = np.array(sev), np.array(nps)
# The cait function calc_nps has an argument `force_zero` which sets
# the first entry of the NPS to 0. If the user uses such an NPS to
# create an OF, we set its first entry to zero to stay in line with
# the standard cait routines.
if self._n_ch == 1:
if nps[0] == 0:
s1, s2 = slice(1, None), slice(1, None)
else:
s1, s2 = slice(None, None), slice(None, None)
else:
if np.any(nps[:, 0] == 0):
s1, s2 = (slice(None, None), slice(1, None)), slice(1, None)
else:
s1, s2 = (
(slice(None, None), slice(None, None)),
slice(None, None),
)
H = np.zeros(nps.shape, dtype=np.complex128)
H[s1] = (
np.fft.rfft(sev).conjugate()[s1]
* np.exp(-1j * t_m * omega[s2])
/ nps[s1]
)
# In any case, we force the 0 component of the filter kernel to 0
# (this shifts the signal to 0)
H[..., 0] = 0
maxima = np.max(OptimumFiltering(H)(sev), axis=-1)
# Cast maxima into a column vector such that vectorization works
if maxima.ndim > 0:
maxima = maxima[:, None]
self._of = H / maxima
# NCM version (correlated OF).
# Above, we made sure that there are only those two options.
else:
sev_vec = (
np.atleast_2d(
np.fft.rfft(sev).conjugate() * np.exp(-1j * t_m * omega)
).T
)[:, None, :]
ncm_inv = np.linalg.inv(np.transpose(np.array(nps), [2, 0, 1]))
H = np.squeeze(sev_vec @ ncm_inv).T
maximum = np.max(
np.sum(np.atleast_2d(OptimumFiltering(H)(sev)), axis=0)
)
self._of = H / maximum
elif len(args) == 0:
self._of = np.empty(0)
self._n_ch = 0
self._dt_us = dt_us
else:
raise TypeError(f"Unsupported input arguments {args}")
[docs]
@classmethod
def from_dh(cls, dh, group: str = "optimumfilter", dataset: str = "optimumfilter*"):
"""
Construct OF from DataHandler.
:param dh: The DataHandler instance to read from.
:type dh: DataHandler
:param group: The HDF5 group where the OF is stored.
:type group: str
:param dataset: The HDF5 dataset where the OF is stored. The star `*` denotes the position of the suffixes '_real' and '_imag'.
:type dataset: str
:return: Instance of OF.
:rtype: OF
"""
if "*" not in dataset:
dataset += "*"
ds_prefix, ds_suffix = dataset.split("*")
arr = dh.get(group, ds_prefix + "_real" + ds_suffix) + 1j * dh.get(
group, ds_prefix + "_imag" + ds_suffix
)
return cls(arr, dt_us=dh.dt_us)
[docs]
def to_dh(
self,
dh,
group: str = "optimumfilter",
dataset: str = "optimumfilter*",
**kwargs,
):
"""
Save OF to DataHandler.
:param dh: The DataHandler instance to write to.
:type dh: DataHandler
:param group: The HDF5 group where the OF should be stored.
:type group: str
:param dataset: The HDF5 dataset where the OF should be stored. The star `*` denotes the position of the suffixes '_real' and '_imag'.
:type dataset: str
:param kwargs: Keyword arguments for :func:`cait.DataHandler.set`.
:type kwargs: Any
"""
if self._dt_us != dh.dt_us:
raise ValueError(
f"Timebase of OF ({self._dt_us}) does not match the one of DataHandler ({dh.dt_us})."
)
if "*" not in dataset:
dataset += "*"
ds_prefix, ds_suffix = dataset.split("*")
data = self._of[None, :] if self._n_channels == 1 else self._of
dh.set(
group,
**{
ds_prefix + "_real" + ds_suffix: np.real(data),
ds_prefix + "_imag" + ds_suffix: np.imag(data),
},
**kwargs,
)
[docs]
@classmethod
def from_file(cls, fname: str, src_dir: str = ""):
"""
Construct OF from xy-file.
:param fname: Filename to look for (without file-extension)
:type fname: str
:param out_dir: Directory to look in. Defaults to '' which means searching current directory. Optional
:type out_dir: str
:return: Instance of OF.
:rtype: OF
"""
fpath = os.path.join(src_dir, fname + ".xy")
# We read the file to find out how many header lines it has
with open(fpath, "r") as f:
first_line = f.readline()
header = re.findall(r"\{.*\}", first_line)
if header and all([x in json.loads(header[0]) for x in ["dt_us", "n_ch"]]):
info = json.loads(header[0])
data = np.loadtxt(fpath, skiprows=2 * info["n_ch"] + 3)[:, 1:].T
dt_us = info["dt_us"]
else:
content = np.loadtxt(fpath)
data = content[:, 1:].T
dt_us = int(
np.round(
1 / np.diff(content[:, 0])[0] / (2 * (data.shape[-1] - 1)) * 1e6
)
)
if not (data.ndim % 2) == 0 or data.ndim == 0:
raise Exception(
"Compatible files must have an even number of data columns (containing real and imaginary part of the OF, respectively)"
)
arr = data[::2] + 1j * data[1::2]
return cls(arr, dt_us=dt_us)
[docs]
def to_file(self, fname: str, out_dir: str = "", info_str: str = ""):
"""
Write OF to xy-file.
:param fname: Filename to use (without file-extension)
:type fname: str
:param out_dir: Directory to write to. Defaults to '' which means writing to current directory. Optional
:type out_dir: str
:param info_str: An info string to be saved as a description of the OF in the header of the file. Optional
:type info_str: str
"""
if np.array_equal(self._array, np.empty(0)):
raise Exception("Empty OF cannot be saved.")
fpath = os.path.join(out_dir, fname + ".xy")
if self._n_ch > 1:
data = [
[self._array[k].real, self._array[k].imag] for k in range(self._n_ch)
]
data = [self.freq] + list(itertools.chain.from_iterable(data))
names = [
[f"Real Channel {k}", f"Imag Channel {k}"] for k in range(self._n_ch)
]
names = ["Frequency (Hz)"] + list(itertools.chain.from_iterable(names))
else:
data = [self.freq, self._array.real, self._array.imag]
names = ["Frequency (Hz)", "Real", "Imag"]
header = (
"OF "
+ json.dumps(
{
"cait": ai.__version__,
"dt_us": self.dt_us,
"record_length": 2 * (self._of.shape[-1] - 1),
"n_ch": self._n_channels,
**({"info": str(info_str)} if info_str else {})
}
)
+ "\n"
)
header += "\n".join(names + [f"{len(self.freq)}"])
np.savetxt(fpath, np.array(data).T, header=header)
[docs]
def show(self, **kwargs):
"""
Plot OF for all channels. To inspect just one channel, you can index OF first and call `.show` on the slice.
:param kwargs: Keyword arguments passed on to :class:`cait.versatile.Line`.
:type kwargs: Any
"""
if self._n_channels == 0:
raise Exception("Nothing to plot.")
if "xscale" not in kwargs.keys():
kwargs["xscale"] = "log"
if "yscale" not in kwargs.keys():
kwargs["yscale"] = "log"
if "x" not in kwargs.keys():
kwargs["x"] = self.freq
if "xlabel" not in kwargs.keys():
kwargs["xlabel"] = "Frequency (Hz)"
if self._n_channels > 1:
y = dict()
for i, channel in enumerate(self._array):
y[f"|channel {i}|"] = np.abs(channel)
else:
y = np.abs(self._array)
return Line(y, **kwargs)
@property
def _array(self):
return self._of
@_array.setter
def _array(self, array):
if self._of.size == 0:
self._of = array
self._n_ch = 1 if array.ndim < 2 else array.shape[0]
else:
raise Exception("OF._array can only be set as long as it is empty.")
@property
def _n_channels(self):
return self._n_ch
@property
def dt_us(self):
return self._dt_us
@property
def freq(self):
if self.dt_us is not None:
n = 2 * (self.shape[-1] - 1)
return np.fft.rfftfreq(n, self.dt_us / 1e6)