Source code for timeflux.core.manager

"""timeflux.core.manager: manage workers"""

import logging
import sys
import os
import time
import signal
import json
import yaml
from jinja2 import Template
from timeflux.core.validate import validate
from timeflux.core.worker import Worker

[docs]class Manager: """Load configuration and spawn workers.""" def __init__(self, config): """Load configuration Args: config (string|dict): The configuration can either be a path to a YAML or JSON file or a dict. """ # Initialize logger self.logger = logging.getLogger(__name__) # Hold the names of the imported applications self._imports = [] # Hold the graphs self._graphs = [] # Hold the children processes self._processes = [] # Load application if isinstance(config, dict): app = config path = os.getcwd() elif isinstance(config, str): app = self._load_file(config) self._imports.append(os.path.abspath(config)) path = os.path.dirname(self._imports[0]) else: raise ValueError("Could not load application file") # Validate validate(app) # Populate the graph list if "graphs" in app: self._graphs = app["graphs"] # Import sub applications self._import(app, path) # Update the search path for app in self._imports: path = os.path.dirname(app) if path not in sys.path: sys.path.append(path)
[docs] def run(self): """Launch as many workers as there are graphs.""" try: # Launch workers self._launch() # Monitor them self._monitor() except KeyboardInterrupt: # Ignore further interrupts signal.signal(signal.SIGINT, signal.SIG_IGN)"Interrupting") # Terminate gracefully self._terminate()
def _launch(self): """Launch workers.""" for graph in self._graphs: worker = Worker(graph) process = self._processes.append(process) self.logger.debug("Worker spawned with PID %d", def _monitor(self): """Wait for at least one worker to terminate.""" if not self._processes: return while True: if any(not process.is_alive() for process in self._processes): return time.sleep(0.1) def _terminate(self): """Terminate all workers.""" # interrupt = signal.CTRL_C_EVENT if sys.platform == "win32" else signal.SIGINT # Try to terminate gracefully for process in self._processes: if process.is_alive(): os.kill(, interrupt) # Wait 10 seconds and kill the remaining ones self._wait(10) for process in self._processes: if process.is_alive(): process.terminate() def _wait(self, timeout=None): """Wait for all workers to die.""" if not self._processes: return start = time.time() while True: try: if all(not process.is_alive() for process in self._processes): # All the workers are dead return if timeout and time.time() - start >= timeout: # Timeout return time.sleep(0.1) except: pass def _import(self, app, path): if not "import" in app: return old_path = os.getcwd() os.chdir(path) for filename in app["import"]: filename = os.path.abspath(filename) if filename in self._imports: self.logger.debug("Application %s will not be loaded twice", filename) continue self.logger.debug("Importing %s", filename) self._imports.append(filename) sub = self._load_file(filename) try: validate(sub) except ValueError as error: raise ValueError(f"Validation failed ({filename})") if "graphs" in sub: self._graphs += sub["graphs"] self._import(sub, os.path.dirname(filename)) os.chdir(old_path) def _load_file(self, filename): # Read file as string with open(filename) as stream: app = # Parse the template app = self._parse_template(app) # Return a dict extension = filename.split(".")[-1] if extension in ("yml", "yaml"): return yaml.safe_load(app) elif extension == "json": return json.loads(app) def _parse_template(self, template): template = Template(template) return template.render(dict(os.environ))