Source code for timeflux.core.worker

"""timeflux.core.worker: spawn processes."""

import importlib
import logging
import signal
from multiprocessing import Process
from timeflux.core.logging import get_queue, init_worker
from timeflux.core.graph import Graph
from timeflux.core.scheduler import Scheduler
from timeflux.core.registry import Registry
from timeflux.core.exceptions import *


[docs]class Worker: """Spawn a process and launch a scheduler.""" def __init__(self, graph): self._graph = graph
[docs] def run(self): """Run the process""" p = Process(target=self._run, args=(get_queue(),), name=self._graph["id"]) p.start() return p
[docs] def load(self): # Build the graph and compute the traversal path g = Graph(self._graph) graph = g.build() path = g.traverse() # Set rate Registry.rate = self._graph["rate"] # Load nodes nodes = {} for step in path: node = self._load_node(graph.nodes[step["node"]], step["node"]) nodes[step["node"]] = node return path, nodes
def _run(self, log_queue=None): # Initialize logging if log_queue: init_worker(log_queue) logger = logging.getLogger(__name__) scheduler = None try: # Initialize the graph and instantiate the nodes path, nodes = self.load() # Launch scheduler and run it scheduler = Scheduler(path, nodes, self._graph["rate"]) scheduler.run() except KeyboardInterrupt: # Ignore further interrupts signal.signal(signal.SIGINT, signal.SIG_IGN) logger.debug("Interrupting") except ( GraphDuplicateNode, GraphUndefinedNode, WorkerLoadError, ValidationError, ) as error: logger.error(error) except WorkerInterrupt as error: logger.debug(error) except Exception as error: logger.exception(error) if scheduler is not None: logger.info("Terminating") scheduler.terminate() def _load_node(self, node, nid): """Import a module and instantiate class.""" # Import module try: m = importlib.import_module(node["module"]) except ModuleNotFoundError as error: if node["module"] in error.msg: # Missing or invalid node raise WorkerLoadError( f"Node '{nid}': no module named '{node['module']}'" ) else: # Missing or invalid dependency raise error # Get class try: c = getattr(m, node["class"]) except AttributeError: raise WorkerLoadError( f"Node '{nid}': no class named '{node['class']}' in module '{node['module']}'" ) # Instantiate class try: n = c(**node["params"]) except TypeError as error: raise WorkerLoadError(f"Node '{nid}': {error}") return n