Source code for miranda.preprocess._metadata

from __future__ import annotations
import logging
from typing import Any

from miranda import __version__ as __miranda_version__
from miranda.treatments.utils import load_json_data_mappings


# Public names exported by a star-import of this module.
__all__ = [
    "eccc_variable_metadata",
    "homogenized_column_definitions",
    "obs_column_definitions",
]


def eccc_variable_metadata(
    variable_code: str | int,
    project: str,
    generation: int | None = None,
    metadata: dict | None = None,
) -> dict[str, Any]:
    """
    Return the metadata for a given variable code and project.

    Parameters
    ----------
    variable_code : str or int
        Code looked up under ``metadata["variables"]``.
        Integers are converted to a zero-padded, three-character string.
    project : {"eccc-ahccd", "eccc-obs", "eccc-obs-summary"}
        Project identifier. `generation` is only used for "eccc-ahccd".
    generation : {1, 2, 3}, optional
        Dataset generation. Must be 1, 2 or 3 when `project` is "eccc-ahccd";
        ignored for every other project.
    metadata : dict, optional
        Mapping holding a "variables" entry and a "Header" entry.
        When empty or not given, it is obtained from
        ``load_json_data_mappings(project)``.
        The mapping is not modified; the function works on shallow copies.

    Returns
    -------
    dict
        ``{"metadata": {name: variable_attributes}, "header": global_attributes}``
        where `name` is the first non-empty value among the `_variable_name`
        and `_cf_variable_name` fields (when both fields are present),
        and the variable code otherwise.

    Raises
    ------
    NotImplementedError
        If `project` is "eccc-ahccd" and `generation` is not 1, 2 or 3.
    ValueError
        If ``metadata["variables"]`` has no entry for `variable_code`.
    AttributeError
        If an underscore-prefixed header field is neither a boolean nor
        a dictionary usable with the requested generation.
    """
    if project == "eccc-ahccd":
        generation_label = {1: "First", 2: "Second", 3: "Third"}.get(generation)
        if not generation_label:
            # Report the value the caller supplied, not the failed lookup result.
            raise NotImplementedError(f"Generation '{generation}' not supported")
    else:
        generation_label = None

    if not metadata:
        metadata = load_json_data_mappings(project)

    if isinstance(variable_code, int):
        variable_code = str(variable_code).zfill(3)

    # Variable metadata
    variable_entry = metadata["variables"].get(variable_code)
    if variable_entry is None:
        raise ValueError(f"No metadata found for variable code: {variable_code}")
    # Shallow copy: keys are added/removed below and the caller's mapping
    # must stay reusable for subsequent calls.
    variable_entry = dict(variable_entry)

    variable_name = ""
    variable_name_fields = ("_variable_name", "_cf_variable_name")
    if set(variable_name_fields).issubset(variable_entry):
        for name_field in variable_name_fields:
            candidate = variable_entry.get(name_field)
            if candidate:
                variable_name = candidate
                variable_entry["original_variable_code"] = variable_code
                del variable_entry[name_field]
                # Stop at the first usable name so it is not overwritten
                # by a later (possibly empty) field.
                break
    if not variable_name:
        variable_name = variable_code
    variable_meta = {variable_name: variable_entry}

    # Dataset metadata (copied for the same reason as the variable entry).
    # A missing "Header" entry is treated as an empty header.
    header = dict(metadata.get("Header") or {})

    # Static handling of version global attributes
    miranda_version = header.pop("_miranda_version", None)
    if miranda_version:
        if isinstance(miranda_version, bool):
            header["miranda_version"] = __miranda_version__
        elif isinstance(miranda_version, dict):
            if project in miranda_version:
                header["miranda_version"] = __miranda_version__
        else:
            msg = f"`_miranda_version` not properly configured for project `{project}`. Not appending."
            logging.warning(msg)

    # Conditional handling of global attributes based on fields.
    # The field list is materialized first because `header` is edited in the loop.
    directive_fields = [name for name in header if name.startswith("_")]
    for field in directive_fields:
        directive = header[field]
        if isinstance(directive, bool):
            if directive and field == "_variable":
                header[field[1:]] = variable_name
        elif isinstance(directive, dict) and generation_label:
            attr_treatment = directive["generation"]
            if field in ["_citation_product"]:
                for attribute, value in attr_treatment.items():
                    if attribute == generation_label:
                        header[field[1:]] = value
        else:
            raise AttributeError(
                f"Attribute treatment configuration for field `{field}` is not properly configured. Verify JSON."
            )
        # The directive itself never appears in the output.
        del header[field]

    return {"metadata": variable_meta, "header": header}
def homogenized_column_definitions(
    variable_code: str,
) -> tuple[dict, list[tuple[int, int]], dict[str, type[str | int | float] | Any], int]:
    """
    Return the column names, widths, and data types for the AHCCD fixed-width format data.

    The layout is selected from the `_variable_name` registered for
    `variable_code` in the "eccc-homogenized" data mappings: one layout for
    names starting with "tas", another for names starting with "pr".

    Parameters
    ----------
    variable_code : str
        Key of the variable in the "variables" section of the mappings.

    Returns
    -------
    tuple[dict, list[tuple[int, int]], dict[str, type[str | int | float] | Any], int]
        - Normalized column name -> original column name.
        - ``(start, end)`` character spans: four leading fields followed by
          31 pairs made of a value field and a one-character field.
        - Original column name -> data type.
        - Header row index (3 for "tas*", 0 for "pr*").

    Raises
    ------
    KeyError
        If the variable code is unknown or its name starts with neither
        "tas" nor "pr".
    """
    mappings = load_json_data_mappings("eccc-homogenized")
    variable = mappings["variables"][variable_code]["_variable_name"]

    is_tas = variable.startswith("tas")
    if not is_tas and not variable.startswith("pr"):
        raise KeyError

    if is_tas:
        column_dtypes = {
            "No": str,
            "StnId": str,
            "Station name": str,
            "Prov": str,
            "FromYear": int,
            "FromMonth": int,
            "ToYear": int,
            "ToMonth": int,
            "%Miss": float,
            "Lat(deg)": float,
            "Long(deg)": float,
            "Elev(m)": int,
            "Joined": str,
            "RCS": str,
        }
        column_spaces = [(0, 5), (5, 6), (6, 8), (8, 9)]
        value_width = 7
        header_row = 3
    else:
        column_dtypes = {
            "Prov": str,
            "Station name": str,
            "stnid": str,
            "beg yr": int,
            "beg mon": int,
            "end yr": int,
            "end mon": int,
            "lat (deg)": float,
            "long (deg)": float,
            "elev (m)": int,
            "stns joined": str,
        }
        column_spaces = [(0, 4), (4, 5), (5, 7), (7, 8)]
        value_width = 8
        header_row = 0

    # 31 days in a month: each day takes a value span then a 1-character span,
    # starting right after the last leading field.
    cursor = column_spaces[-1][1]
    for _ in range(31):
        value_end = cursor + value_width
        column_spaces.append((cursor, value_end))
        column_spaces.append((value_end, value_end + 1))
        cursor = value_end + 1

    # Normalize names: lowercase, drop any "(unit)" suffix, spell out "%",
    # and replace inner spaces with underscores.
    column_names = {}
    for original in column_dtypes:
        base = original.lower().split("(")[0]
        normalized = base.replace("%", "pct_").strip().replace(" ", "_")
        column_names[normalized] = original

    return column_names, column_spaces, column_dtypes, header_row
def obs_column_definitions(
    time_frequency: str,
) -> tuple[list[str], list[int], list[type[str | int]], int]:
    """
    Return the column names, widths, and data types for the fixed-width format.

    Parameters
    ----------
    time_frequency : {"h", "hour", "hourly", "d", "day", "daily"}
        Frequency of the records (case-insensitive). Hourly records carry
        24 observations per row, daily records carry 31.

    Returns
    -------
    tuple[list[str], list[int], list[type[str | int]], int]
        Column names, column widths, column data types and the header row
        index (always 0). The three lists have the same length: the leading
        identification columns followed by one ``D<i>``/``F<i>`` pair
        (widths 6 and 1, both `str`) per observation.

    Raises
    ------
    NotImplementedError
        If `time_frequency` is not one of the supported values.
    """
    frequency = time_frequency.lower()
    if frequency in ("h", "hour", "hourly"):
        num_observations = 24
        column_names = ["code", "year", "month", "day", "code_var"]
        column_widths = [7, 4, 2, 2, 3]
        column_dtypes = [str, int, int, int, str]
    elif frequency in ("d", "day", "daily"):
        num_observations = 31
        column_names = ["code", "year", "month", "code_var"]
        column_widths = [7, 4, 2, 3]
        column_dtypes = [str, int, int, str]
    else:
        raise NotImplementedError(
            "`time_frequency` must be 'h'/'hourly' or 'd'/'daily'."
        )

    header = 0

    # Add the data columns: exactly one name, width and dtype per column so
    # the three lists stay aligned.
    for i in range(1, num_observations + 1):
        column_names.extend([f"D{i}", f"F{i}"])
        column_widths.extend([6, 1])
        column_dtypes.extend([str, str])

    return column_names, column_widths, column_dtypes, header