diff --git a/CPAC/pipeline/cpac_pipeline.py b/CPAC/pipeline/cpac_pipeline.py index 7f228c280e..c0664caa15 100644 --- a/CPAC/pipeline/cpac_pipeline.py +++ b/CPAC/pipeline/cpac_pipeline.py @@ -179,7 +179,7 @@ def run_workflow(sub_dict, c, run, pipeline_timing_info=None, p_name=None, list of pipeline info for reporting timing information p_name : string (optional); default=None name of pipeline - plugin : string (optional); defaule='MultiProc' + plugin : string (optional); default='MultiProc' nipype plugin to utilize when the workflow is ran plugin_args : dictionary (optional); default=None plugin-specific arguments for the workflow plugin @@ -359,6 +359,10 @@ def run_workflow(sub_dict, c, run, pipeline_timing_info=None, p_name=None, subject_id, sub_dict, c, p_name, num_ants_cores ) + for graph2use in {'orig', 'colored', 'exec'}: + workflow.write_graph(os.path.join( + log_dir, f'{graph2use}.dot'), graph2use) + if test_config: import sys logger.info('This has been a test of the pipeline configuration ' diff --git a/CPAC/pipeline/nipype_pipeline_engine/engine.py b/CPAC/pipeline/nipype_pipeline_engine/engine.py index 5b46ce2b25..0d515bc280 100644 --- a/CPAC/pipeline/nipype_pipeline_engine/engine.py +++ b/CPAC/pipeline/nipype_pipeline_engine/engine.py @@ -3,10 +3,19 @@ for C-PAC-specific documentation. See https://nipype.readthedocs.io/en/latest/api/generated/nipype.pipeline.engine.html for Nipype's documentation.''' # noqa E501 +import os +from copy import deepcopy +from functools import partialmethod from nipype import logging from nipype.pipeline import engine as pe -from nipype.pipeline.engine.utils import get_print_name -from functools import partialmethod +from nipype.pipeline.engine.utils import ( + _create_dot_graph, + generate_expanded_graph, + get_print_name, + _replacefunk, + _run_dot +) +from nipype.utils.filemanip import fname_presuffix logger = logging.getLogger("nipype.workflow") @@ -95,7 +104,8 @@ def _get_dot( if level > len(colorset) - 2: level = 3 # Loop back to blue - dotlist = ['%slabel="%s";' % (prefix, self.name)] + dotlist = ['%slabel="%s";' % ( + f'"{prefix}"' if len(prefix.strip()) else prefix, self.name)] for node in nx.topological_sort(self._graph): fullname = ".".join(hierarchy + [node.fullname]) nodename = fullname.replace(".", "_") @@ -106,7 +116,7 @@ def _get_dot( if hasattr(node, "iterables") and node.iterables: dotlist.append( ( - '%s[label="%s", shape=box3d,' + '"%s"[label="%s", shape=box3d,' "style=filled, color=black, colorscheme" "=greys7 fillcolor=2];" ) @@ -115,20 +125,20 @@ def _get_dot( else: if colored: dotlist.append( - ('%s[label="%s", style=filled,' + ('"%s"[label="%s", style=filled,' ' fillcolor="%s"];') % (nodename, node_class_name, colorset[level]) ) else: dotlist.append( - ('%s[label="%s"];') % (nodename, node_class_name) + ('"%s"[label="%s"];') % (nodename, node_class_name) ) for node in nx.topological_sort(self._graph): if isinstance(node, Workflow): fullname = ".".join(hierarchy + [node.fullname]) nodename = fullname.replace(".", "_") - dotlist.append("subgraph cluster_%s {" % nodename) + dotlist.append("subgraph \"cluster_%s\" {" % nodename) if colored: dotlist.append( prefix + prefix + 'edge [color="%s"];' % ( @@ -162,7 +172,7 @@ def _get_dot( for _ in self._graph.get_edge_data( node, subnode )["connect"]: - dotlist.append("%s -> %s;" % ( + dotlist.append('"%s" -> "%s";' % ( nodename, subnodename)) logger.debug("connection: %s", dotlist[-1]) # add between workflow connections @@ -188,8 +198,184 @@ def _get_dot( vname1 += "." + ".".join(dest.split(".")[:-1]) if uname1.split(".")[:-1] != vname1.split(".")[:-1]: dotlist.append( - "%s -> %s;" + '"%s" -> "%s";' % (uname1.replace(".", "_"), vname1.replace(".", "_")) ) logger.debug("cross connection: %s", dotlist[-1]) return ("\n" + prefix).join(dotlist) + + def write_hierarchical_dotfile( + self, dotfilename=None, colored=False, simple_form=True + ): + dotlist = ["digraph \"%s\"{" % self.name] + dotlist.append( + self._get_dot(prefix=" ", colored=colored, + simple_form=simple_form) + ) + dotlist.append("}") + dotstr = "\n".join(dotlist) + if dotfilename: + fp = open(dotfilename, "wt") + fp.writelines(dotstr) + fp.close() + else: + logger.info(dotstr) + + +def export_graph( + graph_in, + base_dir=None, + show=False, + use_execgraph=False, + show_connectinfo=False, + dotfilename="graph.dot", + format="png", + simple_form=True, +): + """Displays the graph layout of the pipeline + This function requires that pygraphviz and matplotlib are available on + the system. + Parameters + ---------- + show : boolean + Indicate whether to generate pygraphviz output fromn + networkx. default [False] + use_execgraph : boolean + Indicates whether to use the specification graph or the + execution graph. default [False] + show_connectioninfo : boolean + Indicates whether to show the edge data on the graph. This + makes the graph rather cluttered. default [False] + """ + import networkx as nx + + graph = deepcopy(graph_in) + if use_execgraph: + graph = generate_expanded_graph(graph) + logger.debug("using execgraph") + else: + logger.debug("using input graph") + if base_dir is None: + base_dir = os.getcwd() + + os.makedirs(base_dir, exist_ok=True) + out_dot = fname_presuffix( + dotfilename, suffix="_detailed.dot", use_ext=False, newpath=base_dir + ) + _write_detailed_dot(graph, out_dot) + + # Convert .dot if format != 'dot' + outfname, res = _run_dot(out_dot, format_ext=format) + if res is not None and res.runtime.returncode: + logger.warning("dot2png: %s", res.runtime.stderr) + + pklgraph = _create_dot_graph(graph, show_connectinfo, simple_form) + simple_dot = fname_presuffix( + dotfilename, suffix=".dot", use_ext=False, newpath=base_dir + ) + nx.drawing.nx_pydot.write_dot(pklgraph, simple_dot) + + # Convert .dot if format != 'dot' + simplefname, res = _run_dot(simple_dot, format_ext=format) + if res is not None and res.runtime.returncode: + logger.warning("dot2png: %s", res.runtime.stderr) + + if show: + pos = nx.graphviz_layout(pklgraph, prog="dot") + nx.draw(pklgraph, pos) + if show_connectinfo: + nx.draw_networkx_edge_labels(pklgraph, pos) + + return simplefname if simple_form else outfname + + +def _write_detailed_dot(graph, dotfilename): + r""" + Create a dot file with connection info :: + digraph structs { + node [shape=record]; + struct1 [label=" left| middle| right"]; + struct2 [label=" one| two"]; + struct3 [label="hello\nworld |{ b |{c| d|e}| f}| g | h"]; + struct1:f1 -> struct2:f0; + struct1:f0 -> struct2:f1; + struct1:f2 -> struct3:here; + } + """ + import networkx as nx + + text = ["digraph structs {", "node [shape=record];"] + # write nodes + edges = [] + for n in nx.topological_sort(graph): + nodename = n.itername + inports = [] + for u, v, d in graph.in_edges(nbunch=n, data=True): + for cd in d["connect"]: + if isinstance(cd[0], (str, bytes)): + outport = cd[0] + else: + outport = cd[0][0] + inport = cd[1] + ipstrip = "in%s" % _replacefunk(inport) + opstrip = "out%s" % _replacefunk(outport) + edges.append( + '"%s":"%s":e -> "%s":"%s":w;' + % ( + u.itername.replace(".", ""), + opstrip, + v.itername.replace(".", ""), + ipstrip, + ) + ) + if inport not in inports: + inports.append(inport) + inputstr = ( + ["{IN"] + + ["| %s" % (_replacefunk(ip), ip) for ip in sorted(inports)] + + ["}"] + ) + outports = [] + for u, v, d in graph.out_edges(nbunch=n, data=True): + for cd in d["connect"]: + if isinstance(cd[0], (str, bytes)): + outport = cd[0] + else: + outport = cd[0][0] + if outport not in outports: + outports.append(outport) + outputstr = ( + ["{OUT"] + + [ + "| %s" % (_replacefunk(oport), oport) + for oport in sorted(outports) + ] + + ["}"] + ) + srcpackage = "" + if hasattr(n, "_interface"): + pkglist = n.interface.__class__.__module__.split(".") + if len(pkglist) > 2: + srcpackage = pkglist[2] + srchierarchy = ".".join(nodename.split(".")[1:-1]) + nodenamestr = "{ %s | %s | %s }" % ( + nodename.split(".")[-1], + srcpackage, + srchierarchy, + ) + text += [ + '"%s" [label="%s|%s|%s"];' + % ( + nodename.replace(".", ""), + "".join(inputstr), + nodenamestr, + "".join(outputstr), + ) + ] + # write edges + for edge in sorted(edges): + text.append(edge) + text.append("}") + with open(dotfilename, "wt") as filep: + filep.write("\n".join(text)) + return text