Source code for timeflux.nodes.apply

"""Arbitrary operations on DataFrames"""

from timeflux.core.node import Node
from importlib import import_module
import pandas as pd


[docs]class ApplyMethod(Node): """Apply a function along an axis of the DataFrame. This node applies a function along an axis of the DataFrame. Objects passed to the function are Series objects whose index is either the DataFrame's index (``axis`` = `0`) or the DataFrame's columns (``axis`` = `1`). Attributes: i (Port): default data input, expects DataFrame. o (Port): default output, provides DataFrame. Args: func (func): custom function specified directely that takes as input a n_array (eg. lambda x: x+1). Default: None. method (str): name of the module to import, in which the method is defined. eg. `numpy.mean`. apply_mode (str`): {`universal`, `reduce`, `expand` }. Default: `universal`. - `universal` if function is a transformation from n_array to n_array - `reduce` if function is a transformation from n_array to scalar - `expand` if function is a transformation from n_array to nk_array [not yet implemented] axis (int) : if 0, the transformation is applied to columns, if 1 to rows. Default: `0`. closed (str) : {`left`, `right`, `center`}: timestamp to transfer in the output, only when method_type is "reduce" and axis = 0, in which case, the output port's lenght is 1. Default: `right`. kwargs: additional keyword arguments to pass as keywords arguments to `func`. Notes: Note that the passed function will receive ndarray objects for performance purposes. For universal functions, ie. transformation from n_array to n_array, input and output ports have the same size. For reducing function, ie. from n_array to scalar, output ports's index is set to first (if ``closed`` = `left`), last (if ``closed`` = `right`), or middle (if ``closed`` = `center`) .. todo:: Allow expanding functions such as n_array to nk_array (with XArray usage) Example: Universal function: in this example, we apply `numpy.sqrt` to each value of the data. Shapes of input and output data are the same. * ``method`` = `numpy.sqrt` * ``method_type`` = `universal` If data in input port is ``i`` is: :: 0 2018-10-25 07:33:41.871131 9.0 2018-10-25 07:33:41.873084 16.0 2018-10-25 07:33:41.875037 1.0 2018-10-25 07:33:41.876990 4.0 It returns the squared root of the data on port ``o``: :: 0 2018-10-25 07:33:41.871131 3.0 2018-10-25 07:33:41.873084 4.0 2018-10-25 07:33:41.875037 1.0 2018-10-25 07:33:41.876990 2.0 Example: Reducing function: in this example, we apply `numpy.sum` to each value of the data. Shapes of input and output data are not the same. We set: * ``method`` = `numpy.sum` * ``method_type`` = `reduce` * ``axis`` = `0` * ``closed`` = `right` If data in input port is ``i`` is: :: 0 1 2018-10-25 07:33:41.871131 9.0 10.0 2018-10-25 07:33:41.873084 16.0 2.0 2018-10-25 07:33:41.875037 1.0 5.0 2018-10-25 07:33:41.876990 4.0 2.0 It returns the sum amongst row axis on port ``o``: :: 0 1 2018-10-25 07:33:41.876990 30.0 19.0 References: See the documentation of `pandas.apply <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.apply.html>`_ . """ def __init__( self, method, apply_mode="universal", axis=0, closed="right", func=None, **kwargs, ): self._axis = axis self._closed = closed self._kwargs = kwargs self._apply_mode = apply_mode if func is not None: self._func = func else: module_name, function_name = method.rsplit(".", 1) try: module = import_module(module_name) except ImportError: raise ImportError(f"Could not import module {module_name}") try: self._func = getattr(module, function_name) except AttributeError: raise ValueError( f"Module {module_name} has no function {function_name}" ) if not callable(self._func): raise ValueError(f"Could not call the method {self._methode_name}") self._kwargs.update({"raw": True, "axis": axis}) if self._apply_mode == "reduce": self._kwargs["result_type"] = "reduce"
[docs] def update(self): if not self.i.ready(): return self.o.meta = self.i.meta self.o.data = self.i.data.apply(func=self._func, **self._kwargs) if self._apply_mode == "reduce": if self._axis == 0: if self._closed == "right": index_to_keep = self.i.data.index[-1] elif self._closed == "left": index_to_keep = self.i.data.index[0] else: # self._closed == 'middle': index_to_keep = self.i.data.index[len(self.i.data) // 2] self.o.data = pd.DataFrame(self.o.data, columns=[index_to_keep]).T else: # self._axis == 1: self.o.data = self.o.data.to_frame()