Source code for timeflux.nodes.random

"""timeflux.nodes.random: generate random data"""

import random
import time
import numpy as np
from timeflux.core.node import Node


[docs]class Random(Node): def __init__( self, columns=5, rows_min=2, rows_max=10, value_min=0, value_max=9, names=None, seed=None, ): """Return random integers between value_min and value_max (inclusive)""" self._rows_min = rows_min self._rows_max = rows_max self._value_min = value_min self._value_max = value_max self._names = names self._columns = len(names) if names else columns random.seed(seed) np.random.seed(seed)
[docs] def update(self): rows = random.randint(self._rows_min, self._rows_max) shape = (rows, self._columns) self.o.set( np.random.randint(self._value_min, self._value_max + 1, size=shape), names=self._names, )
[docs]class Signal(Node): def __init__(self, channels=5, rate=100, amplitude=200, names=None, seed=None): """Return random floats within the given peak-to-peak amplitude""" self._rate = rate self._amplitude = amplitude self._names = names self._channels = len(names) if names else channels self._updated = time.time() random.seed(seed) np.random.seed(seed)
[docs] def update(self): now = time.time() elapsed = now - self._updated self._updated = now samples = round(elapsed / (1 / self._rate)) if samples > 0: shape = (samples, self._channels) self.o.set( np.random.random_sample(shape) * self._amplitude - (self._amplitude / 2), names=self._names, meta={"rate": self._rate}, )