"""Dejittering nodes"""
import numpy as np
import pandas as pd
from time import time
from timeflux.core.node import Node
from timeflux.core.exceptions import WorkerInterrupt
[docs]class Reindex(Node):
"""A simple dejittering node that will reindex the data according to the sampling rate.
This node is useful when datetime indices are not monotonic, which can happen if the
stream is acquired from a LSL inlet.
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float|None): Nominal sampling rate. If `None`, the value will be read from the meta data.
Notes:
This node assumes that no samples were lost and that the device clock is relatively stable.
"""
def __init__(self, rate=None):
self._rate = rate
self._frequency = None
self._delta = None
self._index = None
[docs] def update(self):
if not self.i.ready():
return
if self._rate is None:
self._rate = self.i.meta.get("rate")
if self._rate is None:
self.logger.error("The rate parameter is required")
raise WorkerInterrupt
if self._frequency is None:
frequency = 1 / self._rate
self._frequency = pd.DateOffset(seconds=frequency)
self._delta = pd.Timedelta(frequency, "second")
if self._index is None:
self._index = self.i.data.index.values[0]
indices = pd.date_range(
start=self._index, periods=len(self.i.data), freq=self._frequency
)
self._index = indices.values[-1] + self._delta
self.o = self.i
self.o.data.index = indices
self.o.meta["rate"] = self._rate
[docs]class Snap(Node):
"""Snap timestamps to the nearest occurring frequency.
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float|None): (optional) nominal sampling frequency of the data, to round
the timestamps to (in Hz). If None, the rate will be obtained from the meta
of the input port.
"""
def __init__(self, rate=None):
self._rate = rate
[docs] def update(self):
# copy the meta and the data
self.o = self.i
# if the rate has not been set in the constructor, get it from the meta
if self._rate is None:
self._rate = self.i.meta.get("rate")
# When we have not received data, there is nothing to do
if self._rate is None or self.i.data is None or self.i.data.empty:
return
# At this point, we are sure that we have some data to process
self.o.data.index = self.o.data.index.round(str(1 / self._rate) + "S")
self.o.meta["rate"] = self._rate
[docs]class Interpolate(Node):
"""Dejitter data with values interpolation.
This nodes continuously buffers a small amount of data to allow for interpolating
missing samples.
The output data is resampled at a fixed rate.
The interpolation is performed by Pandas methods.
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataFrame and meta.
Args:
rate (float|None): (optional) nominal sampling frequency of the data. If None, the rate will be obtained from the meta of the input port.
method: interpolation method. See the pandas.DataFrame.interpolate documentation.
n_min: minimum number of samples to perform the interpolation.
n_max: number of samples to keep in the buffer.
Notes:
Computation cost mainly depends on the window size and the estimation is
performed in the main thread. Hence, the user should be careful on the
computation duration.
"""
def __init__(self, rate=None, method="cubic", n_min=3, n_max=10):
self._rate = rate
if self._rate is not None:
self._set_timedelta()
self._method = method # {‘linear’, ‘time’, ‘index’, ‘values’, ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘barycentric’, ‘krogh’, ‘polynomial’, ‘spline’, ‘piecewise_polynomial’, ‘from_derivatives’, ‘pchip’, ‘akima’}
self._n_min = n_min
self._n_max = n_max
self._last_datetime = None
def _set_timedelta(self):
self._timespan = 1 / self._rate
self._timedelta = pd.to_timedelta(
self._timespan, "s"
) # sampling period of the interpolated signal
[docs] def update(self):
self.o.meta = self.i.meta
# if the rate has not been set in the constructor, get it from the meta
if self._rate is None:
self._rate = self.i.meta.get("rate")
self._set_timedelta()
self.o.meta["rate"] = self._rate
if self.i.data is None or self.i.data.empty:
return
# initialize the first datetime index.
if self._last_datetime is None:
self._last_datetime = self.i.data.index.round(str(self._timespan) + "S")[0]
self._buffer = pd.DataFrame()
self._times = np.arange(
self._last_datetime,
self.i.data.index[-1],
self._timedelta,
dtype="datetime64[us]",
)
else:
self._times = np.arange(
self._last_datetime + self._timedelta,
self.i.data.index[-1],
self._timedelta,
dtype="datetime64[us]",
)
# interpolate
self._interpolate()
def _drop_duplicates(self, data):
return data.loc[~data.index.duplicated(keep="first")]
def _make_monotonic(self, data):
return data[
np.diff(pd.Index([self._last_datetime]).append(data.index))
/ np.timedelta64(1, "s")
> 0
]
def _interpolate(self):
# interpolate current chunk
self._buffer = pd.concat(
[self._buffer, self.i.data], sort=True
) # append last sample be able to interpolate
if not self._buffer.index.is_monotonic_increasing:
self.logger.warning("Data index should be strictly monotonic")
self._buffer = self._make_monotonic(self._buffer)
data_to_interpolate = pd.concat(
[self._buffer, pd.DataFrame(index=self._times)], sort=True
)
data_to_interpolate = self._drop_duplicates(data_to_interpolate).sort_index()
if (self._buffer.notnull().sum(axis=0) > self._n_min).all():
self.o.data = (
data_to_interpolate.interpolate(axis=0, method=self._method)
.reindex(self._times)
.loc[self._last_datetime :]
.dropna(axis=0, how="any")
)
if not self.o.data.empty:
self._last_datetime = self.o.data.index[-1]
self._buffer = self._buffer.tail(self._n_max)
[docs]class Space(Node):
"""Evenly space timestamps.
This is useful to correct drifting data streams.
Attributes:
i (Port): Default input, expects DataFrame and meta.
o (Port): Default output, provides DataFrame and meta.
Example:
.. literalinclude:: /../examples/dejitter_space.yaml
:language: yaml
"""
def __init__(self):
self._stop = int(time() * 1e6)
[docs] def update(self):
if self.i.ready():
self.o = self.i
start = self._stop
self._stop = int(time() * 1e6)
indices = np.linspace(
start, self._stop, len(self.o.data), False, dtype="datetime64[us]"
)
self.o.data.index = indices