Source code for timeflux.nodes.monitor

"""Monitor a signal"""

from timeflux.core.node import Node


[docs]class Monitor(Node):
[docs] def update(self): status = 0.0 if self.i.data is not None: if not self.i.data.empty: status = 1.0 self.o.set([status], names=["streaming"])