Source code for miranda.convert.deh

"""DEH Hydrograph Conversion module."""

from __future__ import annotations
import json
import logging
import os
import re
from pathlib import Path

import pandas as pd
import xarray as xr
from xclim.core.units import units as u


logger = logging.getLogger("miranda.convert.deh")

__all__ = ["open_txt"]

# CMOR-like attributes: the "variable_entry" table of the deh_cf_attrs.json file
# stored in the "data" folder next to this module.
# `read_text` opens and closes the file itself, so no file handle is left dangling,
# and the encoding is pinned so the result does not depend on the platform locale.
cmor = json.loads(
    Path(__file__).parent.joinpath("data").joinpath("deh_cf_attrs.json").read_text(encoding="utf-8")
)["variable_entry"]

# Maps the label that precedes a value in the file's metadata preamble
# to the key under which that value is stored in the metadata dictionary.
# TODO: Some potentially useful attributes were skipped
# because they would be complicated to include in a dataset since they vary per station
meta_patterns = {
    "Station: ": "name",
    "Bassin versant: ": "bv",
    "Coordonnées: (NAD83) ": "coords",
}

# Column-header line that separates the metadata preamble from the data rows
# (compared against the file text after runs of spaces have been collapsed to one).
data_header_pattern = "Station Date Débit (m³/s) Remarque\n"


def extract_daily(path: os.PathLike | str) -> tuple[dict, pd.DataFrame]:
    """
    Extract data and metadata from DEH (MELCCFP) stream flow file.

    The file text is split at the column-header line (``data_header_pattern``):
    everything before it is searched for the labels listed in ``meta_patterns``,
    and everything after it is parsed as whitespace-delimited data rows.

    Parameters
    ----------
    path : os.PathLike or str
        The path to the file.

    Returns
    -------
    tuple[dict, pd.DataFrame]
        The metadata and the data.
        The metadata holds one entry per value of ``meta_patterns`` plus "station".
        The data is indexed by "time" and keeps the "Débit" and "Remarque" columns.

    Raises
    ------
    ValueError
        If the column-header line is not found,
        or if the file does not hold exactly one station identifier.
    """
    with Path(path).open("r", encoding="latin1") as fh:
        txt = fh.read()

    # Collapse runs of spaces so that labels and the header line match
    # whatever padding the file uses between words.
    txt = re.sub(" +", " ", txt)
    meta, header, _ = txt.partition(data_header_pattern)
    if not header:
        raise ValueError(f"Data header line not found in file: {path}")

    m = {}
    for label, key in meta_patterns.items():
        # Various possible separators to take into account
        m[key] = meta.split(label)[1].split(" \n")[0].split("\n")[0].split(" Régime")[0]

    d = pd.read_csv(
        path,
        delimiter=r"\s+",
        # Skip the metadata preamble so that the header line is read as column names.
        skiprows=len(meta.splitlines()),
        encoding="latin1",
        # Keep the first column as text so numeric-looking identifiers are not cast to numbers.
        converters={0: str},
        index_col=1,
        parse_dates=True,
    )

    stations = d["Station"].unique()
    if len(stations) != 1:
        raise ValueError("Multiple stations detected in the same file.")
    m["station"] = stations[0]
    d = d.drop("Station", axis=1)

    # The header splits on whitespace into one more name than intended ("Débit" and "(m³/s)").
    # Presumably the remarks therefore land under "(m³/s)" and the trailing "Remarque"
    # column is empty - verify against a sample file. The remarks column is renamed and
    # the leftover one is dropped.
    d = d.rename(columns={"Remarque": "Nan", "(m³/s)": "Remarque"})
    d.index.names = ["time"]
    d = d.drop("Nan", axis=1)

    return m, d


def to_cf(meta: dict, data: pd.DataFrame, cf_table: dict) -> xr.Dataset:
    """
    Build a dataset with CF-like attributes from extracted metadata and data.

    Parameters
    ----------
    meta : dict
        The metadata dictionary. The keys "name", "station", "bv" and "coords" are read.
    data : pd.DataFrame
        The data DataFrame. The columns "Débit" and "Remarque" are read.
    cf_table : dict
        The CF table dictionary. The entries "q" and "flag" are used as variable attributes.

    Returns
    -------
    xr.Dataset
        The dataset holding the variables q, flag, name, station_id, area, lat and lon.
    """

    def _parse_dms(coord: str) -> float:
        """
        Convert a degrees/minutes/seconds string to decimal degrees.

        Parameters
        ----------
        coord : str
            The coordinate string, with the parts delimited by ``°``, ``'`` and ``"``.

        Returns
        -------
        float
            The coordinate in decimal degrees, rounded to 6 decimals.
        """
        deg, minutes, seconds, _ = re.split("[°'\"]", coord)
        fraction_minutes = float(minutes) / 60
        fraction_seconds = float(seconds) / (60 * 60)
        # The sign is read from the text rather than from the numeric value:
        # a value of 0 degrees is not "> 0", yet it is only negative when written "-0".
        if deg.lstrip().startswith("-"):
            return round(float(deg) - (fraction_minutes + fraction_seconds), 6)
        return round(float(deg) + fraction_minutes + fraction_seconds, 6)

    ds = xr.Dataset()

    ds["q"] = xr.DataArray(data["Débit"], attrs=cf_table["q"])
    ds["flag"] = xr.DataArray(data["Remarque"], attrs=cf_table["flag"])

    ds["name"] = xr.DataArray(meta["name"])
    ds["station_id"] = xr.DataArray(meta["station"])

    # "bv" is split on single spaces: the first token is the magnitude, the second its units.
    bv = meta["bv"].split(" ")
    ds["area"] = xr.DataArray(
        u.convert(float(bv[0]), bv[1], "km²"),
        attrs={"long_name": "drainage area", "units": "km2"},
    )

    # The two coordinates are separated by " // "; the first is stored as lat, the second as lon.
    # NOTE(review): CF conventions normally use "degrees_north"/"degrees_east" as units here;
    # the existing "decimal_degrees" value is kept unchanged - confirm before altering.
    coords = meta["coords"].split(" // ")
    ds["lat"] = xr.DataArray(
        _parse_dms(coords[0]),
        attrs={
            "standard_name": "latitude",
            "long_name": "latitude",
            "units": "decimal_degrees",
        },
    )
    ds["lon"] = xr.DataArray(
        _parse_dms(coords[1]),
        attrs={
            "standard_name": "longitude",
            "long_name": "longitude",
            "units": "decimal_degrees",
        },
    )

    ds.attrs["institution"] = "Ministère de l'Environnement et de la Lutte contre les changements climatiques"
    ds.attrs["source"] = "Hydrometric data <https://www.cehq.gouv.qc.ca/hydrometrie/historique_donnees/index.asp>"
    ds.attrs["redistribution"] = "Redistribution policy unknown. For internal use only."

    return ds


def open_txt(path: str | Path, cf_table: dict | None = cmor) -> xr.Dataset:
    """
    Extract daily DEH stream flow data and convert to xr.Dataset with CF-Convention attributes.

    Parameters
    ----------
    path : str or Path
        The path to the file.
    cf_table : dict, optional
        The CF table dictionary. Defaults to the module-level `cmor` table,
        which is also used when None is given.

    Returns
    -------
    xr.Dataset
        The CF-compliant dataset.
    """
    if cf_table is None:
        # None is an advertised value in the signature; use the default table
        # instead of failing later when the table is subscripted.
        cf_table = cmor
    meta, data = extract_daily(path)
    return to_cf(meta, data, cf_table)