Source code for timeflux.nodes.accumulate

"""Accumulation nodes that either, stack, append or, concatenate data after a gate"""

import pandas as pd
import xarray as xr
from timeflux.core.node import Node


[docs]class AppendDataFrame(Node): """Accumulates and appends data of type DataFrame after a gate. This node should be plugged after a Gate. As long as it receives data, it appends them to an internal buffer. When it receives a meta with key `gate_status` set to `closed`, it releases the accumulated data and empty the buffer. Attributes: i (Port): Default data input, expects DataFrame and meta o (Port): Default output, provides DataFrame Args: **kwargs: key word arguments to pass to pandas.DataFrame.concat method. """ def __init__(self, meta_keys=None, **kwargs): super().__init__() self._meta_keys = meta_keys self._kwargs = kwargs self._reset() def _reset(self): self._data = pd.DataFrame() self._meta = [] def _release(self): self.logger.info( f"AppendDataFrame is releasing {len(self._data)} " f"accumulated rows." ) self.o.data = self._data if self._meta_keys is None: self.o.meta = {"accumulate": self._meta} else: self.o.meta = {key: [] for key in self._meta_keys} for meta_key in self._meta_keys: for meta in self._meta: self.o.meta[meta_key] += meta.get(meta_key, [])
[docs] def update(self): gate_status = self.i.meta.get("gate_status") if self.i.ready(): # update the meta self._meta.append(self.i.meta) # append the data self._data = pd.concat([self._data, self.i.data], **self._kwargs) # if gate is close, release the data and reset the buffer if gate_status == "closed" and not self._data.empty: self._release() self._reset()
[docs]class AppendDataArray(Node): """Accumulates and appends data of type XArray after a gate. This node should be plugged after a Gate. As long as it receives DataArrays, it appends them to a buffer list. When it receives a meta with key `gate_status` set to `closed`, it concatenates the list of accumulated DataArray, releases it and empty the buffer list. Attributes: i (Port): Default data input, expects DataArray and meta o (Port): Default output, provides DataArray Args: dim: Name of the dimension to concatenate along. **kwargs: key word arguments to pass to xarray.concat method. """ def __init__(self, dim, meta_keys=None, **kwargs): super().__init__() self._dim = dim self._meta_keys = meta_keys self._kwargs = kwargs self._reset() def _reset(self): self._data_list = [] self._meta = [] def _release(self): self.logger.info( f"AppendDataArray is releasing {len(self._data_list)} " f"accumulated data chunks." ) self.o.data = xr.concat(self._data_list, self._dim, **self._kwargs) if self._meta_keys is None: self.o.meta = {"accumulate": self._meta} else: self.o.meta = {key: [] for key in self._meta_keys} for meta_key in self._meta_keys: for meta in self._meta: self.o.meta[meta_key] += meta.get(meta_key, [])
[docs] def update(self): gate_status = self.i.meta.get("gate_status") # append the data if self.i.ready(): self._data_list.append(self.i.data) # update the meta self._meta.append(self.i.meta) # if gate is close, release the data and reset the buffer if gate_status == "closed" and self._data_list: self._release() self._reset()