Skip to content

Commit

Permalink
🚧 DOT graphs
Browse files Browse the repository at this point in the history
  • Loading branch information
shnizzedy committed Apr 30, 2021
1 parent 310d629 commit c5f322e
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 10 deletions.
6 changes: 5 additions & 1 deletion CPAC/pipeline/cpac_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def run_workflow(sub_dict, c, run, pipeline_timing_info=None, p_name=None,
list of pipeline info for reporting timing information
p_name : string (optional); default=None
name of pipeline
plugin : string (optional); defaule='MultiProc'
plugin : string (optional); default='MultiProc'
nipype plugin to utilize when the workflow is ran
plugin_args : dictionary (optional); default=None
plugin-specific arguments for the workflow plugin
Expand Down Expand Up @@ -359,6 +359,10 @@ def run_workflow(sub_dict, c, run, pipeline_timing_info=None, p_name=None,
subject_id, sub_dict, c, p_name, num_ants_cores
)

for graph2use in {'orig', 'colored', 'exec'}:
workflow.write_graph(os.path.join(
log_dir, f'{graph2use}.dot'), graph2use)

if test_config:
import sys
logger.info('This has been a test of the pipeline configuration '
Expand Down
204 changes: 195 additions & 9 deletions CPAC/pipeline/nipype_pipeline_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@
for C-PAC-specific documentation.
See https://nipype.readthedocs.io/en/latest/api/generated/nipype.pipeline.engine.html
for Nipype's documentation.''' # noqa E501
import os
from copy import deepcopy
from functools import partialmethod
from nipype import logging
from nipype.pipeline import engine as pe
from nipype.pipeline.engine.utils import get_print_name
from functools import partialmethod
from nipype.pipeline.engine.utils import (
_create_dot_graph,
generate_expanded_graph,
get_print_name,
_replacefunk,
_run_dot
)
from nipype.utils.filemanip import fname_presuffix

logger = logging.getLogger("nipype.workflow")

Expand Down Expand Up @@ -95,7 +104,8 @@ def _get_dot(
if level > len(colorset) - 2:
level = 3 # Loop back to blue

dotlist = ['%slabel="%s";' % (prefix, self.name)]
dotlist = ['%slabel="%s";' % (
f'"{prefix}"' if len(prefix.strip()) else prefix, self.name)]
for node in nx.topological_sort(self._graph):
fullname = ".".join(hierarchy + [node.fullname])
nodename = fullname.replace(".", "_")
Expand All @@ -106,7 +116,7 @@ def _get_dot(
if hasattr(node, "iterables") and node.iterables:
dotlist.append(
(
'%s[label="%s", shape=box3d,'
'"%s"[label="%s", shape=box3d,'
"style=filled, color=black, colorscheme"
"=greys7 fillcolor=2];"
)
Expand All @@ -115,20 +125,20 @@ def _get_dot(
else:
if colored:
dotlist.append(
('%s[label="%s", style=filled,'
('"%s"[label="%s", style=filled,'
' fillcolor="%s"];')
% (nodename, node_class_name, colorset[level])
)
else:
dotlist.append(
('%s[label="%s"];') % (nodename, node_class_name)
('"%s"[label="%s"];') % (nodename, node_class_name)
)

for node in nx.topological_sort(self._graph):
if isinstance(node, Workflow):
fullname = ".".join(hierarchy + [node.fullname])
nodename = fullname.replace(".", "_")
dotlist.append("subgraph cluster_%s {" % nodename)
dotlist.append("subgraph \"cluster_%s\" {" % nodename)
if colored:
dotlist.append(
prefix + prefix + 'edge [color="%s"];' % (
Expand Down Expand Up @@ -162,7 +172,7 @@ def _get_dot(
for _ in self._graph.get_edge_data(
node, subnode
)["connect"]:
dotlist.append("%s -> %s;" % (
dotlist.append('"%s" -> "%s";' % (
nodename, subnodename))
logger.debug("connection: %s", dotlist[-1])
# add between workflow connections
Expand All @@ -188,8 +198,184 @@ def _get_dot(
vname1 += "." + ".".join(dest.split(".")[:-1])
if uname1.split(".")[:-1] != vname1.split(".")[:-1]:
dotlist.append(
"%s -> %s;"
'"%s" -> "%s";'
% (uname1.replace(".", "_"), vname1.replace(".", "_"))
)
logger.debug("cross connection: %s", dotlist[-1])
return ("\n" + prefix).join(dotlist)

def write_hierarchical_dotfile(
self, dotfilename=None, colored=False, simple_form=True
):
dotlist = ["digraph \"%s\"{" % self.name]
dotlist.append(
self._get_dot(prefix=" ", colored=colored,
simple_form=simple_form)
)
dotlist.append("}")
dotstr = "\n".join(dotlist)
if dotfilename:
fp = open(dotfilename, "wt")
fp.writelines(dotstr)
fp.close()
else:
logger.info(dotstr)


def export_graph(
graph_in,
base_dir=None,
show=False,
use_execgraph=False,
show_connectinfo=False,
dotfilename="graph.dot",
format="png",
simple_form=True,
):
"""Displays the graph layout of the pipeline
This function requires that pygraphviz and matplotlib are available on
the system.
Parameters
----------
show : boolean
Indicate whether to generate pygraphviz output fromn
networkx. default [False]
use_execgraph : boolean
Indicates whether to use the specification graph or the
execution graph. default [False]
show_connectioninfo : boolean
Indicates whether to show the edge data on the graph. This
makes the graph rather cluttered. default [False]
"""
import networkx as nx

graph = deepcopy(graph_in)
if use_execgraph:
graph = generate_expanded_graph(graph)
logger.debug("using execgraph")
else:
logger.debug("using input graph")
if base_dir is None:
base_dir = os.getcwd()

os.makedirs(base_dir, exist_ok=True)
out_dot = fname_presuffix(
dotfilename, suffix="_detailed.dot", use_ext=False, newpath=base_dir
)
_write_detailed_dot(graph, out_dot)

# Convert .dot if format != 'dot'
outfname, res = _run_dot(out_dot, format_ext=format)
if res is not None and res.runtime.returncode:
logger.warning("dot2png: %s", res.runtime.stderr)

pklgraph = _create_dot_graph(graph, show_connectinfo, simple_form)
simple_dot = fname_presuffix(
dotfilename, suffix=".dot", use_ext=False, newpath=base_dir
)
nx.drawing.nx_pydot.write_dot(pklgraph, simple_dot)

# Convert .dot if format != 'dot'
simplefname, res = _run_dot(simple_dot, format_ext=format)
if res is not None and res.runtime.returncode:
logger.warning("dot2png: %s", res.runtime.stderr)

if show:
pos = nx.graphviz_layout(pklgraph, prog="dot")
nx.draw(pklgraph, pos)
if show_connectinfo:
nx.draw_networkx_edge_labels(pklgraph, pos)

return simplefname if simple_form else outfname


def _write_detailed_dot(graph, dotfilename):
r"""
Create a dot file with connection info ::
digraph structs {
node [shape=record];
struct1 [label="<f0> left|<f1> middle|<f2> right"];
struct2 [label="<f0> one|<f1> two"];
struct3 [label="hello\nworld |{ b |{c|<here> d|e}| f}| g | h"];
struct1:f1 -> struct2:f0;
struct1:f0 -> struct2:f1;
struct1:f2 -> struct3:here;
}
"""
import networkx as nx

text = ["digraph structs {", "node [shape=record];"]
# write nodes
edges = []
for n in nx.topological_sort(graph):
nodename = n.itername
inports = []
for u, v, d in graph.in_edges(nbunch=n, data=True):
for cd in d["connect"]:
if isinstance(cd[0], (str, bytes)):
outport = cd[0]
else:
outport = cd[0][0]
inport = cd[1]
ipstrip = "in%s" % _replacefunk(inport)
opstrip = "out%s" % _replacefunk(outport)
edges.append(
'"%s":"%s":e -> "%s":"%s":w;'
% (
u.itername.replace(".", ""),
opstrip,
v.itername.replace(".", ""),
ipstrip,
)
)
if inport not in inports:
inports.append(inport)
inputstr = (
["{IN"]
+ ["|<in%s> %s" % (_replacefunk(ip), ip) for ip in sorted(inports)]
+ ["}"]
)
outports = []
for u, v, d in graph.out_edges(nbunch=n, data=True):
for cd in d["connect"]:
if isinstance(cd[0], (str, bytes)):
outport = cd[0]
else:
outport = cd[0][0]
if outport not in outports:
outports.append(outport)
outputstr = (
["{OUT"]
+ [
"|<out%s> %s" % (_replacefunk(oport), oport)
for oport in sorted(outports)
]
+ ["}"]
)
srcpackage = ""
if hasattr(n, "_interface"):
pkglist = n.interface.__class__.__module__.split(".")
if len(pkglist) > 2:
srcpackage = pkglist[2]
srchierarchy = ".".join(nodename.split(".")[1:-1])
nodenamestr = "{ %s | %s | %s }" % (
nodename.split(".")[-1],
srcpackage,
srchierarchy,
)
text += [
'"%s" [label="%s|%s|%s"];'
% (
nodename.replace(".", ""),
"".join(inputstr),
nodenamestr,
"".join(outputstr),
)
]
# write edges
for edge in sorted(edges):
text.append(edge)
text.append("}")
with open(dotfilename, "wt") as filep:
filep.write("\n".join(text))
return text

0 comments on commit c5f322e

Please sign in to comment.