Source code for timeflux.helpers.testing

"""A set of tools to facilitate code testing"""

import numpy as np
import pandas as pd
import xarray as xr


class DummyData:
    """Serve a reproducible, randomly generated DataFrame chunk by chunk."""

    def __init__(
        self,
        num_rows=1000,
        num_cols=5,
        cols=None,
        rate=10,
        jitter=0.05,
        start_date="2018-01-01",
        seed=42,
        round=6,
    ):
        """
        Build the full dataframe and place the cursor on its first row.

        Args:
            num_rows (int): Number of rows
            num_cols (int): Number of columns (ignored when `cols` is given)
            cols (list): List of column names
            rate (float): Frequency, in Hertz
            jitter (float): Amount of jitter, relative to rate
            start_date (string): Start date
            seed (int): Seed for random number generation
            round (int): Number of decimals for random numbers
        """
        # Seeds NumPy's global generator, exactly like the historical behaviour.
        np.random.seed(seed)

        period = 1 / rate
        max_shift = period * jitter

        # Regular time grid, then one uniform random shift per timestamp.
        timestamps = pd.date_range(
            start=start_date, periods=num_rows, freq=pd.DateOffset(seconds=period)
        )
        shifts = np.random.uniform(-max_shift, max_shift, num_rows)
        timestamps = timestamps + pd.to_timedelta(shifts, unit="s")

        # Explicit column names take precedence over the requested width.
        width = num_cols if cols is None else len(cols)
        values = np.random.rand(num_rows, width).round(round)

        self._data = pd.DataFrame(values, index=timestamps, columns=cols)
        self._cursor = 0

    def next(self, num_rows=10):
        """
        Return the chunk starting at the cursor and advance the cursor.

        Args:
            num_rows (int): Number of rows to fetch
        """
        first = self._cursor
        self._cursor = first + num_rows
        # Integer slice: rows are selected by position, and a slice running
        # past the end simply yields fewer (possibly zero) rows.
        return self._data[first : first + num_rows]

    def reset(self):
        """Move the cursor back to the first row."""
        self._cursor = 0
class DummyXArray:
    """Generate dummy data of type XArray."""

    def __init__(
        self,
        num_time=1000,
        num_space=5,
        rate=10,
        jitter=0.05,
        start_date="2018-01-01",
        seed=42,
        round=6,
    ):
        """
        Initialize the data array.

        The array has two dimensions, ``time`` and ``space``. The ``time``
        coordinate is a jittered datetime index and the ``space`` coordinate
        is ``0 .. num_space - 1``.

        Args:
            num_time (int): Number of samples along the ``time`` dimension
            num_space (int): Number of locations along the ``space`` dimension
            rate (float): Frequency, in Hertz
            jitter (float): Amount of jitter, relative to rate
            start_date (string): Start date
            seed (int): Seed for random number generation
            round (int): Number of decimals for random numbers
        """
        # Seeds NumPy's global generator so the content is reproducible.
        np.random.seed(seed)

        frequency = 1 / rate
        times = pd.date_range(
            start=start_date, periods=num_time, freq=pd.DateOffset(seconds=frequency)
        )

        # Shift every timestamp by a uniform random amount within +/- jitter.
        jitter = frequency * jitter
        deltas = pd.to_timedelta(np.random.uniform(-jitter, jitter, num_time), unit="s")
        times = times + deltas

        locs = np.arange(num_space)
        data = np.random.rand(num_time, num_space).round(round)
        self._data = xr.DataArray(data, coords=[times, locs], dims=["time", "space"])
        self._cursor = 0

    def next(self, num_rows=10):
        """
        Get the next chunk of data.

        When fewer than `num_rows` samples remain, the chunk is truncated; once
        the data is exhausted, an empty array is returned.

        Args:
            num_rows (int): Number of samples to fetch along ``time``
        """
        start = self._cursor
        stop = start + num_rows
        self._cursor += num_rows
        # Clamp the upper bound: positions beyond the end of the time axis
        # would otherwise make `isel` raise an IndexError on the last chunk.
        stop = min(stop, self._data.sizes["time"])
        return self._data.isel({"time": np.arange(start, stop)})

    def reset(self):
        """Reset the cursor."""
        self._cursor = 0
class ReadData:
    """Stream a user-supplied DataFrame chunk by chunk."""

    def __init__(self, data):
        """
        Store the data and place the cursor on its first row.

        Args:
            data (DataFrame): custom data to stream.
        """
        self._cursor = 0
        self._data = data

    def next(self, num_rows=10):
        """
        Return the chunk starting at the cursor and advance the cursor.

        Args:
            num_rows (int): Number of rows to fetch
        """
        first = self._cursor
        self._cursor = first + num_rows
        # Positional selection; running past the end yields fewer or no rows.
        return self._data.iloc[first : first + num_rows]

    def reset(self):
        """Move the cursor back to the first row."""
        self._cursor = 0
class Looper:
    """Mimics the scheduler behavior to allow testing the output of a node offline."""

    def __init__(self, generator, node, input_port="i", output_port="o"):
        """
        Initialize the helper.

        Args:
            generator (Object): data generator object exposing `next` and
                `reset` methods and a `_data` attribute
            node (Node): timeflux node to test
            input_port (string): name of the node attribute fed with each chunk
            output_port (string): name of the node attribute read after each update
        """
        self._generator = generator
        self._node = node
        self._input_port = input_port
        self._output_port = output_port

    def run(self, chunk_size=None):
        """
        Loop across chunks of the generator, update the node and collect its output.

        The loop stops after the first empty chunk has been processed, so the
        node is also updated once with that empty chunk.

        Args:
            chunk_size (int): number of samples per chunk; when omitted, the
                length of the generator's data is used

        Returns:
            tuple: the concatenated output data (DataFrame) and the list of
            output meta, one entry per iteration
        """
        if not chunk_size:
            chunk_size = len(self._generator._data)

        collected_data = []
        collected_meta = []
        # mimic the scheduler
        while True:
            self._node.clear()
            chunk = self._generator.next(chunk_size)
            # The node receives a copy, so it cannot alter the generator's data.
            getattr(self._node, self._input_port).data = chunk.copy()
            self._node.update()
            port = getattr(self._node, self._output_port)
            collected_data.append(port.data)
            collected_meta.append(port.meta)
            if chunk.empty:
                break

        return pd.concat(collected_data), collected_meta