Source code for timeflux.core.node

"""Node base class."""

import re
import logging
from abc import ABC, abstractmethod
from timeflux.core.io import Port


[docs]class Node(ABC): def __new__(cls, *args, **kwargs): """Create instance and initialize the logger.""" instance = super().__new__(cls) instance.logger = logging.getLogger( "timeflux." + cls.__module__ + "." + cls.__name__ ) instance.ports = {} return instance def __init__(self): """Instantiate the node.""" pass def __getattr__(self, name): """Create input and output ports on the fly. Args: name (string): The name of the port, prefixed with `i_` (input) or `o_` (output). Default ports are also allowed (`i` or `o`). Returns: Port: The newly created port. """ if name == "i" or name.startswith("i_") or name == "o" or name.startswith("o_"): self.ports[name] = Port() setattr(self, name, self.ports[name]) return self.ports[name] raise AttributeError( f"type object '{type(self).__name__}' has no attribute '{name}'" )
[docs] def bind(self, source, target): """Create an alias of a port Args: source (string): The name of the source port target (string): The name of the target port """ if ( target == "i" or target.startswith("i_") or target == "o" or target.startswith("o_") ): getattr(self, source) # Create the source port if it does not already exist self.ports[target] = self.ports[source] setattr(self, target, self.ports[source])
[docs] def iterate(self, name="*"): """Iterate through ports. If ``name`` ends with the globbing character (`*`), the generator iterates through all existing ports matching that pattern. Otherwise, only one port is returned. If it does not already exist, it is automatically created. Args: name (string): The matching pattern. Yields: (tupple): A tupple containing: * name (`string`): The full port name. * suffix (`string`): The part of the name matching the globbing character. * port (`Port`): The port object. """ if name.endswith("*"): skip = len(name) - 1 name = name[:-1] for key, port in self.ports.items(): if key.startswith(name): yield key, key[skip:], port else: yield name, "", getattr(self, name)
[docs] def clear(self): """Reset all ports. It is assumed that numbered ports (i.e. those with a name ending with an underscore followed by numbers) are temporary and must be completely removed. The only exception is when they are bound to a default or named port. All other ports are simply emptied to avoid the cost of reinstanciating a new `Port` object before each update. """ if not hasattr(self, "_re_dynamic_port"): self._re_dynamic_port = re.compile(".*_[0-9]+$") remove = [] ids = [] for name, port in self.ports.items(): port.clear() if self._re_dynamic_port.match(name): if id(port) not in ids: remove.append(name) continue ids.append(id(port)) for name in remove: del self.ports[name] delattr(self, name)
[docs] @abstractmethod def update(self): """Update the input and output ports.""" pass
[docs] def terminate(self): """Perform cleanup upon termination.""" pass