"""This module contains nodes for signal filtering."""
import numpy as np
import pandas as pd
from scipy import signal
from timeflux.core.branch import Branch
from timeflux.core.node import Node
from timeflux.nodes.window import TimeWindow
from timeflux_dsp.utils.filters import (
construct_fir_filter,
construct_iir_filter,
design_edges,
)
from timeflux_dsp.utils.import_helpers import make_object
[docs]class DropRows(Node):
"""Decimate signal by an integer factor.
This node uses Pandas computationally efficient functions to drop rows.
By default, it simply transfers one row out of ``factor`` and drops the others.
If ``method`` is `mean` (resp. median), it applies a rolling window of length
equals ``factor``, computes the mean and returns one value per window.
It maintains an internal state to ensure that every k'th sample is picked
even across chunk boundaries.
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame.
Args:
factor (int): Decimation factor. Only every k'th sample will be
transferred into the output.
method (str|None): Method to use to drop rows.
If `None`, the values are transferred as it.
If `mean` (resp. `median`), the mean (resp. median)
of the samples is taken.
Example:
.. literalinclude:: /../examples/droprows.yaml
:language: yaml
Example:
In this exemple, we generate white noise to stream and we drop one sample out
of two using DropRows, setting:
* ``factor`` = `2`
* ``method`` = `None` (see orange trace) | ``method`` = `"mean"` (see green trace)
.. image:: /static/image/droprows_io.svg
:align: center
Notes:
Note that this node is not supposed to dejitter the timestamps, so if
the input chunk is not uniformly sampled, the output chunk won’t be either.
Also, this filter does not implement any anti-aliasing filter.
Hence, it is recommended to precede this node by a low-pass filter
(e.g., FIR or IIR) which cuts out below half of the new sampling rate.
"""
def __init__(self, factor, method=None):
super().__init__()
self._factor = factor
self._method = method
self._previous = pd.DataFrame()
[docs] def update(self):
# copy the meta
self.o.meta = self.i.meta
# if nominal rate is specified in the meta, update it.
if "rate" in self.o.meta:
self.o.meta["rate"] /= self._factor
# When we have not received data, there is nothing to do
if not self.i.ready():
return
# At this point, we are sure that we have some data to process
self.i.data = pd.concat([self._previous, self.i.data], axis=0, sort=True)
n = self.i.data.shape[0]
remaining = n % self._factor
self.i.data, self._previous = np.split(self.i.data, [n - remaining])
if self._method is None:
# take every kth sample with k=factor starting from the k-1 position
self.o.data = self.i.data.iloc[self._factor - 1 :: self._factor]
else:
# estimate rolling mean (or median) with window length=factor and take
# every kth sample with k=factor starting from the k-1 position
if self._method == "mean":
self.o.data = (
self.i.data.rolling(
window=self._factor, min_periods=self._factor, center=False
)
.mean()
.iloc[self._factor - 1 :: self._factor]
)
elif self._method == "median":
self.o.data = (
self.i.data.rolling(
window=self._factor, min_periods=self._factor, center=False
)
.median()
.iloc[self._factor - 1 :: self._factor]
)
[docs]class Resample(Node):
"""Resample signal.
This node calls the `scipy.signal.resample` function to decimate the signal
using Fourier method.
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame.
Args:
factor (int): Decimation factor. Only every k'th sample will be
transferred into the output.
window (str|list|float): Specifies the window applied to the signal
in the Fourier domain. Default: `None`.
Example:
.. literalinclude:: /../examples/resample.yaml
:language: yaml
Notes:
This node should be used after a buffer to assure that the FFT window
has always the same length.
References:
* `scipy.signal.resample <https://docs.scipy.org/doc/scipy-0.16.0/reference/generated/scipy.signal.resample.html>`_
"""
def __init__(self, factor, window=None):
super().__init__()
self._factor = factor
self._window = window
self._previous = pd.DataFrame()
[docs] def update(self):
# copy the meta
self.o.meta = self.i.meta
# if nominal rate is specified in the meta, update it.
if "rate" in self.o.meta:
self.o.meta["rate"] /= self._factor
# When we have not received data, there is nothing to do
if not self.i.ready():
return
# At this point, we are sure that we have some data to process
n = self.i.data.shape[0]
if not self._previous.empty:
self.i.data = pd.concat([self._previous, self.i.data], axis=0)
if self.i.data.shape[0] % self._factor == 0:
self._previous = pd.DataFrame()
else:
self._previous = self.i.data.iloc[(n // self._factor) * self._factor :]
self.i.data = self.i.data.iloc[: (n // self._factor) * self._factor]
self.o.data = pd.DataFrame(
data=signal.resample(
x=self.i.data.values, num=n // self._factor, window=self._window
),
index=self.i.data.index[np.arange(0, n - 1, self._factor)],
columns=self.i.data.columns,
)
[docs]class IIRFilter(Node):
"""Apply IIR filter to signal.
If ``sos`` is `None`, this node uses adapted methods from mne.filters to
design the filter coefficients based on the specified parameters.
If no transition band is given, default is to use :
* l_trans_bandwidth = min(max(l_freq * 0.25, 2), l_freq)
* h_trans_bandwidth = min(max(h_freq * 0.25, 2.), rate / 2. - h_freq)
Else, it uses ``sos`` as filter coefficients.
Once the kernel has been estimated, the node applies the filtering to each
columns in ``columns`` using `scipy.signal.sosfilt` to generate the output given the input,
hence ensures continuity across chunk boundaries,
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame.
Args:
rate (float): Nominal sampling rate of the input data. If None, rate is get
from the meta.
order (int, optional): Filter order. Default: `None`.
frequencies (list|None): Transition frequencies. Ignored when sos is given.
filter_type (str|None): Filter mode (`lowpass`, `highpass`, `bandstop`, `bandpass`).
Default: `bandpass`. Ignored when sos is given.
sos (array|None, optional) : Array of second-order sections (sos) representation,
must have shape (n_sections, 6). Default: `None`.
kwargs: keyword arguments to pass to the filter constructor
Example:
In this example, we generate a signal that is the sum of two sinus with
respective periods of 1kHz and 15kHz and respective amplitudes of 1 and 0.5.
We stream this signal using the IIRFilter node, designed for lowpass
filtering at cutoff frequency 6kHz, order 3.
* ``order`` = `3`
* ``freqs`` = `[6000]`
* ``mode`` = `'lowpass'`
We plot the input signal, the output signal and the corresponding offline filtering.
.. image:: /static/image/iirfilter_io.svg
:align: center
Notes:
This node ensures continuity across chunk boundaries, using a recursive algorithm,
based on a cascade of biquads filters.
The filter is initialized to have a minimal step response, but needs a
'warm up' period for the filtering to be stable, leading to small artifacts
on the first few chunks.
The IIR filter is faster than the FIR filter and delays the signal less
but this delay is not constant and the stability not ensured.
References:
* `Real-Time IIR Digital Filters <http://www.eas.uccs.edu/~mwickert/ece5655/lecture_notes/ece5655_chap8.pdf>`_
* `scipy.signal.sosfilt <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.sosfilt.html>`_
"""
def __init__(
self, frequencies=None, rate=None, filter_type="bandpass", sos=None, **kwargs
):
super().__init__()
# self._order = order
self._frequencies = frequencies
self._filter_type = filter_type
self._kwargs = dict(order=None, design="butter", pass_loss=3.0, stop_atten=50.0)
self._kwargs.update(kwargs)
self._rate = rate
self._zi = None
self._sos = None
self._sos_custom = sos
self._columns = None
[docs] def update(self):
# copy the meta
self.o = self.i
# When we have not received data, there is nothing to do
if not self.i.ready():
return
# At this point, we are sure that we have some data to process
if self._columns is None:
self._columns = self.i.data.columns
# set rate from the data if it is not yet given
if self._rate is None:
self._rate = self.i.meta.get("rate", None)
if self._rate is None:
# If there is no rate in the meta, set rate to 1.0
self._rate = 1.0
self.logger.warning(
f"Nominal rate not supplied, considering " f"1.0 Hz instead. "
)
else:
self.logger.info(f"Nominal rate set to {self._rate}. ")
if self._sos is None:
self._design_sos()
if self._zi is None:
zi0 = signal.sosfilt_zi(self._sos)
self._zi = np.stack(
[
(zi0 * self.i.data.iloc[0, k_col])
for k_col in range(len(self._columns))
],
axis=1,
)
port_o, self._zi = signal.sosfilt(self._sos, self.i.data.values.T, zi=self._zi)
self.o.data = pd.DataFrame(
port_o.T, columns=self._columns, index=self.i.data.index
)
def _design_sos(self):
if self._sos_custom is None:
# Calculate an IIR filter kernel for a given sampling rate.
self._sos, self._freqs = construct_iir_filter(
rate=self._rate,
frequencies=self._frequencies,
filter_type=self._filter_type,
output="sos",
**self._kwargs,
)
else:
if self._sos_custom.shape[1] == 6:
self._sos = self._sos_custom
else:
raise ValueError(
f"sos must have shape (n_sections, 6), received {self._sos_custom.shape} instead. "
)
[docs]class IIRLineFilter(Node):
"""Apply multiple Notch IIR Filter in series.
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame.
Args:
rate (float): Nominal sampling rate of the input data. If None, rate is get
from the meta.
edges_center: List with center of the filters.
orders (tuple|int|None): List with orders of the filters.
If int, the same order will be used for all filters.
If None, order 2 will be used for all filters.
edges_width: List with orders of the filters.
If int, the same order will be used for all filters.
If None, width of 3 (Hz) will be used for all filters.
"""
def __init__(
self,
rate=None,
edges_center=(50, 60, 100, 120),
orders=(2, 1, 1, 1),
edges_width=(3, 3, 3, 3),
):
super().__init__()
orders = orders or 2
edges_width = edges_width or 3
if isinstance(orders, int):
orders = [orders] * len(edges_center)
if isinstance(edges_width, int):
edges_width = [edges_width] * len(edges_center)
filter_type = "bandstop"
self._nodes = []
for edge_center, edge_width, order in zip(edges_center, edges_width, orders):
frequencies = [edge_center - edge_width, edge_center + edge_width]
self._nodes.append(
IIRFilter(
rate=rate,
order=order,
frequencies=frequencies,
filter_type=filter_type,
)
)
[docs] def update(self):
# When we have not received data, there is nothing to do
if not self.i.ready():
return
# At this point, we are sure that we have some data to process
# initialize output port
self.o = self.i
# apply each filter in series
for node in self._nodes:
node.i.data = self.o.data
node.update()
self.o.data = node.o.data
[docs]class FIRFilter(Node):
"""Apply FIR filter to signal.
If ``coeffs`` is `None`, this node uses adapted methods from *mne.filters*
to design the filter coefficients based on the specified parameters.
If no transition band is given, default is to use:
* l_trans_bandwidth = min(max(l_freq * 0.25, 2), l_freq)
* h_trans_bandwidth = min(max(h_freq * 0.25, 2.), fs / 2. - h_freq)
Else, it uses ``coeffs`` as filter coefficients.
It applies the filtering to each columns in ``columns`` using `scipy.signal.lfilter`
to generate the output given the input,
hence ensures continuity across chunk boundaries,
The delay introduced is estimated and stored in the meta ``FIRFilter``, ``delay``.
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float): Nominal sampling rate of the input data. If None, rate is get
from the meta.
columns (list|'all', optional): Columns to apply filter on. Default: `all`.
order (int): Filter order.
frequencies (list): Transition frequencies.
filter_type (str, optional): Filter mode (`lowpass`, `highpass`, `bandstop`
or `bandpass`). Default: `bandpass`.
coeffs (array|None, optional): Custom coeffs to pass as ``b`` in `signal.filter`.
Default: `None`.
kwargs: keyword arguments to pass to the filter constructor (window, phase,... )
Example:
In this exemple, we generate a signal that is the sum of two sinus with
respective periods of 1kHz and 15kHz and respective amplitudes of 1 and 0.5.
We stream this signal using the FIRFilter node, designed for lowpass
filtering at cutoff frequency 6kHz, order 20.
* ``order`` = `20`
* ``freqs`` = `[6000, 6100]`
* ``mode`` = `'lowpass'`
The FIR is a linear phase filter, so it allows one to correct for the
introduced delay. Here, we retrieve the input sinus of period 1kHz.
We plot the input signal, the output signal, the corresponding offline
filtering and the output signal after delay correction.
.. image:: /static/image/firfilter_io.png
:align: center
Notes:
The FIR filter ensures a linear phase response, but is computationnaly
more costly than the IIR filter.
The filter is initialized to have a minimal step response, but needs a
'warmup' period for the filtering to be stable, leeding to small artifacts on
the first few chunks.
"""
def __init__(
self,
frequencies,
rate=None,
columns="all",
order=20,
filter_type="bandpass",
coeffs=None,
**kwargs,
):
super().__init__()
self._order = order
self._frequencies = frequencies
self._mode = filter_type
self._rate = rate
self._columns = columns if columns != "all" else None
self._coeffs_custom = coeffs
self._kwargs = dict(design="firwin2", phase="linear", window="hamming")
self._kwargs.update(kwargs)
# Initialize the filter kernels and states, one per stream
self._zi = {} # FIR filter states, one per column
self._coeffs = None # FIR filter coeffs
self._delay = None # FIR filter delays
[docs] def update(self):
# copy the meta
self.o = self.i
# When we have not received data, there is nothing to do
if not self.i.ready():
return
# At this point, we are sure that we have some data to process
if self._columns is None:
self._columns = self.i.data.columns
# set rate from the data if it is not yet given
if self._rate is None:
self._rate = self.i.meta.get("rate", None)
if self._rate is None:
# If there is no rate in the meta, set rate to 1.0
self._rate = 1.0
self.logger.warning(
f"Nominal rate not supplied, considering " f"1.0 Hz instead. "
)
else:
self.logger.info(f"Nominal rate set to {self._rate}. ")
self._coeffs, self._delay = self._design_filter()
for column in self._columns:
if column not in self._zi:
zi0 = signal.lfilter_zi(self._coeffs, 1.0)
self._zi[column] = zi0 * self.i.data[column].values[0]
port_o_col, self._zi[column] = signal.lfilter(
b=self._coeffs,
a=1.0,
x=self.i.data[column].values.T,
zi=self._zi[column],
)
# self.o.meta.update({'FIRFilter': {'delay': self._delay}})
self.o.data.loc[:, column] = port_o_col
# update delay
delay = self.o.meta.get("delay") or 0.0
delay += self._delay
self.o.meta.update({"delay": delay})
def _design_filter(self):
"""Calculate an FIR filter kernel for a given sampling rate."""
nyq = self._rate / 2.0
if self._coeffs_custom is None:
edges, gains, _, _ = design_edges(
frequencies=self._frequencies, nyq=nyq, mode=self._mode
)
fir_coeffs = construct_fir_filter(
self._rate, edges, gains, self._order, **self._kwargs
)
else:
fir_coeffs = self._coeffs_custom
warmup = self._order - 1
fir_delay = (warmup / 2) / self._rate
return fir_coeffs, fir_delay
[docs]class Scaler(Node):
"""Apply a sklearn scaler
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame and meta.
Args:
method (str): Name of the scaler object (see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.preprocessing)
**kwargs: keyword arguments to initialize the scaler.
"""
def __init__(self, method="sklearn.preprocessing.StandardScaler", **kwargs):
super().__init__()
try:
self._scaler = make_object(method, kwargs)
except AttributeError:
raise ValueError("Cannot make object from {method}".format(method=method))
[docs] def update(self):
if not self.i.ready():
return
# scale the signal
self.o.data = pd.DataFrame(
data=self._scaler.fit_transform(self.i.data.values),
columns=self.i.data.columns,
)
if len(self.o.data) == len(self.i.data):
self.o.data.index = self.i.data.index
[docs]class AdaptiveScaler(TimeWindow):
"""Scale the data adaptively.
This nodes transforms the data using a sklearn scaler object that is continuously fitted on a rolling window.
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame and meta.
Args:
length (float): The length of the window, in seconds.
method (str): Name of the scaler object (see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.preprocessing)
dropna (bool): Whether or not NaN should be dropped before fitting the estimator. Default to False.
**kwargs : keyword arguments to initialize the scaler.
"""
def __init__(
self,
length,
method="sklearn.preprocessing.StandardScaler",
dropna=False,
**kwargs,
):
super().__init__(length=length, step=0)
self._fitted = False
self._dropna = dropna
try:
self._scaler = make_object(method, kwargs)
except AttributeError:
raise ValueError(f"Module sklearn.preprocessing has no object {method}")
[docs] def update(self):
if not self.i.ready():
return
# At this point, we are sure that we have some data to process
super().update()
# if the window output is ready, fit the scaler with its values
if self.o.ready():
x = self.o.data.values
if self._dropna:
x = x[~np.isnan(x)]
self._scaler.fit(x)
self._fitted = True
self.o.clear()
# copy meta
self.o.meta = self.i.meta
# if the scaler has been fitted, transform the current data
if self._fitted and self.i.ready():
transformed_data = self._scaler.transform(self.i.data.values)
self.o.data = pd.DataFrame(
data=transformed_data,
columns=self.i.data.columns,
index=self.i.data.index,
)
[docs]class FilterBank(Branch):
"""Apply multiple IIR Filters to the signal and stack the components horizontally
Attributes:
i (Port): Default input, expects DataFrame.
o (Port): Default output, provides DataFrame.
Args:
rate (float): Nominal sampling rate of the input data. If None, rate is get
from the meta.
filters (dict|None): Define the iir filter to apply given its name and its params.
"""
def __init__(self, filters, method="IIRFilter", rate=None, **kwargs):
super().__init__()
self._filters = filters
graph = {"nodes": [], "edges": []}
graph["nodes"].append(
{
"id": "stack",
"module": "timeflux_dsp.nodes.helpers",
"class": "Concat",
}
)
for filter_name, filter_params in self._filters.items():
filter_params.update({"rate": rate})
filter_params.update(kwargs)
iir = {
"id": filter_name,
"module": "timeflux_dsp.nodes.filters",
"class": method,
"params": filter_params,
}
rename_columns = {
"id": f"rename_{filter_name}",
"module": "timeflux.nodes.axis",
"class": "AddSuffix",
"params": {"suffix": f"_{filter_name}"},
}
graph["nodes"] += [iir, rename_columns]
graph["edges"] += [
{"source": filter_name, "target": f"rename_{filter_name}"},
{"source": f"rename_{filter_name}", "target": f"stack:{filter_name}"},
]
self.load(graph)
[docs] def update(self):
# When we have not received data, there is nothing to do
if not self.i.ready():
return
# set the data in input of each filter
for filter_name in self._filters.keys():
self.set_port(filter_name, port_id="i", data=self.i.data, meta=self.i.meta)
self.run()
self.o = self.get_port("stack", port_id="o")