Source code for miranda.structure._structure

from __future__ import annotations

import hashlib
import logging.config
import multiprocessing
import os
import shutil
from functools import partial
from pathlib import Path
from types import GeneratorType

import yaml
from schema import SchemaError

from miranda.cv import VALIDATION_ENABLED
from miranda.decode import Decoder, DecoderError, guess_project
from miranda.io import discover_data
from miranda.scripting import LOGGING_CONFIG

if VALIDATION_ENABLED:
    from miranda.validators import validation_schemas

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "create_version_hash_files",
    "build_path_from_schema",
    "structure_datasets",
]


def _verify(hash_value: str, hash_file: os.PathLike) -> None:
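    """Compare a calculated sha256sum against the value stored in a hash file, logging any mismatch."""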
    try:
        with open(hash_file) as f:
            found_sha256sum = f.read()

        if hash_value != found_sha256sum:
            raise ValueError()
    except ValueError:
        logging.error(
            f"Found sha256sum (starting: {found_sha256sum[:6]}) "
            f"does not match current value (starting: {hash_value[:6]}) "
            f"for file `{Path(hash_file).name}`."
        )


def generate_hash_file(
    in_file: os.PathLike, out_file: os.PathLike, verify: bool = False
) -> None:
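    """Write the sha256sum of `in_file` to `out_file`; if `out_file` already exists, optionally verify it."""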
    if not Path(out_file).exists():
        hash_sha256_writer = hashlib.sha256()
        with open(in_file, "rb") as f:
            hash_sha256_writer.update(f.read())
        sha256sum = hash_sha256_writer.hexdigest()

        print(
            f"Writing sha256sum (starting: {sha256sum[:6]}) to file: {Path(out_file).name}"
        )
        try:
            with open(out_file, "w") as f:
                f.write(sha256sum)
        except PermissionError:
            logging.error("Unable to write file. Ensure access privileges.")

        del hash_sha256_writer
        del sha256sum
    elif verify:
        hash_sha256_writer = hashlib.sha256()
        with open(in_file, "rb") as f:
            hash_sha256_writer.update(f.read())
        calculated_sha256sum = hash_sha256_writer.hexdigest()

        _verify(calculated_sha256sum, out_file)

    else:
        print(f"Sha256sum file `{Path(out_file).name}` already exists. Continuing...")
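

# Minimal usage sketch for `generate_hash_file` (illustrative only): the file
# names below are assumptions and the input file must exist for the calls to work.
def _example_generate_hash_file() -> None:
    data_file = Path("example_data.nc")  # hypothetical dataset file
    hash_file = Path("example_data.v20240101")  # hypothetical sidecar hash file
    generate_hash_file(data_file, hash_file)  # writes the sha256sum
    generate_hash_file(data_file, hash_file, verify=True)  # re-reads and verifies it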


def generate_hash_metadata(
    in_file: os.PathLike,
    version: str | None = None,
    hash_file: os.PathLike | None = None,
    verify: bool = False,
) -> dict[str, list[str]]:
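    """Return a mapping of the file name to its [version, sha256sum] pair, optionally verifying an existing hash file."""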
    hashversion = dict()

    if version is None:
        version = "vNotFound"

    if hash_file is None or not Path(hash_file).exists():
        hash_sha256_writer = hashlib.sha256()
        with open(in_file, "rb") as f:
            hash_sha256_writer.update(f.read())
        sha256sum = hash_sha256_writer.hexdigest()

        print(f"Calculated sha256sum (starting: {sha256sum[:6]})")

        hashversion[Path(in_file).name] = [version, sha256sum]
        del hash_sha256_writer

    else:
        hash_sha256_writer = hashlib.sha256()
        with open(in_file, "rb") as f:
            hash_sha256_writer.update(f.read())
        calculated_sha256sum = hash_sha256_writer.hexdigest()

        if verify:
            _verify(calculated_sha256sum, hash_file)

        hashversion[Path(in_file).name] = [version, calculated_sha256sum]

    return hashversion
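

# Minimal usage sketch for `generate_hash_metadata` (illustrative only): the file
# name, version, and hash-file path are assumptions, and the input file must exist.
# The call returns e.g. {"example_data.nc": ["v20240101", "<sha256sum>"]}.
def _example_generate_hash_metadata() -> dict[str, list[str]]:
    return generate_hash_metadata(
        "example_data.nc",  # hypothetical dataset file
        version="v20240101",  # hypothetical dataset version
        hash_file="example_data.v20240101",  # hypothetical sidecar hash file
        verify=False,
    )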


def create_version_hash_files(
    input_files: str | os.PathLike | list[str | os.PathLike] | GeneratorType | None = None,
    facet_dict: dict | None = None,
    verify_hash: bool = False,
) -> None:
    """Create version hashes based on files or a facets dictionary.

    Parameters
    ----------
    input_files : str, os.PathLike, list of str or os.PathLike, or GeneratorType, optional
        Files for which to create version hash files.
    facet_dict : dict, optional
        Dictionary of facets, keyed by file path, containing a "version" entry.
        If not provided, facets are decoded from `input_files`.
    verify_hash : bool
        Verify calculated sha256sums against existing hash files. Default: False.

    Returns
    -------
    None
    """
    if not facet_dict and not input_files:
        raise ValueError("Facets dictionary or sequence of filepaths required.")

    if input_files:
        if isinstance(input_files, (str, os.PathLike)):
            input_files = [input_files]
        for f in input_files:
            project = guess_project(f)
            decoder = Decoder(project)
            decoder.decode(f)
            break
        else:
            raise FileNotFoundError()
        decoder.decode(input_files)
        facet_dict = decoder.file_facets()

    version_hash_paths = dict()
    for file, facets in facet_dict.items():
        version_hash_file = f"{Path(file).stem}.{facets['version']}"
        version_hash_paths.update(
            {Path(file): Path(file).parent.joinpath(version_hash_file)}
        )

    hash_func = partial(generate_hash_file, verify=verify_hash)
    with multiprocessing.Pool() as pool:
        pool.starmap(
            hash_func,
            zip(version_hash_paths.keys(), version_hash_paths.values()),
        )
        pool.close()
        pool.join()
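

# Minimal usage sketch for `create_version_hash_files` using a facets dictionary
# (illustrative only): the file path and version below are assumptions, and the
# referenced file must exist for the hash workers to succeed.
def _example_create_version_hash_files() -> None:
    facets = {"/data/example_data.nc": {"version": "v20240101"}}  # assumed facets
    create_version_hash_files(facet_dict=facets, verify_hash=False)

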
def _structure_datasets(
    in_file: str | os.PathLike,
    out_path: str | os.PathLike,
    method: str,
    dry_run: bool = False,
):
    if isinstance(in_file, str):
        in_file = Path(in_file)

    if method.lower() in ["move", "copy"]:
        meth = "Moved" if method.lower() == "move" else "Copied"
        output_file = Path(out_path).joinpath(in_file.name)
        try:
            if not dry_run:
                method_mod = ""
                if in_file.is_dir() and method.lower() == "copy":
                    method_mod = "tree"
                getattr(shutil, f"{method}{method_mod}")(in_file, output_file)
            print(f"{meth} {in_file.name} to {output_file}.")
        except FileExistsError:
            print(f"{in_file.name} already exists at location. Continuing...")


def parse_schema(
    facets: dict, schema: str | os.PathLike | dict, top_folder: str = "datasets"
) -> list:
    """Parse a schema configuration and construct a folder-level sequence from a dictionary of facets.

    Parameters
    ----------
    facets : dict
        Facets for a given dataset.
    schema : str or os.PathLike or dict
        Path to a YAML schematic of the database structure, or an already-loaded schema dictionary.
    top_folder : str
        Top-level key of the schema. Default: "datasets".

    Returns
    -------
    list
        Ordered list of facet names describing the folder levels of the output path.
    """

    def _parse_top_level(schematic: dict, facet_dict: dict, top: str):
        try:
            parent = schematic[top]
        except KeyError:
            logging.error("Schema is not a valid facet-tree reference.")
            raise

        for i, options in enumerate(parent):
            if {"option", "structure", "value"}.issubset(options.keys()):
                option = options["option"]
                value = options["value"]
                if option in facet_dict.keys():
                    if facet_dict[option] == value:
                        return {"branch": value, "structure": options["structure"]}
                continue
        raise ValueError("Couldn't parse top level.")

    def _parse_structure(branch_dict: dict, facet_dict: dict) -> list:
        structure = branch_dict.get("structure")
        folder_tree = list()
        for level in structure:
            if isinstance(level, str):
                folder_tree.append(level)
                continue
            elif isinstance(level, dict):
                if {"option", "is_true"}.issubset(level.keys()):
                    option = level["option"]
                    if option not in facet_dict and "value" in level:
                        raise ValueError(
                            f"Necessary facet not found for schema: `{option}`."
                        )
                    is_true = level.get("is_true")
                    else_value = level.get("else")
                    facet = facet_dict.get(option)
                    if "value" not in level:
                        # The absence of "value" means that "is_true" / "else" refer to the
                        # presence or absence of "option" in the facets. Falsy values
                        # (empty string, None) are also treated as the absence of "option".
                        if not bool(facet) and else_value:
                            folder_tree.append(else_value)
                        elif bool(facet):
                            folder_tree.append(is_true)
                        else:
                            # "option" absent from the facets and no "else": skip.
                            pass
                    else:
                        value = level["value"]
                        if facet_dict[option] == value:
                            folder_tree.append(is_true)
                        elif else_value:
                            folder_tree.append(else_value)
                        else:
                            # "option" not equal to "value", but no "else": skip.
                            pass
            else:
                raise ValueError("Supplied schema is invalid.")
        return folder_tree

    if isinstance(schema, (str, os.PathLike)):
        with Path(schema).open() as f:
            schema = yaml.safe_load(f.read())

    branch = _parse_top_level(schema, facets, top_folder)
    tree = list()  # noqa
    tree.extend(_parse_structure(branch, facets))
    return tree
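

# Minimal usage sketch for `parse_schema` with a small in-memory schema
# (illustrative only): the schema and facets below are invented and do not
# correspond to the bundled Ouranos schema.
def _example_parse_schema() -> list:
    schema = {
        "datasets": [
            {
                "option": "type",
                "value": "simulation",
                "structure": ["type", "institution", "source", "version"],
            }
        ]
    }
    facets = {
        "type": "simulation",
        "institution": "Example-Institute",
        "source": "Example-Model",
        "version": "v20240101",
    }
    # Returns the folder levels as facet names: ["type", "institution", "source", "version"]
    return parse_schema(facets, schema, top_folder="datasets")

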
def build_path_from_schema(
    facets: dict,
    output_folder: str | os.PathLike,
    schema: str | os.PathLike | dict | None = None,
    top_folder: str = "datasets",
    validate: bool = True,
) -> Path | None:
    """Build a filepath based on a valid data schema.

    Parameters
    ----------
    facets : dict
        Facets for a given dataset.
    output_folder : str or os.PathLike
        Parent folder on which to extend the filetree structure.
    schema : str or os.PathLike or dict, optional
        Path to a YAML schematic of the database structure, or an already-loaded
        schema dictionary. If None, will use the Ouranos schema.
    top_folder : str
        Top-level of the supplied schema, used for validation purposes. Default: "datasets".
    validate : bool
        Run facet-validation checks over the given file. Default: True.

    Returns
    -------
    Path or None
    """
    if schema is None:
        schema = Path(__file__).parent.joinpath("data").joinpath("ouranos_schema.yml")

    tree = parse_schema(facets, schema, top_folder)
    branch = tree[0]

    if validate and VALIDATION_ENABLED:
        if facets[branch] in validation_schemas.keys():
            try:
                validation_schemas[facets[branch]].validate(facets)
            except SchemaError as e:
                logging.error(
                    f"Validation issues found for file matching schema: {facets}: {e}"
                )
                return
        elif facets[branch] not in validation_schemas.keys():
            logging.error(
                f"No appropriate data schemas found for file matching schema: {facets}"
            )
            return
    elif validate and not VALIDATION_ENABLED:
        logging.warning(
            "Facets validation requires pyessv-archive source files. Skipping validation checks."
        )

    file_location = list()
    for facet in tree:
        # Remove spaces in folder paths
        file_location.append(str(facets[facet]).replace(" ", "-"))

    return Path(output_folder).joinpath("/".join(file_location))
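

# Minimal usage sketch for `build_path_from_schema` (illustrative only): reuses an
# invented schema and facets, and skips validation; the output folder is assumed.
def _example_build_path_from_schema() -> Path | None:
    schema = {
        "datasets": [
            {
                "option": "type",
                "value": "simulation",
                "structure": ["type", "institution", "source", "version"],
            }
        ]
    }
    facets = {
        "type": "simulation",
        "institution": "Example-Institute",
        "source": "Example-Model",
        "version": "v20240101",
    }
    # Returns Path("/tmp/datasets/simulation/Example-Institute/Example-Model/v20240101")
    return build_path_from_schema(
        facets, output_folder="/tmp/datasets", schema=schema, validate=False
    )

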
def structure_datasets(
    input_files: str | os.PathLike | list[str | os.PathLike] | GeneratorType,
    output_folder: str | os.PathLike,
    *,
    project: str | None = None,
    guess: bool = True,
    dry_run: bool = False,
    method: str = "copy",
    make_dirs: bool = False,
    set_version_hashes: bool = False,
    verify_hashes: bool = False,
    suffix: str = "nc",
) -> dict[Path, Path]:
    """Structure datasets into a folder tree defined by a data schema.

    Parameters
    ----------
    input_files : str, Path, list of str or Path, or GeneratorType
        Files to be sorted.
    output_folder : str or Path
        The desired location for the folder tree.
    project : {"cordex", "cmip5", "cmip6", "isimip-ft", "pcic-candcs-u6", "converted"}, optional
        Project used to parse the facets of all supplied datasets.
        If not supplied, will attempt parsing with all available data categories
        for each file (slow) unless `guess` is True.
    guess : bool
        If project is not supplied, suggest to the decoder that the activity is the same
        for all input_files. Default: True.
    dry_run : bool
        Print changes that would have been made without performing them. Default: False.
    method : {"move", "copy"}
        Method used to transfer files to their intended location. Default: "copy".
    make_dirs : bool
        Make the folder tree if it does not already exist. Default: False.
    set_version_hashes : bool
        Make an accompanying file with the version in its filename and the sha256sum
        as its contents. Default: False.
    verify_hashes : bool
        Ensure that any existing sha256sum files correspond with their companion file.
        Log an error on mismatch. Default: False.
    suffix : {"nc", "zarr"}
        If "zarr", will perform a 'glob' with the provided pattern.
        Otherwise, will perform an 'rglob' (recursive) operation. Default: "nc".

    Returns
    -------
    dict[Path, Path]
        Mapping from source files to their destination folders.
    """
    input_files = discover_data(input_files, suffix)
    if guess and project is None:
        # Examine the first file from a list or generator
        for f in input_files:
            project = guess_project(f)
            decoder = Decoder(project)
            decoder.decode(f)
            break
        else:
            raise FileNotFoundError()
        decoder.decode(input_files)
    else:
        decoder = Decoder(project)
        decoder.decode(input_files)

    all_file_paths = dict()
    existing_hashes = dict()
    version_hash_paths = dict()
    errored_files = list()
    for file, facets in decoder.file_facets().items():
        output_filepath = build_path_from_schema(facets, output_folder)

        if isinstance(output_filepath, Path):
            all_file_paths.update({Path(file): output_filepath})
        else:
            errored_files.append(Path(file).name)
            continue

        if set_version_hashes:
            version_hash_file = f"{Path(file).stem}.{facets['version']}"
            if Path(file).parent.joinpath(version_hash_file).exists():
                existing_hashes.update(
                    {
                        Path(file).parent.joinpath(
                            version_hash_file
                        ): output_filepath.joinpath(version_hash_file)
                    }
                )
            else:
                version_hash_paths.update(
                    {Path(file): output_filepath.joinpath(version_hash_file)}
                )

    if errored_files:
        if len(errored_files) < 10:
            logging.warning(
                f"Some files were unable to be structured: [{', '.join(errored_files)}]"
            )
        else:
            logging.warning(
                f"Many files were unable to be structured (n={len(errored_files)})"
            )

    if make_dirs:
        for new_paths in set(all_file_paths.values()):
            Path(new_paths).mkdir(exist_ok=True, parents=True)

    if set_version_hashes:
        hash_func = partial(generate_hash_file, verify=verify_hashes)
        with multiprocessing.Pool() as pool:
            if existing_hashes:
                print(
                    f"Sha256sum signatures exist for {len(existing_hashes)} files. "
                    f"Transferring them via `{method}` method."
                )
                pool.starmap(
                    getattr(shutil, method),
                    zip(existing_hashes.keys(), existing_hashes.values()),
                )
            pool.starmap(
                hash_func,
                zip(version_hash_paths.keys(), version_hash_paths.values()),
            )
            pool.close()
            pool.join()

    # multiprocessing copy
    structure_func = partial(_structure_datasets, method=method, dry_run=dry_run)
    with multiprocessing.Pool() as pool:
        pool.starmap(
            structure_func, zip(all_file_paths.keys(), all_file_paths.values())
        )
        pool.close()
        pool.join()

    return all_file_paths
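

# Minimal usage sketch for `structure_datasets` (illustrative only): the input and
# output folders and the project name are assumptions; `dry_run=True` only reports
# the transfers that would be made without performing them.
def _example_structure_datasets() -> dict[Path, Path]:
    return structure_datasets(
        "/data/raw",  # hypothetical folder containing *.nc files
        "/data/structured",  # hypothetical destination folder
        project="cmip6",  # assumed project; omit and keep guess=True to infer per file
        guess=False,
        dry_run=True,
        method="copy",
        make_dirs=False,
        set_version_hashes=False,
    )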