# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.
from collections import defaultdict
import enum
import itertools
import graphlib
import ramble.error
import ramble.util.graph
from ramble.util.naming import NS_SEPARATOR
from ramble.util.logger import logger
class AttributeGraph:
    """Dependency graph over the attributes of an object instance.

    ``node_definitions`` maps a node key to its node, and ``adj_list`` maps
    each node to the set of nodes it depends on. :meth:`walk` computes a
    topological ordering of ``adj_list`` and caches it until the graph is
    edited again.
    """

    node_type = "object"

    def __init__(self, obj_inst):
        """Create an empty graph.

        Args:
            obj_inst (object): Object instance the graph is built for. It is
                               used by :meth:`walk` to name the experiment
                               when a cycle is reported.
        """
        self._obj_inst = obj_inst
        self.node_definitions = {}
        self.adj_list = {}
        self._prepared = False
        self._sorted = None

    def _make_editable(self):
        """Make this graph editable, and remove any defined ordering"""
        if self._prepared:
            self._sorted = None
            self._prepared = False

    def update_graph(self, node, dep_nodes=None, internal_order=False):
        """Update the graph with a new node and / or new dependencies.

        Given a node, and list of dependencies, define new edges in the
        graph. If the node is new, it is also added to the graph.

        Args:
            node (GraphNode): Node to inject or modify
            dep_nodes (list(GraphNode) | None): List of nodes that are dependencies
            internal_order (Boolean): True to process internal dependencies,
                                      False to skip
        """
        if dep_nodes is None:
            dep_nodes = []

        self._make_editable()
        self.add_node(node)
        self.define_edges(node, dep_nodes, internal_order=internal_order)

    def add_node(self, node):
        """Add a node to the graph

        A node that is already known (by key in ``node_definitions`` or as an
        entry of ``adj_list``) is left untouched.

        Args:
            node (GraphNode): Node to add into graph
        """
        self._make_editable()

        if node.key not in self.node_definitions:
            self.node_definitions[node.key] = node

        if node not in self.adj_list:
            self.adj_list[node] = set()

    def define_edges(self, node, dep_nodes=None, internal_order=False):
        """Define graph edges

        Process dependencies, and internal orderings (inside the node object)
        to define new graph edges.

        Args:
            node (GraphNode): Node to inject or modify. It must already have
                              an entry in ``adj_list``.
            dep_nodes (list(GraphNode) | None): List of nodes that are dependencies
            internal_order (Boolean): True to process internal dependencies,
                                      False to skip

        Raises:
            KeyError: If ``node`` has no ``adj_list`` entry, or if an internal
                      ordering names a key missing from ``node_definitions``.
        """
        if dep_nodes is None:
            dep_nodes = []

        # Edges are about to change, so any cached topological order is stale.
        # Without this, walk() after a direct define_edges() call would keep
        # yielding the previous ordering.
        self._make_editable()

        for dep in dep_nodes:
            if dep.key not in self.node_definitions:
                self.node_definitions[dep.key] = dep
                self.adj_list[dep] = set()

            self.adj_list[node].add(dep)

        if internal_order:
            # `_order_after` / `_order_before` hold node keys, not nodes.
            for dep in node._order_after:
                dep_node = self.node_definitions[dep]
                self.adj_list[node].add(dep_node)

            for dep in node._order_before:
                dep_node = self.node_definitions[dep]
                self.adj_list[dep_node].add(node)

    def walk(self):
        """Walk the graph in topological ordering and yield each node.

        Construct a topological ordering of the current graph (or reuse the
        cached one if the graph has not been edited), walk it, and yield each
        node one by one.

        Yields:
            node (GraphNode): Each node in the graph

        Raises:
            GraphCycleError: If the graph contains a cycle.
        """
        if not self._prepared:
            try:
                sorter = graphlib.TopologicalSorter(self.adj_list)
            except AttributeError:
                logger.die(
                    "graphlib.TopologicalSorter is not found. "
                    "Ensure requirements.txt are installed (including backports, where needed)."
                )

            try:
                self._sorted = tuple(sorter.static_order())
            except graphlib.CycleError as e:
                # Prefer the experiment namespace; fall back to the object's
                # name when the object has no expander.
                try:
                    exp_name = self._obj_inst.expander.experiment_namespace
                except AttributeError:
                    exp_name = self._obj_inst.name
                raise GraphCycleError(
                    f"In experiment {exp_name} a cycle was detected "
                    f"when processing the {self.node_type} graph.\n" + str(e)
                ) from e

            self._prepared = True

        yield from self._sorted

    def get_node(self, key):
        """Given a key, return the node containing this key

        Only nodes reachable through :meth:`walk` are considered, so a node
        that exists in ``node_definitions`` but has no edges or ``adj_list``
        entry is not returned.

        Args:
            key (str): Name of key to find in the graph

        Returns:
            (GraphNode): Node representing the key requested. Returns None if
                         the key isn't found.
        """
        return next((node for node in self.walk() if node.key == key), None)
class PhaseGraph(AttributeGraph):
    """Graph whose nodes are the phases of a single pipeline."""

    node_type = "phase"

    def __init__(self, phase_definitions, obj_inst):
        """Construct a phase graph for a pipeline

        Parse a single pipeline's phase definitions, and build an adjacency
        list from this. Using the graph utilities, a topological sorting of
        the graph can then be constructed.

        Args:
            phase_definitions (dict): Definitions of phases. Should be of the
                                      format {'phase_name': GraphNode}
            obj_inst (obj): Object instance to extract phase functions from
        """
        super().__init__(obj_inst)

        # Define all graph nodes
        for phase_node in phase_definitions.values():
            if phase_node.obj_inst is None:
                phase_node.obj_inst = obj_inst

            if phase_node.attribute is None:
                phase_func = getattr(obj_inst, f"_{phase_node.key}")
                phase_node.set_attribute(phase_func)

            # NOTE(review): add_node() below looks up `_<key>` on `obj_inst`
            # again and calls set_attribute() with it, so an attribute preset
            # on the node is replaced here. Confirm whether that is intended.
            self.add_node(phase_node)

        # Define graph edges. All nodes must exist first, because internal
        # orderings are resolved by key through `node_definitions`.
        for phase_node in phase_definitions.values():
            self.define_edges(phase_node, internal_order=True)

    def add_node(self, node, obj_inst=None):
        """Add a new phase node to the graph

        Extract the phase function (the attribute named ``_<node.key>``) from
        the object instance, and inject a new node into the graph.

        Args:
            node (GraphNode): Phase node to add into graph
            obj_inst (Object): Object that owns the phase. Defaults to the
                               object instance this graph was built with.
        """
        func_obj = self._obj_inst if obj_inst is None else obj_inst

        phase_func = getattr(func_obj, f"_{node.key}")
        node.set_attribute(phase_func)
        super().add_node(node)

    def update_graph(self, phase_name, dependencies=None, internal_order=False, obj_inst=None):
        """Update the graph with a new phase and / or new dependencies.

        Given a phase name, and list of dependencies, define new edges in the
        graph. If the phase (or any dependency) is new, also construct a new
        phase node for it.

        Args:
            phase_name (str): Name of the phase to inject or modify
            dependencies (list(str) | None): List of phase names to inject dependencies on
            internal_order (Boolean): True to process internal dependencies,
                                      False to skip
            obj_inst (object): Application or modifier instance to extract phase function from
        """
        if dependencies is None:
            dependencies = []

        # Drop any cached ordering before editing the graph.
        self._make_editable()

        if phase_name not in self.node_definitions:
            self.add_node(ramble.util.graph.GraphNode(phase_name), obj_inst)
        phase_node = self.node_definitions[phase_name]

        dep_nodes = []
        for dep in dependencies:
            if dep not in self.node_definitions:
                self.add_node(ramble.util.graph.GraphNode(dep), obj_inst)
            dep_nodes.append(self.node_definitions[dep])

        # Forward `internal_order` so the documented flag is honoured; the
        # default (False) keeps the previous behavior.
        super().define_edges(phase_node, dep_nodes, internal_order=internal_order)
class ExecutableGraph(AttributeGraph):
    """Graph that handles command executables and builtins"""

    node_type = "command executable"
    supported_injection_orders = enum.Enum("supported_injection_orders", ["before", "after"])

    def __init__(self, exec_order, executables, builtin_objects, builtin_groups, obj_inst):
        """Construct a new ExecutableGraph

        Executable graphs have node attributes that are either of type
        CommandExecutable, or are a function pointer to a builtin.

        Args:
            exec_order (list(str)): List of executable names in execution order
            executables (dict): Dictionary of executable definitions.
                                Keys are executable names, values are CommandExecutables
            builtin_objects (list(object)): List of objects to associate with each builtin
                                            group (in order)
            builtin_groups (list(dict)): List of dictionaries containing definitions of
                                         builtins, paired positionally with
                                         ``builtin_objects``. Keys are builtin names, values
                                         are configurations that provide the entries
                                         ``name``, ``depends_on``, ``dependents``,
                                         ``required`` and ``injection_method``.
            obj_inst (object): Object instance to extract attributes from (when necessary)
        """
        super().__init__(obj_inst)
        self._builtin_dependencies = defaultdict(list)
        # Mapping from shorter_name -> fully qualified names
        self._builtin_aliases = defaultdict(list)

        # Define nodes for executables. Every executable is registered, but
        # only those named in `exec_order` are placed in the graph.
        for exec_name, cmd_exec in executables.items():
            exec_node = ramble.util.graph.GraphNode(exec_name, cmd_exec, obj_inst=obj_inst)
            self.node_definitions[exec_name] = exec_node
            if exec_name in exec_order:
                super().update_graph(exec_node)

        # Define nodes for builtins. They are registered here, and only placed
        # in the graph below (if required) or through inject_executable().
        for builtin_obj, builtins in zip(builtin_objects, builtin_groups):
            for builtin, blt_conf in builtins.items():
                self._builtin_dependencies[builtin] = blt_conf["depends_on"].copy()
                blt_func = getattr(builtin_obj, blt_conf["name"])
                exec_node = ramble.util.graph.GraphNode(
                    builtin, attribute=blt_func, obj_inst=builtin_obj
                )
                self.node_definitions[builtin] = exec_node
                self._build_builtin_aliases(builtin)

            # A `dependents` entry is recorded as a dependency of the
            # dependent on this builtin.
            for builtin, blt_conf in builtins.items():
                dependents = blt_conf["dependents"].copy()
                for dependent in dependents:
                    self._builtin_dependencies[dependent].append(builtin)

        # Chain the executables so each one depends on its predecessor in
        # `exec_order`.
        dep_exec = None
        for exec_name in exec_order:
            if dep_exec is not None:
                exec_node = self.node_definitions[exec_name]
                dep_node = self.node_definitions[dep_exec]
                super().update_graph(exec_node, [dep_node])
            dep_exec = exec_name

        # First and last node of the current ordering (None if it is empty).
        head_node = None
        tail_node = None
        for node in self.walk():
            if head_node is None:
                head_node = node
            tail_node = node

        # Most recently prepended / appended builtin, used to keep builtins
        # injected the same way in definition order.
        tail_prepend_builtin = None
        tail_append_builtin = None

        # Add (missing) required builtins
        for builtins in builtin_groups:
            for builtin, blt_conf in builtins.items():
                if blt_conf["required"] and self.get_node(builtin) is None:
                    blt_node = self.node_definitions[builtin]
                    super().update_graph(blt_node)

                    # TODO: This should include `depends_on` as well.
                    relative = bool(blt_conf["dependents"])

                    if not relative and blt_conf["injection_method"] == "prepend":
                        if head_node is not None:
                            super().update_graph(head_node, [blt_node])

                        if tail_prepend_builtin is not None:
                            super().update_graph(blt_node, [tail_prepend_builtin])

                        tail_prepend_builtin = blt_node
                    elif not relative and blt_conf["injection_method"] == "append":
                        if tail_node is not None:
                            super().update_graph(blt_node, [tail_node])

                        if tail_append_builtin is not None:
                            super().update_graph(blt_node, [tail_append_builtin])

                        tail_append_builtin = blt_node

                    if blt_conf["depends_on"]:
                        deps = []
                        for dep in blt_conf["depends_on"]:
                            dep_node = self._resolve_builtin_node(dep)
                            super().update_graph(dep_node)
                            deps.append(dep_node)

                        exec_node = self.node_definitions[builtin]
                        super().update_graph(exec_node, deps)

                    if blt_conf["dependents"]:
                        exec_node = self.node_definitions[builtin]
                        super().update_graph(exec_node)
                        for dependent in blt_conf["dependents"]:
                            dependent_node = self._resolve_builtin_node(dependent)
                            super().update_graph(dependent_node, [exec_node])

    def inject_executable(self, exec_name, injection_order, relative):
        """Inject an executable into the graph

        When ``relative`` is None, 'before' places the executable ahead of the
        current first node and 'after' (the default) places it behind the
        current last node. When ``relative`` is given, the executable is
        placed directly before / after that node in the current ordering.

        Invalid arguments (unknown injection order, unknown executable, or a
        ``relative`` that is not defined or not part of the graph) are
        reported through ``logger.die``.

        Args:
            exec_name (str): Name of executable to inject
            injection_order (str): Order for injection. Can be 'before' or 'after'.
                                   None selects 'after'.
            relative (str): Name of executable to inject relative to. Can be
                            None to inject relative to the whole set of executables.
        """
        cur_exec_order = list(self.walk())
        exp_name = self._obj_inst.expander.experiment_namespace

        order = self.supported_injection_orders.after
        if injection_order is not None:
            # Check against the enum members themselves; hasattr() on the enum
            # class would also accept non-member attributes such as "mro".
            valid_orders = self.supported_injection_orders.__members__
            if injection_order not in valid_orders:
                logger.die(
                    "In experiment "
                    f'"{exp_name}" '
                    f'injection order of executable "{exec_name}" is set to an '
                    f'invalid value of "{injection_order}".\n'
                    f"Valid values are {list(valid_orders)}."
                )
            order = valid_orders[injection_order]

        # Validate before the lookup, so a missing executable produces the
        # error below instead of a bare KeyError.
        if exec_name not in self.node_definitions:
            logger.die(
                "In experiment "
                f'"{exp_name}" '
                f'attempting to inject a non existing executable "{exec_name}".'
            )
        exec_node = self.node_definitions[exec_name]

        if relative is not None:
            # The relative executable must be defined and already be part of
            # the ordered graph.
            relative_node = self.node_definitions.get(relative)
            if relative_node is None or relative_node not in cur_exec_order:
                logger.die(
                    "In experiment "
                    f'"{exp_name}" '
                    f'attempting to inject executable "{exec_name}" '
                    f'relative to a non existing executable "{relative}".'
                )

            order_index = cur_exec_order.index(relative_node)
            if order == self.supported_injection_orders.before:
                super().update_graph(relative_node, [exec_node])
                # Also order the new node after the node that used to precede
                # the relative node.
                if order_index > 0:
                    super().update_graph(exec_node, [cur_exec_order[order_index - 1]])
            elif order == self.supported_injection_orders.after:
                super().update_graph(exec_node, [relative_node])
                # Also order the node that used to follow the relative node
                # after the new node.
                if order_index < len(cur_exec_order) - 1:
                    super().update_graph(cur_exec_order[order_index + 1], [exec_node])
        else:
            super().update_graph(exec_node)

            # With no relative, inject against the head / tail of the current
            # ordering. An empty graph has neither, so the node is only added.
            if cur_exec_order:
                if order == self.supported_injection_orders.before:
                    super().update_graph(cur_exec_order[0], [exec_node])
                elif order == self.supported_injection_orders.after:
                    super().update_graph(exec_node, [cur_exec_order[-1]])

        # If exec_name is a builtin, inject edges to its dependencies
        if exec_name in self._builtin_dependencies:
            dep_nodes = [
                self.node_definitions[dep] for dep in self._builtin_dependencies[exec_name]
            ]
            super().update_graph(exec_node, dep_nodes)

    def _build_builtin_aliases(self, full_builtin_name):
        """Register every shorter suffix of a builtin name as an alias for it.

        The name is split on ``NS_SEPARATOR`` and each proper suffix (all but
        the full name itself) is mapped to ``full_builtin_name``.

        Args:
            full_builtin_name (str): Fully qualified builtin name
        """
        # Reversed namespace parts, without the outermost one.
        ns_list_r = full_builtin_name.split(NS_SEPARATOR)[::-1][:-1]
        for alias in itertools.accumulate(ns_list_r, lambda accu, ns: f"{ns}{NS_SEPARATOR}{accu}"):
            self._builtin_aliases[alias].append(full_builtin_name)

    def _resolve_builtin_node(self, builtin_name):
        """Return the node for a builtin given its full name or an alias.

        Args:
            builtin_name (str): Fully qualified name, or a registered alias

        Returns:
            (GraphNode): The node the name resolves to

        Raises:
            GraphNodeNotFoundError: If the name is neither a known node nor an alias
            GraphNodeAmbiguousError: If the alias matches more than one builtin
        """
        if builtin_name in self.node_definitions:
            return self.node_definitions[builtin_name]

        # .get() avoids creating an empty entry in the defaultdict.
        full_names = self._builtin_aliases.get(builtin_name, None)
        if full_names is None:
            raise GraphNodeNotFoundError(f"builtin {builtin_name} does not exist")
        if len(full_names) > 1:
            raise GraphNodeAmbiguousError(
                f"builtin {builtin_name} matches more than one node ({full_names})"
            )
        return self.node_definitions[full_names[0]]
class GraphError(ramble.error.RambleError):
    """Base class for errors raised while building or walking a graph."""
class GraphCycleError(GraphError):
    """Raised when a graph contains a cycle and cannot be ordered."""
class GraphNodeAmbiguousError(GraphError):
    """Raised when a given name resolves to more than one node."""
class GraphNodeNotFoundError(GraphError):
    """Raised when a given name cannot be resolved to any node."""