"""Conversion Utilities submodule."""
from __future__ import annotations
import hashlib
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Any
import cftime
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar
from pandas._libs import NaTType # noqa
logger = logging.getLogger("miranda.convert.utils")
__all__ = [
"_add_coords_to_dataset",
"date_parser",
"find_version_hash",
"get_station_meta",
]
prj_dict = dict(
ghcnd=dict(freq="daily", filetype=".csv"),
canhomt_dly=dict(freq="daily", filetype=".csv"),
ghcnh=dict(freq="hourly", filetype=".psv"),
)
q_flag_dict = {
"ghcnd": {
"D": "failed duplicate check",
"G": "failed gap check",
"I": "failed internal consistency check",
"K": "failed streak/frequent-value check",
"L": "failed check on length of multiday period",
"M": "failed megaconsistency check",
"N": "failed naught check",
"O": "failed climatological outlier check",
"R": "failed lagged range check",
"S": "failed spatial consistency check",
"T": "failed temporal consistency check",
"W": "temperature too warm for snow",
"X": "failed bounds check",
"Z": "flagged as a result of an official Datzilla investigation",
},
"canhomt_dly": {
"I": "infilled data",
},
}
def _assign_coords_to_dataset(ds: xr.Dataset, list_coords: list[str] | str | None = None) -> xr.Dataset:
"""
Promote one or more data variables to coordinates on an xarray Dataset.
For each name in ``list_coords`` this function will check whether the
name exists in ``ds.data_vars`` and is not already present in
``ds.coords``. If both conditions are met, the corresponding
DataArray is assigned as a coordinate on the returned Dataset.
Parameters
----------
ds : xr.Dataset
Dataset to modify.
list_coords : list[str], str, or None, optional
A single variable name, list of variable names to assign as
coordinates, or None. If None, the function returns the Dataset
unchanged. Names that are not present in ``ds.data_vars`` or are
already coordinates are ignored. Default is None.
Returns
-------
xr.Dataset
The input Dataset with the requested coordinates assigned. The
function returns a (possibly) new Dataset object as produced by
xarray's ``assign_coords`` method.
Notes
-----
- The function does not remove the original data variables; it only
creates coordinates that reference the same DataArray objects.
- Passing a single string or a list of strings is supported.
- If ``list_coords`` is None, the input all data variables without a time dimension are assigned as coords.
Example
-------
>>> ds = xr.Dataset({"lon": (("station",), [10.0]), "temp": (("station", "time"), [[1.0]])}, coords={"station": [0], "time": ["2020-01-01"]})
>>> ds = _assign_coords_to_dataset(ds, "lon")
>>> "lon" in ds.coords
True
"""
if list_coords is None:
list_coords = [var for var in ds.data_vars if "time" not in ds[var].dims]
if isinstance(list_coords, str):
list_coords = [list_coords]
for cc in list_coords:
if cc in ds.data_vars and cc not in ds.coords:
ds = ds.assign_coords({cc: ds[cc]})
return ds
def _add_coords_to_dataset(ds: xr.Dataset, df_stat: pd.DataFrame, float_flag=True) -> xr.Dataset:
"""
Add coordinates to the dataset from the station metadata.
Parameters
----------
ds : xr.Dataset
Dataset.
df_stat : pd.DataFrame
Station metadata.
float_flag : bool, optional
Whether to convert data variables to float32. Default is True.
Returns
-------
xr.Dataset
Dataset with added coordinates.
"""
for cc in [c for c in df_stat.columns if c not in ["station_id", "geometry", "index_right"]]:
if cc not in ds.coords:
ds = ds.assign_coords({cc: xr.DataArray([df_stat[cc].values[0]], coords=ds.station.coords)})
if float_flag:
for vv in ds.data_vars:
if ds[vv].dtype == "float64":
ds[vv] = ds[vv].astype("float32")
return ds
[docs]
def find_version_hash(file: str | os.PathLike[str]) -> dict[str, Any]:
"""
Check for an existing version hash file and, if one cannot be found, generate one from file.
Parameters
----------
file : str or os.PathLike
The file to check.
Returns
-------
dict
The version and hash.
"""
def _get_hash(f: str) -> str:
"""
Calculate the sha256sum of a file.
Parameters
----------
f : str or os.PathLike
The file to hash.
Returns
-------
str
The hash.
"""
hash_sha256_writer = hashlib.sha256()
with Path(f).open("rb") as f_opened:
hash_sha256_writer.update(f_opened.read())
sha256sum = hash_sha256_writer.hexdigest()
_msg = f"Calculated sha256sum (starting: {sha256sum[:6]})"
logger.info(_msg)
del hash_sha256_writer
return sha256sum
version_info = dict()
possible_version = Path(file).parent.name
if re.match(r"^v\d+", possible_version, re.IGNORECASE):
version_info["version"] = Path(file).parent.name
version_info["sha256sum"] = _get_hash(file)
else:
file_identity = str(Path(file).name).split(".")[0]
possible_version_signature = Path(file).parent.glob(f"{file_identity}.*")
for sig in possible_version_signature:
found_version = re.search(r"\.(v\d+.+)$", sig.name, re.IGNORECASE)
if found_version:
try:
version_info["version"] = found_version.group()
version_info["sha256sum"] = int(sig.open().read())
except ValueError:
msg = "Unable to read version hash file. Calculating sha256sum."
logger.error(msg)
continue
break
else:
version_info["version"] = "vNotFound"
version_info["sha256sum"] = _get_hash(file)
return version_info
[docs]
def date_parser(
date: str,
*,
end_of_period: bool = False,
output_type: str = "str",
strftime_format: str = "%Y-%m-%d",
) -> str | pd.Timestamp | NaTType:
"""
Parse datetime objects from a string representation of a date or both a start and end date.
Parameters
----------
date : str
Date to be converted.
end_of_period : bool
If True, the date will be the end of month or year depending on what's most appropriate.
output_type : {"datetime", "str"}
Desired returned object type.
strftime_format : str
If output_type=='str', this sets the strftime format.
Returns
-------
pd.Timestamp or str or pd.NaT
Parsed date.
Notes
-----
Adapted from code written by Gabriel Rondeau-Genesse (@RondeauG).
"""
# Formats, ordered depending on string length
formats = {
4: ["%Y"],
6: ["%Y%m"],
7: ["%Y-%m"],
8: ["%Y%m%d"],
10: ["%Y%m%d%H", "%Y-%m-%d"],
12: ["%Y%m%d%H%M"],
13: ["%Y%m-%Y%m"],
17: ["%Y%m%d-%Y%m%d"],
19: ["%Y-%m-%dT%H:%M:%S"],
21: ["%Y%m%d%H-%Y%m%d%H"],
25: ["%Y%m%d%H%M-%Y%m%d%H%M"],
}
end_date_found = False
def _parse_date(d: str, fmts: list[str]) -> tuple[pd.Timestamp, str]:
"""
Parse the date.
Parameters
----------
d : str
The date string.
fmts : list
The list of formats to try.
Returns
-------
pd.Timestamp
The parsed date.
"""
for fmt in fmts:
try:
s = pd.to_datetime(d, format=fmt)
match = fmt
break
except ValueError: # noqa: S110
pass
else:
raise ValueError(f"Can't parse date {d} with supported formats: [{', '.join(fmts)}].")
return s, match
date_format = None
if isinstance(date, str):
if len(date) in [13, 17, 21, 25]:
dates = date.split("-")
if not end_of_period:
date = dates[0]
else:
date = dates[1]
end_date_found = True
try:
possible_formats = formats[len(date)]
date, date_format = _parse_date(date, possible_formats)
except KeyError:
# Return NaT for fixed/missing/ill-formatted date strings
return pd.NaT
elif isinstance(date, cftime.datetime): # noqa
for n in range(3):
try:
date = pd.Timestamp.fromisoformat((date - pd.Timedelta(n)).isoformat())
except ( # noqa: PERF203,S110 # We are NOT catching OutOfBoundsDatetime.
ValueError
):
pass
else:
break
else:
raise ValueError(f"Unable to parse cftime date {date}, even when moving back 2 days.")
elif not isinstance(date, pd.Timestamp):
date = pd.Timestamp(date) # noqa
if end_of_period and date_format and not end_date_found:
if "m" not in date_format:
date = date + pd.tseries.offsets.YearEnd(1) # noqa
elif "d" not in date_format:
date = date + pd.tseries.offsets.MonthEnd(1) # noqa
if output_type == "str":
return date.strftime(strftime_format)
return date
# FIXME: Needs to be moved to different module in a future version
def _get_ghcn_stations(project: str) -> pd.DataFrame:
"""
Get GHCN station metadata.
Parameters
----------
project : str
Project name.
Returns
-------
pd.DataFrame
Station metadata.
"""
if project == "ghcnd":
station_url = "https://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-stations.txt"
dtypes = {
"station_id": str,
"lat": float,
"lon": float,
"elevation": float,
"state": str,
"station_name": str,
"gsn_flag": str,
"hcn_flag": str,
"wmo_id": str,
}
try:
station_df = pd.read_fwf(
station_url,
widths=[11, 9, 10, 7, 3, 31, 4, 4, 6],
names=[d for d in dtypes.keys()],
converters=dtypes,
)
except ValueError:
statfile = Path(__file__).parent.joinpath("data/ghcnd-stations.txt")
station_df = pd.read_fwf(
statfile,
widths=[11, 9, 10, 7, 3, 31, 4, 4, 6],
names=[d for d in dtypes.keys()],
converters=dtypes,
)
else:
raise ValueError(f"unknown project values {project}")
return station_df
# FIXME: Needs to be moved to different module in a future version
def _get_canhomt_stations(project: str) -> pd.DataFrame:
"""
Get CanHomT station metadata.
Parameters
----------
project : str
Project name.
Returns
-------
pd.DataFrame
Station metadata.
"""
if project == "canhomt_dly":
station_url = "https://crd-data-donnees-rdc.ec.gc.ca/CDAS/products/CanHomTV4/Temp_Stations_Gen4_2024_monthly.csv"
try:
station_df = pd.read_csv(
station_url,
)
except ValueError:
statfile = Path(__file__).parent.joinpath("data/eccc-canhomt_Temp_Stations_Gen4_2024_monthly.zip")
station_df = pd.read_csv(
statfile,
)
station_df = station_df.rename(columns={p: p.lower() for p in station_df.columns})
rename = {"name": "station_name", "id": "station_id", "ele": "elevation"}
station_df = station_df.rename(columns=rename)
else:
raise ValueError(f"Unknown project value: {project}")
return station_df
def make_monotonous_time(ds: xr.Dataset, freq: str):
"""
Make time monotonous.
Parameters
----------
ds : xr.Dataset
Dataset.
freq : str
Frequency.
Returns
-------
xr.Dataset
Dataset with monotonous time.
"""
if freq == "daily":
time1 = pd.date_range(ds.time[0].values, ds.time[-1].values, freq="D")
elif freq == "hourly":
time1 = pd.date_range(ds.time[0].values, ds.time[-1].values, freq="h")
else:
raise ValueError(f"Unknown frequency {freq}")
dsnew = xr.Dataset(coords=dict(time=time1, station=ds.station))
for vv in ds.data_vars:
dsnew[vv] = ds[vv]
return dsnew
def write_zarr(
ds: xr.Dataset,
out_zarr: Path,
chunks: dict,
zarr_format: int = 2,
overwrite: bool = False,
) -> None:
"""
Write Zarr file.
Parameters
----------
ds : xr.Dataset
Dataset.
out_zarr : Path
Output Zarr file.
chunks : dict
Chunk sizes.
zarr_format : int
Zarr format version (2 or 3). Default is 2.
overwrite : bool
Whether to overwrite. Default is False.
"""
if not out_zarr.exists() or overwrite:
with ProgressBar():
for vv in ds.data_vars:
if ds[vv].dtype == object:
ds[vv] = ds[vv].astype(str)
ds.chunk(chunks).to_zarr(out_zarr.with_suffix(".tmp.zarr"), mode="w", zarr_format=zarr_format)
shutil.move(out_zarr.with_suffix(".tmp.zarr"), out_zarr)