Source code for miranda.convert.hq

"""Hydro Quebec Weather Station Data Conversion module."""

from __future__ import annotations
import csv
import datetime as dt
import json
import logging
import os
import re
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
import xarray as xr
from xclim.core.units import units as u
from xclim.core.units import units2pint


logger = logging.getLogger("miranda.convert.hq")

__all__ = ["open_csv"]

# CMOR-like attributes
cmor = json.load(Path(__file__).parent.joinpath("data").joinpath("hq_cf_attrs.json").open("r"))["variables"]

fp = r"[-+]?\d*,\d+|\d+"

section_patterns = r"(\w+) :\n"

meta_patterns = {
    "Installation": {
        "nom": "Nom;(.*)",
        "type": "Type;(.*)",
        "ouverture": "Ouverture;(.+)",
        "fermeture": "Fermeture;(.+)",
        "x": rf"XCOORD \(degrés\.décimales\);({fp})",
        "y": rf"YCOORD \(degrés\.décimales\);({fp})",
        "z": rf"ZCOORD \(mètres\);({fp})",
    },
    "Données": {
        "variable": "Type de donnée;(.*)",
        "fuseau": "Fuseau horaire;(.*)",
        "pas": "Pas de temps;(.*)",
        "mesure": "Type de mesure;(.*)",
        "unité": "Unité;(.*)",
    },
}

csv_patterns = ("Propriétaires", "Correspondances")

data_header_pattern = "Dateheure;Valeur;Qualite;Statut"

converters = {
    "ouverture": np.datetime64,
    "fermeture": np.datetime64,
    "x": lambda x: float(x.replace(",", ".")),
    "y": lambda x: float(x.replace(",", ".")),
    "z": lambda x: float(x.replace(",", ".")),
    "Dateheure": lambda x: dt.datetime.strptime(x, "%Y-%m-%d %H:%M"),
    "Valeur": lambda x: float(x.replace(",", ".")) if x == "" else np.nan,
    "Qualite": lambda x: int(x.split("-")[0]),
    "Statut": lambda x: int(x.split("-")[0]),
    "unité": lambda x: cf_units.get(x, x),
    "pas": lambda x: cf_frequency.get(x, x),
}


def guess_variable(meta: dict[str, Any], cf_table: dict | None) -> tuple[str, str | None]:
    """
    Return the corresponding CMOR variable.

    Parameters
    ----------
    meta : dict
        The metadata.
    cf_table : dict, optional
        The CMOR table.

    Returns
    -------
    tuple
        The variable name and the table name.
    """
    if cf_table is None:
        cf_table = cmor

    v = meta["variable"]

    corr = {
        "Pluie (mm)": "prlp",
        "Neige (mm)": "prsn",
        "Épaisseur de neige": "snd",
        "Vitesse du vent 10 mètres": "sfcWind",
        "Direction du vent 10 mètres": "sfcWindAz",
        "Humidité relative 2 mètres": "hurs",
    }

    name = ""
    table_name = None
    if v in corr:
        name = corr[v]
    else:
        if v == "Température":
            if meta["mesure"] == "Maximum":
                name = "tasmax"
            elif meta["mesure"] == "Minimum":
                name = "tasmin"

            table_name = f"{name}_{meta['pas']}"

    if meta["pas"] != cf_table[table_name or name]["frequency"]:
        raise ValueError("Unexpected frequency.")

    return name, table_name or name


cf_units = {"°C": "celsius", "mm": "mm/day"}
cf_frequency = {
    "Fin du pas journalier": "day",
    "Instantanée du pas horaire": "1h",
    "Fin du pas horaire": "1h",
}
cf_attrs_names = {"x": "lon", "y": "lat", "z": "elevation", "nom": "site"}


def extract_daily(path: str | os.PathLike[str]) -> tuple[dict, pd.DataFrame]:
    """
    Extract data and metadata from HQ météo file.

    Parameters
    ----------
    path : os.PathLike or str
        The path to the file.

    Returns
    -------
    tuple
        The metadata and the data.
    """
    with Path(path).open("r", encoding="latin1") as fh:
        txt = fh.read()
        meta, data = re.split(data_header_pattern, txt, maxsplit=2)

    sections = iter(re.split(section_patterns, meta)[1:])

    m = {}
    for sec in sections:
        if sec in meta_patterns:
            content = next(sections)
            for key, pat in meta_patterns[sec].items():
                match = re.search(pat, content)
                if match:
                    m[key] = match.groups()[0]

        elif sec in csv_patterns:
            content = next(sections).strip()
            m[sec] = list(csv.reader(content.splitlines(), dialect="unix", delimiter=";"))

    d = pd.read_csv(
        path,
        delimiter=";",
        skiprows=len(meta.splitlines()),
        encoding="latin1",
        index_col=0,
        parse_dates=True,
        infer_datetime_format=True,
        decimal=",",
    )

    return m, d


def to_cf(meta: dict[str, Any], data: pd.DataFrame, cf_table: dict[str, Any] | None = None) -> xr.DataArray:
    """
    Return CF-compliant metadata.

    Parameters
    ----------
    meta : dict
        The metadata dictionary.
    data : pd.DataFrame
        The data DataFrame.
    cf_table : dict, optional
        The CF table dictionary.

    Returns
    -------
    xr.DataArray
        The CF-compliant xarray DataArray.
    """
    if cf_table is None:
        cf_table = {}

    # Convert meta values
    m = {}
    for key, val in meta.items():
        m[key] = converters.get(key, lambda q: q)(val)

    # Get default variable attributes
    name, table_name = guess_variable(m, cf_table)
    attrs = cf_table.get(table_name)

    # Add custom HQ attributes
    for key, val in cf_attrs_names.items():
        if key in m:
            attrs[val] = m[key]

    x = data["Valeur"].values.astype(float)

    # Convert units
    if attrs["units"] != m["unité"]:
        x = u.convert(x, m["unité"], units2pint(attrs["units"]))

    coords = {k: attrs.pop(k, np.nan) for k in ["lon", "lat", "elevation", "site"]}
    coords["time"] = data.index.values
    cf_corrected = xr.DataArray(data=x, dims="time", coords=coords, name=name, attrs=attrs)
    return cf_corrected


[docs] def open_csv(path: str | os.PathLike[str], cf_table: dict[str, Any] | None = cmor) -> xr.DataArray: """ Extract daily HQ meteo data and convert to xr.DataArray with CF-Convention attributes. Parameters ---------- path : os.PathLike or str The path to the file. cf_table : dict, optional The CF table dictionary. Returns ------- xr.DataArray The CF-compliant xarray DataArray. """ meta, data = extract_daily(path) return to_cf(meta, data, cf_table)