Source code for ramble.spec

# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import io

import llnl.util.tty.color as clr

import ramble.error
from ramble.util.logger import logger

import spack.parse

#: These are possible token types in the spec grammar.
HASH, DEP, AT, COLON, COMMA, ON, OFF, PCT, EQ, ID, VAL, FILE = range(12)

#: Regex for fully qualified spec names. (e.g., builtin.hdf5)
spec_id_re = r"\w[\w.-]*"

color_formats = {}

default_format = "{name}"


[docs] class SpecLexer(spack.parse.Lexer): """Parses tokens that make up spack specs.""" def __init__(self): super().__init__( [ (r"\^", lambda scanner, val: self.token(DEP, val)), (r"\@", lambda scanner, val: self.token(AT, val)), (r"\:", lambda scanner, val: self.token(COLON, val)), (r"\,", lambda scanner, val: self.token(COMMA, val)), (r"\+", lambda scanner, val: self.token(ON, val)), (r"\-", lambda scanner, val: self.token(OFF, val)), (r"\~", lambda scanner, val: self.token(OFF, val)), (r"\%", lambda scanner, val: self.token(PCT, val)), (r"\=", lambda scanner, val: self.token(EQ, val)), # Filenames match before identifiers, so no initial filename # component is parsed as a spec (e.g., in subdir/spec.yaml) (r"[/\w.-]*/[/\w/-]+\.yaml[^\b]*", lambda scanner, v: self.token(FILE, v)), # Hash match after filename. No valid filename can be a hash # (files end w/.yaml), but a hash can match a filename prefix. (r"/", lambda scanner, val: self.token(HASH, val)), # Identifiers match after filenames and hashes. (spec_id_re, lambda scanner, val: self.token(ID, val)), (r"\s+", lambda scanner, val: None), ], [EQ], [ (r"[\S].*", lambda scanner, val: self.token(VAL, val)), (r"\s+", lambda scanner, val: None), ], [VAL], )
# Lexer is always the same for every parser _lexer = SpecLexer()
[docs] class SpecParser(spack.parse.Parser): def __init__(self, initial_spec=None): """Construct a new SpecParser. Args: initial_spec (Spec, optional): provide a Spec that we'll parse directly into. This is used to avoid construction of a superfluous Spec object in the Spec constructor. """ logger.debug(f"Starting parser with spec {initial_spec}") super().__init__(_lexer) self.previous = None self._initial = initial_spec
[docs] def do_parse(self): app_spec = None try: if self.accept(ID): app_spec = Spec(self.token.value) while self.next: if self.accept(ID): name = self.workload() app_spec.workloads[name] = True else: break else: self.next_token_error("Invalid token found") except spack.parse.ParseError as e: raise SpecParseError(e) return [app_spec]
[docs] def workload(self, name=None): """Return the name of the workload""" if name: return name else: self.expect(ID) self.check_identifier() return self.token.value
[docs] def check_identifier(self, id=None): """The only identifiers that can contain '.' are versions, but version ids are context-sensitive so we have to check on a case-by-case basis. Call this if we detect a version id where it shouldn't be. """ if not id: id = self.token.value if "." in id: self.last_token_error(f"{id}: Identifier cannot contain '.'")
[docs] class Spec: def __init__(self, spec_like=None): """Create a new Spec. Arguments: spec_like (optional string): If not provided we initialize an anonymous Spec that matches any Spec object; if provided we parse this as a Spec string. """ # Copy if spec_like is a Spec. if isinstance(spec_like, Spec): self._dup(spec_like) return # init an empty spec that matches anything. self.name = None self.namespace = None self.workloads = {} if isinstance(spec_like, str): namespace, _, spec_name = spec_like.rpartition(".") if not namespace: namespace = None self.name = spec_name self.namespace = namespace
[docs] def copy(self): new_spec = Spec() new_spec._dup(self) return new_spec
def _dup(self, other): self.name = other.name self.namespace = other.namespace
[docs] def format(self, format_string=default_format, **kwargs): r"""Prints out particular pieces of a spec, depending on what is in the format string. Using the ``{attribute}`` syntax, any field of the spec can be selected. Those attributes can be recursive. Commonly used attributes of the Spec for format strings include:: name workloads Args: format_string (str): string containing the format to be expanded Keyword Args: color (bool): True if returned string is colored transform (dict): maps full-string formats to a callable \ that accepts a string and returns another one """ color = kwargs.get("color", False) transform = kwargs.get("transform", {}) out = io.StringIO() def write(s, c=None): f = clr.cescape(s) if c is not None: f = color_formats[c] + f + "@." clr.cwrite(f, stream=out, color=color) def write_attribute(spec, attribute, color): current = spec if attribute == "": raise SpecFormatStringError("Format string attributes must be non-empty") attribute = attribute.lower() parts = attribute.split(".") assert parts # find the morph function for our attribute morph = transform.get(attribute, lambda s, x: x) # Iterate over components using getattr to get next element for idx, part in enumerate(parts): if not part: raise SpecFormatStringError("Format string attributes must be non-empty") if part.startswith("_"): raise SpecFormatStringError("Attempted to format private attribute") else: try: current = getattr(current, part) except AttributeError: parent = ".".join(parts[:idx]) m = "Attempted to format attribute %s." % attribute m += f"Spec {parent} has no attribute {part}" raise SpecFormatStringError(m) if callable(current): raise SpecFormatStringError("Attempted to format callable object") if not current: # We're not printing anything return # Finally, write the output col = None write(morph(spec, str(current)), col) attribute = "" in_attribute = False escape = False for c in format_string: if escape: out.write(c) escape = False elif c == "\\": escape = True elif in_attribute: if c == "}": write_attribute(self, attribute, color) attribute = "" in_attribute = False else: attribute += c else: if c == "}": raise SpecFormatStringError("Encountered closing } before opening {") elif c == "{": in_attribute = True else: out.write(c) if in_attribute: raise SpecFormatStringError( "Format string terminated while reading attribute." "Missing terminating }." ) formatted_spec = out.getvalue() return formatted_spec.strip()
[docs] def cformat(self, *args, **kwargs): """Same as format, but color defaults to auto instead of False.""" kwargs = kwargs.copy() kwargs.setdefault("color", None) return self.format(*args, **kwargs)
def __str__(self): return self.name @property def fullname(self): return ( (f"{self.namespace}.{self.name}") if self.namespace else (self.name if self.name else "") )
[docs] def parse(string): """Returns a spec from an input string.""" return SpecParser().parse(string)
[docs] class SpecParseError(ramble.error.SpecError): """Wrapper for ParseError for when we're parsing specs.""" def __init__(self, parse_error): super().__init__(parse_error.message) self.string = parse_error.string
[docs] class SpecFormatStringError(spack.error.SpecError): """Called for errors in Spec format strings."""