Source code for timeflux.nodes.query

import re
import numpy as np
from timeflux.core.exceptions import WorkerInterrupt
from timeflux.core.node import Node


[docs]class SelectRange(Node): """Select a subset of the given data along vertical (index) or horizontal (columns) axis. Attributes: i (Port): default data input, expects DataFrame with eventually MultiIndex. o (Port): default output, provides DataFrame with eventually MultiIndex. Args: ranges (dict): Dict with keys are level names and values are selection ranges. axis (int): If 0, the level concerns row index, if 1, columns index (`0` or `1`). Default: `0`. inclusive (bool) : Whether the boundaries are strict or included. Default: `False`. Example: In this example, we have an input DataFrame with multi level columns and we want to select data with index from level of name `second` in range `[1,1.5]`. We set: * ``ranges`` = `{"second": [1, 1.5]}` * ``axis`` = `1` * ``inclusive`` = `True` If the data received on port ``i`` is: :: first A ... B second 1.3 1.6 1.9 1.3 1.6 1.9 2017-12-31 23:59:59.998745401 0.185133 0.541901 0.806561 ... 0.732225 0.806561 0.658783 2018-01-01 00:00:00.104507143 0.692277 0.849196 0.987668 ... 0.489425 0.221209 0.987668 2018-01-01 00:00:00.202319939 0.944059 0.039427 0.567945 ... 0.925248 0.180575 0.567945 The data provided on port ``o`` will be: :: first A B second 1.3 1.3 2017-12-31 23:59:59.998745401 0.185133 0.732225 2018-01-01 00:00:00.104507143 0.692277 0.489425 2018-01-01 00:00:00.202319939 0.944059 0.925248 """ def __init__(self, ranges, axis=0, inclusive=False): self._ranges = ranges # list of ranges per level self._inclusive = inclusive # include boundaries. self._axis = axis
[docs] def update(self): if not self.i.ready(): return self.o.meta = self.i.meta if self._axis == 1: self.i.data = self.i.data.T mask = self._mask() self.o.data = self.i.data[np.logical_and.reduce(mask)] if self._axis == 1: self.o.data = self.o.data.T
def _mask(self): if self._inclusive: mask = [ (self.i.data.index.get_level_values(l) >= r[0]) & (self.i.data.index.get_level_values(l) <= r[1]) for l, r in (self._ranges).items() if r is not None ] else: mask = [ (self.i.data.index.get_level_values(l) > r[0]) & (self.i.data.index.get_level_values(l) < r[1]) for l, r in (self._ranges).items() if r is not None ] return mask
[docs]class XsQuery(Node): """Returns a cross-section (row(s) or column(s)) from the data. Attributes: i (Port): default input, expects DataFrame with eventually MultiIndex. o (Port): default output, provides DataFrame with eventually MultiIndex. Args: key (str|tuple): Some label contained in the index, or partially in a MultiIndex index. axis (int): Axis to retrieve cross-section on (`0` or `1`). Default: `0`. level (str|int|tuple) : In case of a key partially contained in a MultiIndex, indicates which levels are used. Levels can be referred by label or position. drop_level (bool) : If False, returns DataFrame with same level. Default: `False`. Example: In this example, we have an input DataFrame with multi level columns and we want to select cross section between `B` from level of name `first` and `1` from level of name `second`. We set: * ``key`` = `("B", 1)` * ``axis`` = `1` * ``level`` = `["first", "second"]` * ``drop_level`` = `False` If the data received on port ``i`` is: :: first A ... B second 1 2 ... 1 2 2017-12-31 23:59:59.998745401 0.185133 0.541901 ... 0.297349 0.806561 2018-01-01 00:00:00.104507143 0.692277 0.849196 ... 0.844549 0.221209 2018-01-01 00:00:00.202319939 0.944059 0.039427 ... 0.120567 0.180575 The data provided on port ``o`` will be: :: first B second 1 2018-01-01 00:00:00.300986584 0.297349 2018-01-01 00:00:00.396560186 0.844549 2018-01-01 00:00:00.496559945 0.120567 References: See the documentation of `pandas.DataFrame.xs <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.xs.html>`_ . """ def __init__(self, key, **kwargs): """ Args: key (str|tuple): Some label contained in the index, or partially in a MultiIndex index. kwargs: Keyword arguments to call pandas xs method: axis, level, drop_level """ self._key = key self._kwargs = kwargs self._ready = False
[docs] def update(self): if not self.i.ready(): return self.o.meta = self.i.meta if not self._ready: try: self._query() self._ready = True except KeyError as e: raise WorkerInterrupt(e) else: self._query()
def _query(self): self.o.data = self.i.data.xs(key=self._key, **self._kwargs)
[docs]class LocQuery(Node): """Slices DataFrame on group of rows and columns by label(s) Attributes: i (Port): default data input, expects DataFrame. o (Port): default output, provides DataFrame. Args: key (str|list|tuple): Label selection specification. axis (int): Axis to query the label from (`0` or `1`). Default: `1`. Example: In this example, we have an input DataFrame with 5 columns `[A, B, C, D, E]` and we want to select columns A and E. We set: * ``key`` = `["A", "E"]` * ``axis`` = `1` If the data received on port ``i`` is: :: A B ... E F 2017-12-31 23:59:59.998745401 0.185133 0.541901 ... 0.806561 0.658783 2018-01-01 00:00:00.104507143 0.692277 0.849196 ... 0.221209 0.987668 2018-01-01 00:00:00.202319939 0.944059 0.039427 ... 0.180575 0.567945 The data provided on port ``o`` will be: :: A E 2017-12-31 23:59:59.998745401 0.185133 0.806561 2018-01-01 00:00:00.104507143 0.692277 0.221209 2018-01-01 00:00:00.202319939 0.944059 0.180575 References: See the documentation of `pandas.DataFrame.loc <https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.loc.html>`_ . """ def __init__(self, key, axis=1): self._axis = axis if not isinstance(key, (list, tuple)): self._key = [key] else: self._key = key self._ready = False
[docs] def update(self): if not self.i.ready(): return self.o = self.i if not self.i.ready(): return self.o.meta = self.i.meta if not self._ready: try: self._query() self._ready = True except KeyError as e: raise WorkerInterrupt(e) else: self.o.data = self.i.data.loc[:, self._key]
def _query(self): if self._axis == 0: self.o.data = self.i.data.loc[self._key, :] else: # self._axis == 1: self.o.data = self.i.data.loc[:, self._key]
[docs]class Match(Node): """Extract columns matching a regular expression Attributes: i (Port): default data input, expects DataFrame. o (Port): default output, provides DataFrame. Args: expression (str): Regular expression to match against. """ def __init__(self, expression): self._r = re.compile(expression) self._columns = None
[docs] def update(self): if not self.i.ready(): return if not self._columns: columns = list(self.i.data.columns) self._columns = [column for column in columns if self._r.match(column)] if not self._columns: self.logger.error("No matching column") raise WorkerInterrupt() self.o = self.i self.o.data = self.o.data[self._columns]