Source code for timeflux.nodes.events

"""Generate random events"""

import copy
import datetime
import json
import random
import string

import numpy as np

from timeflux.core.node import Node


[docs]class Events(Node): def __init__( self, rows_min=1, rows_max=10, string_min=3, string_max=12, items_min=0, items_max=5, seed=None, ): """Return random integers from value_min to value_max (inclusive)""" self._rows_min = rows_min self._rows_max = rows_max self._string_min = string_min self._string_max = string_max self._items_min = items_min self._items_max = items_max random.seed(seed)
[docs] def random_string(self, length): return "".join(random.choice(string.ascii_letters) for m in range(length))
[docs] def update(self): rows = [] for i in range(random.randint(self._rows_min, self._rows_max)): row = [] row.append( self.random_string(random.randint(self._string_min, self._string_max)) ) data = {} for j in range(random.randint(self._items_min, self._items_max)): key = self.random_string( random.randint(self._string_min, self._string_max) ) value = self.random_string( random.randint(self._string_min, self._string_max) ) data[key] = value row.append(json.dumps(data)) rows.append(row) self.o.set(rows, names=["label", "data"])
[docs]class Periodic(Node): """Node that sends events at a regular interval. This node sends regular events after the first time the update method is called. If the update method is called at time `t`, and this node has a interval `ti` and phase `ts`, then the first event will be at `t + ts`. Then there will be one event at `t + ts + k ti` where `k` is 1, 2, ... Args: label (str): Event name that will be generated by this node. data (dict): Dictionary sent in each event. interval (dict): Dictionary with keyword arguments passed to `datetime.timedelta` to define the time interval between events. This can be seconds, milliseconds, microseconds, etc. phase (dict): Dictionary with keyword arguments passed to `datetime.timedelta` to define a phase for the stimulations. The first stimulation will happen after this time delta is observed. If not set, the phase will be as the interval. Attributes: o (Port): Default output, provides a pandas.DataFrame with events. Examples: The following YAML can be used to generate events every half second but only after 5 seconds have elapsed .. code-block:: yaml graphs: - nodes: - id: clock module: timeflux.nodes.events class: Periodic params: label: my-event-label interval: milliseconds: 500 phase: seconds: 5 - id: display module: timeflux.nodes.debug class: Display rate: 20 edges: - source: clock target: display """ def __init__(self, label="clock", data=None, interval=None, phase=None): super().__init__() interval = interval or {} phase = phase or interval delta_interval = datetime.timedelta(**interval) delta_phase = datetime.timedelta(**phase) if delta_interval.total_seconds() <= 0: raise ValueError("Periodic must have positive interval") if delta_phase.total_seconds() <= 0: raise ValueError("Periodic must have positive phase") self._period = np.timedelta64(delta_interval) self._phase = np.timedelta64(delta_phase) self._next_timestamp = None self._label = label self._data = data
[docs] def update(self): now = np.datetime64(datetime.datetime.now()) if self._next_timestamp is None: self._next_timestamp = now + self._phase self.logger.debug( "Periodic will start sending events at %s", self._next_timestamp ) data = [] times = [] while self._next_timestamp < now: content = copy.deepcopy(self._data) data.append([self._label, content]) times.append(self._next_timestamp) self._next_timestamp += self._period if data: self.logger.debug("Sending %d events", len(data)) self.o.set(data, names=["label", "data"], timestamps=times)
def _set_next(self, reference): self._next_timestamp = reference + self._period