Source code for ramble.mirror

# Copyright 2022-2025 The Ramble Authors
#
# Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
# https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
# <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
# option. This file may not be copied, modified, or distributed
# except according to those terms.


"""
This file contains code for creating ramble mirror directories.  A
mirror is an organized hierarchy containing specially named archive
files.  This enables ramble to know where to find files in a mirror if
the main server for a particular input is down.  Or, if the computer
where ramble is run is not connected to the internet, it allows ramble
to download inputs directly from a mirror (e.g., on an intranet).
"""
import collections
import operator
import os
import os.path

import ruamel.yaml.error as yaml_error

from llnl.util.compat import Mapping

import ramble.config
import ramble.error
import ramble.fetch_strategy as fs
from ramble.util.logger import logger

import spack.url
import spack.util.spack_json
import spack.util.spack_yaml
import spack.util.url as url_util
from spack.util.spack_yaml import syaml_dict


def _is_string(url):
    return isinstance(url, str)


def _display_mirror_entry(size, name, url, type_=None):
    if type_:
        type_ = "".join((" (", type_, ")"))
    else:
        type_ = ""

    print("%-*s%s%s" % (size + 4, name, url, type_))


class Mirror:
    """Represents a named location for storing input tarballs.

    Mirrors have a fetch_url that indicates where and how artifacts are
    fetched from them, and a push_url that indicates where and how artifacts
    are pushed to them.  These two URLs are usually the same.

    Each of the two entries is stored either as a plain URL string or as a
    dict that holds the URL under the ``"url"`` key next to optional
    connection settings (``"profile"``, ``"access_pair"``, ``"endpoint_url"``
    and ``"access_token"``).  A push entry of ``None`` means "same as fetch".
    """

    def __init__(self, fetch_url, push_url=None, name=None):
        self._fetch_url = fetch_url
        self._push_url = push_url
        self._name = name

    def __eq__(self, other):
        # Only the two URL entries take part in the comparison; the name does
        # not.  Foreign types are handed back to Python instead of blowing up
        # with an AttributeError on the missing private attributes.
        if not isinstance(other, Mirror):
            return NotImplemented
        return self._fetch_url == other._fetch_url and self._push_url == other._push_url

    def to_json(self, stream=None):
        """Dump ``to_dict()`` as JSON through ``spack.util.spack_json.dump``."""
        return spack.util.spack_json.dump(self.to_dict(), stream)

    def to_yaml(self, stream=None):
        """Dump ``to_dict()`` as YAML through ``spack.util.spack_yaml.dump``."""
        return spack.util.spack_yaml.dump(self.to_dict(), stream)

    @staticmethod
    def from_yaml(stream, name=None):
        """Build a mirror from YAML read from ``stream``.

        Raises:
            spack.util.spack_yaml.SpackYAMLError: on a marked YAML error.
        """
        try:
            data = spack.util.spack_yaml.load(stream)
            return Mirror.from_dict(data, name)
        except yaml_error.MarkedYAMLError as e:
            raise spack.util.spack_yaml.SpackYAMLError("error parsing YAML mirror:", str(e)) from e

    @staticmethod
    def from_json(stream, name=None):
        """Build a mirror from JSON read from ``stream``.

        Raises:
            spack.util.spack_json.SpackJSONError: on any failure while
                loading or interpreting the document.
        """
        try:
            d = spack.util.spack_json.load(stream)
            return Mirror.from_dict(d, name)
        except Exception as e:
            raise spack.util.spack_json.SpackJSONError("error parsing JSON mirror:", str(e)) from e

    def to_dict(self):
        """Return a ``syaml_dict`` with ``fetch`` and ``push`` entries.

        The push entry falls back to the fetch entry when no separate push
        entry is stored.
        """
        return syaml_dict(
            [("fetch", self._fetch_url), ("push", self._push_url or self._fetch_url)]
        )

    @staticmethod
    def from_dict(d, name=None):
        """Build a mirror from a URL string or a ``fetch``/``push`` mapping.

        A mapping without a ``push`` key yields a mirror without a separate
        push entry (previously this raised ``KeyError``).
        """
        if isinstance(d, str):
            return Mirror(d, name=name)
        return Mirror(d["fetch"], d.get("push"), name=name)

    def display(self, max_len=0):
        """Print this mirror: one row, or a fetch row plus a push row."""
        if self._push_url is None:
            _display_mirror_entry(max_len, self._name, self.fetch_url)
        else:
            _display_mirror_entry(max_len, self._name, self.fetch_url, "fetch")
            _display_mirror_entry(max_len, self._name, self.push_url, "push")

    def __str__(self):
        name = "" if self._name is None else ' "%s"' % self._name

        if self._push_url is None:
            return f"[Mirror{name} ({self._fetch_url})]"

        return f"[Mirror{name} (fetch: {self._fetch_url}, push: {self._push_url})]"

    def __repr__(self):
        fields = (
            ("fetch_url", self._fetch_url),
            ("push_url", self._push_url),
            ("name", self._name),
        )
        # fetch_url is always shown; the other fields only when truthy.
        shown = ", ".join(f"{k}={v!r}" for k, v in fields if k == "fetch_url" or v)
        return f"Mirror({shown})"

    @property
    def name(self):
        """The mirror's name, or ``"<unnamed>"`` when none was given."""
        return self._name or "<unnamed>"

    def _get_connection_value(self, url_type, key):
        """Return connection setting ``key`` for ``url_type``, else ``None``.

        ``url_type == "push"`` selects the push entry, falling back to the
        fetch entry when no separate push entry is stored; any other value
        selects the fetch entry.  Plain-string entries carry no settings.
        """
        entry = self._fetch_url
        if url_type == "push" and self._push_url is not None:
            entry = self._push_url
        return entry.get(key) if isinstance(entry, dict) else None

    def _set_connection_value(self, url_type, key, value):
        """Store connection setting ``key`` on the entry for ``url_type``.

        An entry that is not yet a dict is promoted to one first, so that
        mirrors created from plain URL strings can be configured too.
        """
        if url_type == "push":
            if self._push_url is None:
                # No separate push entry yet: start from the fetch entry,
                # which is what pushes were using so far.
                base = self._fetch_url
                self._push_url = dict(base) if isinstance(base, dict) else {"url": base}
            elif not isinstance(self._push_url, dict):
                self._push_url = {"url": self._push_url}
            self._push_url[key] = value
        else:
            if not isinstance(self._fetch_url, dict):
                self._fetch_url = {"url": self._fetch_url}
            self._fetch_url[key] = value

    def get_profile(self, url_type):
        """Return the ``profile`` setting for ``url_type``, or ``None``."""
        return self._get_connection_value(url_type, "profile")

    def set_profile(self, url_type, profile):
        """Set the ``profile`` setting for ``url_type``."""
        self._set_connection_value(url_type, "profile", profile)

    def get_access_pair(self, url_type):
        """Return the ``access_pair`` setting for ``url_type``, or ``None``."""
        return self._get_connection_value(url_type, "access_pair")

    def set_access_pair(self, url_type, connection_tuple):
        """Set the ``access_pair`` setting for ``url_type``."""
        self._set_connection_value(url_type, "access_pair", connection_tuple)

    def get_endpoint_url(self, url_type):
        """Return the ``endpoint_url`` setting for ``url_type``, or ``None``."""
        return self._get_connection_value(url_type, "endpoint_url")

    def set_endpoint_url(self, url_type, url):
        """Set the ``endpoint_url`` setting for ``url_type``."""
        self._set_connection_value(url_type, "endpoint_url", url)

    def get_access_token(self, url_type):
        """Return the ``access_token`` setting for ``url_type``, or ``None``."""
        return self._get_connection_value(url_type, "access_token")

    def set_access_token(self, url_type, connection_token):
        """Set the ``access_token`` setting for ``url_type``."""
        self._set_connection_value(url_type, "access_token", connection_token)

    @property
    def fetch_url(self):
        """The fetch URL as a string, whichever way the entry is stored."""
        return self._fetch_url if _is_string(self._fetch_url) else self._fetch_url["url"]

    @fetch_url.setter
    def fetch_url(self, url):
        # A plain-string entry is replaced outright; item assignment on it
        # would raise TypeError.
        if isinstance(self._fetch_url, dict):
            self._fetch_url["url"] = url
        else:
            self._fetch_url = url
        self._normalize()

    @property
    def push_url(self):
        """The push URL as a string; the fetch URL when no push entry is set."""
        if self._push_url is None:
            return self._fetch_url if _is_string(self._fetch_url) else self._fetch_url["url"]
        return self._push_url if _is_string(self._push_url) else self._push_url["url"]

    @push_url.setter
    def push_url(self, url):
        # Covers both "no push entry yet" (None) and a plain-string entry.
        if isinstance(self._push_url, dict):
            self._push_url["url"] = url
        else:
            self._push_url = url
        self._normalize()

    def _normalize(self):
        """Drop the push entry when it is equal to the fetch entry."""
        if self._push_url is not None and self._push_url == self._fetch_url:
            self._push_url = None
class MirrorCollection(Mapping):
    """A mapping of mirror names to mirrors."""

    def __init__(self, mirrors=None, scope=None):
        """Build the collection from ``mirrors`` or from the configuration.

        Args:
            mirrors: mapping of mirror name to mirror data accepted by
                ``Mirror.from_dict``.  When ``None``, the ``mirrors`` section
                of the ramble configuration is read instead.
            scope: configuration scope passed to ``ramble.config.get``; only
                used when ``mirrors`` is ``None``.
        """
        if mirrors is None:
            # The section can come back empty/None when nothing is
            # configured; treat that as "no mirrors" rather than failing on
            # ``.items()``.
            mirrors = ramble.config.get("mirrors", scope=scope) or {}

        self._mirrors = collections.OrderedDict(
            (name, Mirror.from_dict(mirror, name)) for name, mirror in mirrors.items()
        )

    def __eq__(self, other):
        # Let Python handle foreign types instead of raising AttributeError.
        if not isinstance(other, MirrorCollection):
            return NotImplemented
        return self._mirrors == other._mirrors

    def to_json(self, stream=None):
        """Dump the recursive dict form as JSON."""
        return spack.util.spack_json.dump(self.to_dict(True), stream)

    def to_yaml(self, stream=None):
        """Dump the recursive dict form as YAML."""
        return spack.util.spack_yaml.dump(self.to_dict(True), stream)

    def to_dict(self, recursive=False):
        """Return a name-sorted ``syaml_dict`` of the mirrors.

        With ``recursive`` the values are each mirror's ``to_dict()``;
        otherwise they are the ``Mirror`` objects themselves.
        """
        return syaml_dict(
            sorted(
                ((k, (v.to_dict() if recursive else v)) for (k, v) in self._mirrors.items()),
                key=operator.itemgetter(0),
            )
        )

    @staticmethod
    def from_dict(d):
        """Build a collection from a mapping of names to mirror data."""
        return MirrorCollection(d)

    def __getitem__(self, item):
        return self._mirrors[item]

    def display(self):
        """Display every mirror, aligning URLs after the longest name."""
        mirrors = self._mirrors.values()
        # ``default`` keeps an empty collection from raising ValueError.
        max_len = max((len(mirror.name) for mirror in mirrors), default=0)
        for mirror in mirrors:
            mirror.display(max_len)

    def lookup(self, name_or_url):
        """Looks up and returns a Mirror.

        If this MirrorCollection contains a named Mirror under the name
        [name_or_url], then that mirror is returned.  Otherwise, [name_or_url]
        is assumed to be a mirror URL, and an anonymous mirror with the given
        URL is returned.
        """
        result = self.get(name_or_url)

        if result is None:
            result = Mirror(fetch_url=name_or_url)

        return result

    def __iter__(self):
        return iter(self._mirrors)

    def __len__(self):
        return len(self._mirrors)
def _determine_extension(fetcher):
    """Work out the archive extension to use for ``fetcher``'s resource.

    Returns ``"tar.gz"`` for any fetcher that is not an
    ``fs.URLFetchStrategy``, ``None`` for a URL fetcher whose
    ``expand_archive`` is false, and otherwise the extension parsed from the
    fetcher's URL with leading dots removed.

    Raises:
        MirrorError: when the archive is meant to be expanded but no
            extension can be parsed from the URL.
    """
    if not isinstance(fetcher, fs.URLFetchStrategy):
        # Otherwise we'll make a .tar.gz ourselves
        return "tar.gz"

    if not fetcher.expand_archive:
        # If the archive shouldn't be expanded, don't check extension.
        return None

    # If we fetch with a URLFetchStrategy, use URL's archive type
    url_ext = spack.url.determine_url_file_extension(fetcher.url)
    if url_ext:
        # Remove any leading dots
        return url_ext.lstrip(".")

    # TODO: Clean up this message...
    # TODO: Add extension to input files...
    msg = """\
Unable to parse extension from {0}.

If this URL is for a tarball but does not include the file extension
in the name, you can explicitly declare it with the following syntax:

    input_file('1.2.3', 'hash', extension='tar.gz')

If this URL is for a download like a .jar or .whl that does not need
to be expanded, or an uncompressed installation script, you can tell
Ramble not to expand it with the following syntax:

    input_file('1.2.3', 'hash', expand=False)
"""
    raise MirrorError(msg.format(fetcher.url))
class MirrorReference:
    """Relative paths under which a resource may live in a mirror directory.

    ``storage_path`` is the location to store the resource at.
    ``cosmetic_path`` is a reference a human could derive themselves from the
    details of the input.

    Iterating over a ``MirrorReference`` yields every name that might refer
    to the resource in a mirror; this includes names generated by previous
    naming schemes that are no longer reported by ``storage_path`` or
    ``cosmetic_path``.
    """

    def __init__(self, cosmetic_path, global_path=None):
        self.cosmetic_path = cosmetic_path
        self.global_path = global_path

    @property
    def storage_path(self):
        """The global path when one is set, otherwise the cosmetic path."""
        return self.global_path or self.cosmetic_path

    def __iter__(self):
        # The global path, when present, is offered before the cosmetic one.
        preferred = self.global_path
        if preferred:
            yield preferred
        yield self.cosmetic_path
def mirror_archive_paths(fetcher, per_input_ref):
    """Build the ``MirrorReference`` for the resource fetched by ``fetcher``.

    ``per_input_ref`` becomes the cosmetic path.  The global path is the
    fetcher's ``mirror_id()`` placed under ``_input-cache``, or left as
    returned when ``mirror_id()`` is falsy.  Both paths get the extension
    reported by ``_determine_extension`` appended when there is one.
    """
    extension = _determine_extension(fetcher)
    suffix = ".%s" % extension if extension else ""

    cosmetic_ref = per_input_ref
    if extension:
        cosmetic_ref += suffix

    cache_ref = fetcher.mirror_id()
    if cache_ref:
        cache_ref = os.path.join("_input-cache", cache_ref)
        if extension:
            cache_ref += suffix

    return MirrorReference(cosmetic_ref, cache_ref)
def add(name, url, scope):
    """Add a named mirror in the given scope.

    The new mirror is written ahead of the mirrors already configured in
    ``scope``.  A name that is already present is reported through
    ``logger.die``.
    """
    configured = ramble.config.get("mirrors", scope=scope) or syaml_dict()

    if name in configured:
        logger.die(f"Mirror with name {name} already exists.")

    # New entry first, existing entries after it in their current order.
    entries = [(name, url)]
    entries.extend(configured.items())
    ramble.config.set("mirrors", syaml_dict(entries), scope=scope)
def remove(name, scope):
    """Remove the named mirror in the given scope.

    An unknown name is reported through ``logger.die``.  After the updated
    mirror section is written back, the removed URL(s) are logged at debug
    level and a short confirmation is logged as a message.
    """
    configured = ramble.config.get("mirrors", scope=scope) or syaml_dict()

    if name not in configured:
        logger.die(f"No mirror with name {name}")

    removed = configured.pop(name)
    ramble.config.set("mirrors", configured, scope=scope)

    try:
        # Mapping-style entries carry separate fetch and push values ...
        fetch_value = removed["fetch"]
        push_value = removed["push"]
    except TypeError:
        # ... while subscripting a plain URL string with a key raises
        # TypeError, so that case is logged as a single url.
        debug_text = "Removed mirror %s with url %s" % (name, removed)
    else:
        debug_text = "Removed mirror %s with fetch url %s and push url %s" % (
            name,
            fetch_value,
            push_value,
        )

    logger.debug(debug_text)
    logger.msg(f"Removed mirror {name}.")
class MirrorStats:
    """Tallies what happened to resources while working on a mirror.

    ``present``, ``new`` and ``errors`` are dicts whose keys are reported by
    ``stats()``.  ``current_spec`` together with ``added_resources`` and
    ``existing_resources`` supports per-spec tallying through ``next_spec``.
    """

    def __init__(self):
        self.present = {}
        self.new = {}
        self.errors = {}

        self.current_spec = None
        self.added_resources = set()
        self.existing_resources = set()

    def next_spec(self, spec):
        """Close the tally of the current spec and start tracking ``spec``."""
        self._tally_current_spec()
        self.current_spec = spec

    def _tally_current_spec(self):
        """Fold the per-spec resource sets into ``new``/``present`` counts."""
        if self.current_spec:
            if self.added_resources:
                self.new[self.current_spec] = len(self.added_resources)
            if self.existing_resources:
                self.present[self.current_spec] = len(self.existing_resources)
            self.added_resources = set()
            self.existing_resources = set()
        self.current_spec = None

    def stats(self):
        """Return the keys of ``present``, ``new`` and ``errors`` as lists."""
        self._tally_current_spec()
        return list(self.present), list(self.new), list(self.errors)

    def already_existed(self, resource):
        """Record ``resource`` as already present."""
        self.present[resource] = True

    def added(self, resource):
        """Record ``resource`` as newly added."""
        self.new[resource] = True

    def error(self, resource):
        """Record ``resource`` as failed."""
        # ``errors`` is a dict, so the former set-style ``.add`` call always
        # raised AttributeError.  Record the resource the same way
        # ``already_existed`` and ``added`` do.
        self.errors[resource] = True
def push_url_from_directory(output_directory):
    """Return the URL on which to push resources for a local directory.

    Raises:
        ValueError: if ``output_directory`` parses with a scheme of its own,
            i.e. it looks like a URL rather than a local path.
    """
    # "<missing>" is the placeholder scheme handed to the parser; getting
    # anything else back means the input carried its own scheme.
    parsed = url_util.parse(output_directory, scheme="<missing>")
    if parsed.scheme != "<missing>":
        raise ValueError("expected a local path, but got a URL instead")

    local_mirror = ramble.mirror.MirrorCollection().lookup("file://" + output_directory)
    return url_util.format(local_mirror.push_url)
def push_url_from_mirror_name(mirror_name):
    """Return the URL on which to push resources for a named mirror.

    Raises:
        ValueError: if the lookup yields a mirror whose name is
            ``"<unnamed>"``, i.e. no mirror is configured under
            ``mirror_name``.
    """
    found = ramble.mirror.MirrorCollection().lookup(mirror_name)
    is_anonymous = found.name == "<unnamed>"
    if is_anonymous:
        raise ValueError(f'no mirror named "{mirror_name}"')
    return url_util.format(found.push_url)
def push_url_from_mirror_url(mirror_url):
    """Return the URL on which to push resources for a mirror URL.

    Raises:
        ValueError: if ``mirror_url`` parses to the placeholder scheme
            ``"<missing>"``, i.e. it carries no scheme of its own.
    """
    parsed = url_util.parse(mirror_url, scheme="<missing>")
    if parsed.scheme == "<missing>":
        raise ValueError(f'"{mirror_url}" is not a valid URL')

    target = ramble.mirror.MirrorCollection().lookup(mirror_url)
    return url_util.format(target.push_url)
class MirrorError(ramble.error.RambleError):
    """Base class for every error related to mirror creation."""

    def __init__(self, msg, long_msg=None):
        """Forward ``msg`` and ``long_msg`` unchanged to ``RambleError``."""
        super().__init__(msg, long_msg)