Source code for timeflux_example.nodes.dynamic

"""Illustrates dynamic inputs and outputs."""

import random
from timeflux.core.node import Node


class Outputs(Node):
    """Randomly generate dynamic outputs.

    At each update, this node requests a random number (between 0 and 10) of
    dynamic output ports, then sets the default output to the number of ports
    matching ``o_*`` found on the node.

    Attributes:
        o (Port): Default output, provides DataFrame.
        o_* (Port): Dynamic outputs.

    Args:
        prefix (string): The prefix to add to each dynamic output.
        seed (int): The random number generator seed.

    Example:
        .. literalinclude:: /../examples/dynamic_prefixed.yaml
           :language: yaml

    """

    def __init__(self, prefix=None, seed=None):
        # Use a private generator rather than reseeding the module-level one:
        # random.seed() would alter the random stream of every other user of
        # the ``random`` module in this process, and their draws would in turn
        # break this node's reproducibility. For a given seed, a dedicated
        # Random instance yields the same sequence as the seeded global one.
        self._rng = random.Random(seed)
        # A non-empty prefix is always followed by an underscore separator.
        self.prefix = "" if prefix is None else prefix + "_"

    def update(self):
        # Lazily create new ports: accessing the attribute is what matters
        # here, the returned value is intentionally discarded.
        for i in range(self._rng.randint(0, 10)):
            getattr(self, "o_" + self.prefix + str(i))
        # Count every port matching the dynamic output pattern.
        # NOTE(review): presumably ports created by earlier updates persist
        # and are counted as well -- confirm against Node.iterate().
        outputs = len(list(self.iterate("o_*")))
        # Set default output
        self.o.set([[outputs]], names=["outputs"])
class Inputs(Node):
    """Count the dynamic inputs.

    On every update, this node walks through the dynamic inputs whose name
    matches the configured prefix and publishes how many it found on the
    default output.

    Attributes:
        i_* (Port): Dynamic inputs.
        o (Port): Default output, provides DataFrame.

    Args:
        prefix (string): The prefix used to match dynamic inputs.

    Example:
        .. literalinclude:: /../examples/dynamic.yaml
           :language: yaml

    """

    def __init__(self, prefix=None):
        # A non-empty prefix is always followed by an underscore separator.
        if prefix is None:
            self.prefix = ""
        else:
            self.prefix = prefix + "_"

    def update(self):
        # Glob pattern selecting the dynamic inputs handled by this node.
        pattern = "i_" + self.prefix + "*"
        # Tally the matches without building an intermediate list.
        found = sum(1 for _ in self.iterate(pattern))
        # Publish the tally on the default output.
        self.o.set([[found]], names=["inputs"])