Source code for timeflux.helpers.viz

#!/usr/bin/env python

"""Export Timeflux apps as images.

This module can be imported or used as a standalone tool. It is quite useful to visually
inspect complex apps. It offers a graphical representation of multiple direcred acyclic
graphs. It uses Graphviz under the hood, and outputs the `dot` representation on the
standard output, so it can optionnally be redirected to a file for further processing.

Example:
    ``python -m timeflux.helpers.viz foobar.yaml``

"""

import sys
import os
from dotenv import load_dotenv
from timeflux.core.manager import Manager
from timeflux.core.graph import Graph

try:
    import graphviz as gv
except ModuleNotFoundError:
    raise SystemExit(
        "Graphviz is not installed. Optional dependencies can be installed with: 'pip install timeflux[opt]'."
    )


[docs]def yaml_to_png(filename, format="png", sort=False): """Generate an image from a YAML application file. Args: filename (string): The path to the YAML application file. format (string): The image format. Default: `png`. sort (boolean): If `True`, the graphs will be sorted in the same topological order that is used to run the application. Default: `False`. """ # Load environment data load_dotenv(".env") # Load graphs graphs = Manager(filename)._graphs # Default graph attributes graph_attr = { "splines": "spline", "rankdir": "LR", "style": "filled", "fillcolor": "lightgrey", "color": "black", "fontname": "helvetica", "fontsize": "16", } # Default node attributes node_attr = { "shape": "ellipse", "style": "filled", "fillcolor": "white", "color": "black", "fontname": "helvetica", "fontsize": "14", } # Default edge attributes edge_attr = { "fontname": "helvetica", "fontsize": "9", "fontcolor": "snow4", "labeldistance": "1.5", "labelangle": "-25", } # Special nodes broker = None publishers = [] subscribers = [] # Initialize the viz using the default dot engine dot = gv.Digraph( format=format, graph_attr=graph_attr, node_attr=node_attr, edge_attr=edge_attr ) # Add clusters for ci, graph in enumerate(graphs): cluster_name = f"cluster_{ci}" with dot.subgraph(name=cluster_name) as cluster: # Set label if "id" in graph: cluster.attr(label=graph["id"]) # Add nodes nodes = {} for ni, node in enumerate(graph["nodes"]): node_name = f"{cluster_name}_node_{ni}" nodes[node["id"]] = node_name cluster.node(node_name, label=node["id"]) # Check for special nodes if node["module"] == "timeflux.nodes.zmq": if node["class"] in ("Broker", "BrokerMonitored", "BrokerLVC"): broker = node_name elif node["class"] == "Pub": publishers.append( {"name": node_name, "topic": node["params"]["topic"]} ) elif node["class"] == "Sub": subscribers.append( {"name": node_name, "topics": node["params"]["topics"]} ) # Add edges if "edges" in graph: for edge in graph["edges"]: src = edge["source"].split(":") dst = edge["target"].split(":") taillabel = src[1] if len(src) == 2 else "" headlabel = dst[1] if len(dst) == 2 else "" cluster.edge( nodes[src[0]], nodes[dst[0]], taillabel=taillabel, headlabel=headlabel, ) # Sort nodes according to the topoligical order of the graph # This is still a bit hacky and should be considered as a WIP. if sort: path = Graph(graph).traverse() edge = [] for node in path: edge.append(nodes[node["node"]]) if len(edge) == 2: cluster.edge(edge[0], edge[1], style="invis", weight="100") edge = [edge[1]] # Add special edges if broker: dot.attr("edge", style="dashed") for publisher in publishers: dot.edge(publisher["name"], broker, label=publisher["topic"]) for subscriber in subscribers: for topic in subscriber["topics"]: dot.edge(broker, subscriber["name"], label=topic) print(dot.source) out = os.path.splitext(os.path.basename(filename))[0] dot.render(out, cleanup=True, view=True)
if __name__ == "__main__": if len(sys.argv) != 2: sys.exit() yaml_to_png(sys.argv[1], sort=False)