# Source code for miranda.validate.schema

"""Validate outputted metadata against CF-like schemas."""

from __future__ import annotations
import json
import logging
from pathlib import Path

from schema import And, Optional, Or, Regex, Schema, SchemaError

from ._dimensions import cf_dimensions_schema
from ._regex import (
    CF_CONVENTIONS_REGEX,
    PROJECT_NAME_REGEX,
)
from ._variables import cf_variables_schema


# Names exported by ``from ... import *``; two of them are imported from sibling modules above.
__all__ = [
    "cf_dimensions_schema",
    "cf_header_schema",
    "cf_variables_schema",
    "converter_schema",
    "validate_json",
]

# Accepted values for the header's ``license_type`` / ``licence_type`` entry (see ``cf_header_schema``).
LICENSES_TYPES = ["open", "permissive", "proprietary", "restricted"]


def _source_in_header(header_dict: dict):
    """
    Check for source in the header

    Parameters
    ----------
    header_dict : dict
        The Schema dictionary for the header.

    Returns
    -------
    Schema
        The validated header Schema dictionary.

    Raises
    ------
    ValueError
        If the header dict has neither "source" nor {"_source": project_name}
        If the header dict contains both "source" and {"_source": project_name}
    """
    if not header_dict or not isinstance(header_dict, dict):
        raise ValueError("'time' must be present and be a dictionary.")

    # Must contain either 'source' or '_source', but not both
    has_source = "source" in header_dict
    has_dynamic_source = "_source" in header_dict

    if has_source and has_dynamic_source:  # both true or both false
        raise ValueError("Time dimension may contain either 'units' or '_units', but not both")

    if has_source:
        if not isinstance(header_dict["source"], str):
            raise ValueError("'source' must be a string")

    if has_dynamic_source:
        dynamic_source = header_dict["_source"]
        if not isinstance(dynamic_source, dict) or not any(isinstance(k, str) and isinstance(v, str) for k, v in dynamic_source.items()):
            raise ValueError("'_source' must be a dict of {str: str}")

    return header_dict


def _institution_in_header(header_dict: dict):
    """
    Check for institution metadata in header.

    Parameters
    ----------
    header_dict : dict
        The Schema dictionary for the header.

    Returns
    -------
    Schema
        The validated header Schema dictionary.

    Raises
    ------
    ValueError
        If the Header has neither "institution" nor {"_map_attrs": {str : "institution"}
        If the Header contains both "institution" and {"_map_attrs": {str : "institution"}
    """
    # Must contain either 'institution' or '_map_attrs', but not both
    has_institution = "institution" in header_dict
    has_map_attrs = "_map_attrs" in header_dict

    if has_institution == has_map_attrs:  # both true or both false
        raise ValueError("Header must contain either 'institution' or '_map_attrs', but not both")

    if has_institution:
        if not isinstance(header_dict["institution"], str):
            raise ValueError("'institution' must be a string")

    if has_map_attrs:
        map_attrs = header_dict["_map_attrs"]
        if not isinstance(map_attrs, dict) or not any(isinstance(k, str) and v == "institution" for k, v in map_attrs.items()):
            raise ValueError("'_map_attrs' must be a dict of {str: 'institution'}")

    return header_dict


# Schema for the "Header" section: the key/value layout below is combined (via ``And``)
# with the institution and source checks defined above.
cf_header_schema = Schema(
    And(
        Schema(
            {
                "Conventions": Regex(CF_CONVENTIONS_REGEX),
                "type": str,  # FIXME: Should this be constrained to specific values? e.g., "simulation", "observation", etc.
                "processing_level": Or("raw", "biasadjusted"),
                # Both the "license" and "licence" spellings are accepted for the license keys.
                Optional(Regex(r"^license$|^licence$")): str,
                # Either one of LICENSES_TYPES, or a mapping from keys matching
                # PROJECT_NAME_REGEX to one of LICENSES_TYPES.
                Regex(r"^license_type$|^licence_type$"): Or(
                    *LICENSES_TYPES,
                    Schema({Regex(PROJECT_NAME_REGEX): Or(*LICENSES_TYPES)}),
                ),
                "table_id": str,
                # Any other key matching PROJECT_NAME_REGEX must hold a string.
                Optional(Regex(PROJECT_NAME_REGEX)): str,
                Optional("_frequency"): bool,
                Optional(Regex(r"^_license$|^_licence$")): {str: Or(str, Schema({str: str}))},
                Optional("_miranda_version"): bool,
                Optional("_remove_attrs"): Or(
                    Schema(Regex(PROJECT_NAME_REGEX)),
                    Schema({Regex(PROJECT_NAME_REGEX): Or(str, Schema([str]))}),
                ),
                # Generic rule for any other underscore-prefixed key.
                Optional(Regex(r"^_")): {str: Or(str, bool, Schema({str: str}))},
            }
        ),
        # Cross-key checks that the dict layout above does not express.
        _institution_in_header,
        _source_in_header,
    ),
    name="header_schema",
)


# Converter schema: the top-level layout of a converter definition, made of the
# "Header", "variables" and "dimensions" sections.
converter_schema = Schema(
    {
        "Header": cf_header_schema,
        "variables": cf_variables_schema,
        "dimensions": cf_dimensions_schema,
    },
    ignore_extra_keys=False,  # Extra top-level entries are rejected with a SchemaError
    name="convert_schema",
)


# Load a JSON file from disk and validate its contents against a schema.
def validate_json(json_file: str | Path, schema: Schema | None = None) -> bool:
    """
    Validate a JSON file against a schema.

    Parameters
    ----------
    json_file : str or pathlib.Path
        The path to the JSON file.
    schema : Schema, optional
        The schema to validate against.
        If None, `converter_schema` is used when the file name contains "_cf_".

    Returns
    -------
    bool
        True if the JSON file is valid. Failures are signalled by raising, never by returning False.

    Raises
    ------
    ValueError
        If `json_file` is not an existing file.
        If no schema is given and the file name does not contain "_cf_".
        If `schema` is given but is not a Schema instance.
    OSError
        If there is an error reading the JSON file.
    json.JSONDecodeError
        If the JSON file is not valid JSON.
    SchemaError
        If the JSON data does not conform to the schema.
    """
    json_path = Path(json_file)
    if not json_path.is_file():
        msg = f"{json_file} is not a file."
        raise ValueError(msg)

    if schema is None:
        # Without an explicit schema, only files flagged "_cf_" in their name can be validated.
        if "_cf_" in json_path.name:
            schema = converter_schema
        else:
            raise ValueError("Schema is not CF-compliant. No validation is possible.")
    elif not isinstance(schema, Schema):
        raise ValueError("'schema' must be a Schema instance.")

    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with json_path.open(encoding="utf-8") as f:
            data = json.load(f)
        schema.validate(data)
    except (OSError, json.JSONDecodeError) as e:
        msg = f"Error validating JSON file {json_file}: {e}"
        logging.error(msg)
        raise
    except SchemaError as e:
        msg = f"Schema validation error in {json_file}: {e}"
        logging.error(msg)
        raise
    return True