Source code for ramble.success_criteria

# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.

import re
import fnmatch

from ramble.util.logger import logger


class ScopedCriteriaList:
    """Collection of success criteria, bucketed by the scope that defined them.

    Every criterion lives in exactly one scope. The recognized scopes are:

    - application_definition
    - modifier_definition
    - workspace
    - application
    - workload
    - experiment

    Success for the whole list means every stored criterion reports ok
    (the individual results are AND-ed together).
    """

    _valid_scopes = [
        "application_definition",
        "modifier_definition",
        "workspace",
        "application",
        "workload",
        "experiment",
    ]

    # Maps a scope to the list of scopes emptied when that scope is flushed.
    _flush_scopes = {
        "experiment": ["experiment"],
        "workload": ["workload", "experiment"],
        "application": ["application", "workload", "experiment"],
        "workspace": ["workspace", "application", "workload", "experiment"],
        "modifier_definition": ["modifier_definition"],
        "application_definition": ["application_definition"],
    }

    def __init__(self):
        """Create one empty criteria bucket per valid scope."""
        self.criteria = {scope_name: [] for scope_name in self._valid_scopes}

    def reset(self):
        """Call ``reset()`` on every stored criterion, in scope order."""
        for entry in self.all_criteria():
            entry.reset()

    def validate_scope(self, scope):
        """Report a fatal error through ``logger.die`` if ``scope`` is unknown."""
        if scope in self._valid_scopes:
            return
        logger.die(
            f"Success scope {scope} is not valid. "
            f"Possible scopes are: {self._valid_scopes}"
        )

    def add_criteria(self, scope, name, mode, *args, **kwargs):
        """Build a new ``SuccessCriteria`` and append it to ``scope``.

        Names must be unique across *all* scopes; a duplicate name is
        reported through ``logger.die``. Extra positional and keyword
        arguments are forwarded to the ``SuccessCriteria`` constructor.
        """
        self.validate_scope(scope)
        if self.find_criteria(name):
            logger.die(f"Success criteria {name} is not unique.")
        new_entry = SuccessCriteria(name, mode, *args, **kwargs)
        self.criteria[scope].append(new_entry)

    def flush_scope(self, scope):
        """Remove criteria within a scope, and its lower level scopes.

        The scopes that get emptied are looked up in ``self._flush_scopes``.
        """
        self.validate_scope(scope)
        for flushed in self._flush_scopes[scope]:
            logger.debug(f" Flushing scope: {flushed}")
            logger.debug(" It contained:")
            for entry in self.criteria[flushed]:
                logger.debug(f" {entry.name}")
            # Drop the key and recreate it with an empty bucket.
            self.criteria.pop(flushed)
            self.criteria[flushed] = []

    def passed(self):
        """Return the AND of ``ok()`` over every stored criterion.

        An empty list passes. Once one criterion is not ok, the remaining
        criteria are not queried.
        """
        verdict = True
        for entry in self.all_criteria():
            verdict = verdict and entry.ok()
        return verdict

    def all_criteria(self):
        """Yield every stored criterion, walking scopes in ``_valid_scopes`` order."""
        for scope_name in self._valid_scopes:
            for entry in self.criteria[scope_name]:
                yield entry

    def find_criteria(self, name):
        """Return the first criterion whose ``name`` matches, or ``None``."""
        matches = (entry for entry in self.all_criteria() if entry.name == name)
        return next(matches, None)
class SuccessCriteria:
    """A single success criteria object

    This object represents a single criteria for success for a Ramble
    experiment. Three modes are supported:

    - ``string``: a regular expression (``match``) that must be seen, or one
      (``anti_match``) that must *not* be seen.
    - ``application_function``: defers to the application instance's
      ``evaluate_success`` method, when it has one.
    - ``fom_comparison``: evaluates ``formula`` against the values of the
      figures of merit (FOMs) selected by ``fom_name`` / ``fom_context``.

    The ``found`` / ``anti_found`` flags are not set by ``passed`` or
    ``anti_matched``; callers record results via ``mark_found`` /
    ``mark_anti_found`` and query the final state with ``ok``.
    """

    _valid_modes = ["string", "application_function", "fom_comparison"]
    _success_function = "evaluate_success"

    def __init__(
        self,
        name,
        mode,
        match=None,
        file="{log_file}",
        fom_name=None,
        fom_context="null",
        formula=None,
        anti_match=None,
    ):
        """Validate the arguments for ``mode`` and store the criterion.

        Args:
            name: Identifier of this criterion.
            mode: One of ``_valid_modes``.
            match: Regex source; ``string`` mode only. Exactly one of
                ``match`` / ``anti_match`` must be given in that mode.
            file: Stored as ``self.file`` in ``string`` mode (presumably the
                file whose contents are tested -- confirm against callers).
            fom_name: FOM name pattern (fnmatch-style); required for
                ``fom_comparison`` mode.
            fom_context: FOM context pattern (fnmatch-style);
                ``fom_comparison`` mode only.
            formula: Predicate evaluated per matching FOM; required for
                ``fom_comparison`` mode.
            anti_match: Regex source whose match means failure; ``string``
                mode only.

        Invalid combinations are reported through ``logger.die``.
        """
        self.name = name
        if mode not in self._valid_modes:
            logger.die(f"{mode} is not valid. Possible modes are: {self._valid_modes}")

        self.mode = mode
        self.match = None
        self.anti_match = None
        self.file = None
        self.fom_name = None
        self.fom_context = None
        self.fom_formula = None
        self.found = False
        self.anti_found = False

        if mode == "string":
            if match is None and anti_match is None:
                logger.die(
                    f'Success criteria with mode="{mode}" '
                    'require a "match" or "anti_match" attribute.'
                )
            if anti_match is not None and match is not None:
                logger.die(
                    f'Success criteria {name} with mode="{mode}" '
                    'require exactly one of "anti_match" and "match" to be set.'
                )

            if match is not None:
                self.match = re.compile(match)
            else:
                self.anti_match = re.compile(anti_match)
            self.file = file

        elif mode == "fom_comparison":
            if formula is None or fom_name is None:
                logger.die(
                    f'Success criteria with mode="{mode}" '
                    'require a "fom_name" and "formula" attribute.'
                )

            self.fom_formula = formula
            self.fom_name = fom_name
            self.fom_context = fom_context

    def passed(self, test=None, app_inst=None, fom_values=None):
        """Evaluate this criterion once against the supplied inputs.

        Args:
            test: String tested against ``match`` (``string`` mode). The
                regex is anchored at the start of ``test`` (``re.match``).
            app_inst: Application instance. Used for its success function in
                ``application_function`` mode and for its ``expander`` in
                ``fom_comparison`` mode.
            fom_values: Mapping ``context -> fom name -> {"value": ...}``
                (``fom_comparison`` mode).

        Returns:
            ``string`` mode: ``True`` if ``match`` is set and matches, else
            ``False``. ``application_function`` mode: whatever the success
            function returns, or ``False`` when ``app_inst`` has none.
            ``fom_comparison`` mode: ``True`` only if at least one FOM was
            selected and the formula holds for *every* selected FOM.
        """
        logger.debug(f"Testing criteria {self.name} mode = {self.mode}")
        if self.mode == "string":
            if self.match is not None:
                match_obj = self.match.match(test)
                if match_obj:
                    return True

        elif self.mode == "application_function":
            if hasattr(app_inst, self._success_function):
                func = getattr(app_inst, self._success_function)
                return func()

        elif self.mode == "fom_comparison":
            if fom_values is None:
                logger.die(
                    f'Success criteria of mode="{self.mode}" requires '
                    'defined fom_values attribute in "passed" function.'
                )
            if app_inst is None:
                logger.die(
                    f'Success criteria of mode="{self.mode}" requires '
                    'defined app_inst attribute in "passed" function.'
                )

            comparison_tested = False
            result = True

            contexts = fnmatch.filter(
                fom_values.keys(), app_inst.expander.expand_var(self.fom_context)
            )

            # If fom context doesn't exist, fail the comparison
            if not contexts:
                logger.debug(
                    f'When checking success criteria "{self.name}" FOM '
                    f'context "{self.fom_context}" matches no contexts.'
                )
                return False

            for context in contexts:
                fom_names = fnmatch.filter(
                    fom_values[context].keys(), app_inst.expander.expand_var(self.fom_name)
                )

                for fom_name in fom_names:
                    comparison_vars = {
                        "value": fom_values[context][fom_name]["value"],
                    }

                    comparison_tested = True
                    # AND the outcome across every matching FOM. Plain
                    # assignment here would let the last FOM evaluated
                    # overwrite (and hide) an earlier failing comparison.
                    if not app_inst.expander.evaluate_predicate(
                        self.fom_formula, extra_vars=comparison_vars
                    ):
                        result = False

            # If fom doesn't match any fom names, fail the comparison
            if not comparison_tested:
                logger.debug(
                    f'When checking success criteria "{self.name}" FOM '
                    f'"{self.fom_name}" did not match any FOMs.'
                )
                return False

            return result

        return False

    def anti_matched(self, test=None):
        """Return ``True`` if ``anti_match`` is set and matches ``test``.

        Only meaningful in ``string`` mode; every other case returns
        ``False``. The regex is anchored at the start of ``test``.
        """
        logger.debug(f"Testing anti-criterion {self.name} mode = {self.mode}")
        if self.mode == "string":
            if self.anti_match is not None:
                anti_match_obj = self.anti_match.match(test)
                if anti_match_obj:
                    return True
        return False

    def mark_found(self):
        """Record that this criterion was satisfied."""
        logger.debug(f" {self.name} was matched!")
        self.found = True

    def mark_anti_found(self):
        """Record that this criterion's anti-match pattern was seen."""
        logger.debug(f" {self.name} was anti-matched!")
        self.anti_found = True

    def reset(self):
        """Clear the ``found`` and ``anti_found`` flags."""
        self.found = False
        self.anti_found = False

    def ok(self):
        """Return the recorded outcome of this criterion.

        An anti-match criterion is ok as long as its pattern was never
        marked as seen; any other criterion is ok once marked found.
        """
        if self.anti_match is not None:
            return not self.anti_found
        return self.found