"""Environment and Climate Change Canada RDRS conversion tools."""
from __future__ import annotations
import logging
import os
import pathlib
from pathlib import Path
from typing import Any
import h5py
import xarray as xr
from numpy import unique
from miranda.io import fetch_chunk_config, write_dataset_dict
from miranda.units import check_time_frequency
from ._aggregation import aggregate
from ._data_corrections import dataset_conversion, load_json_data_mappings
from ._data_definitions import gather_eccc_rdrs, gather_raw_rdrs_by_years
logger = logging.getLogger("miranda.convert.eccc_rdrs")
CONFIG_FOLDER = pathlib.Path(__file__).parent / "data"
CONFIG_FILES = {
"rdrs-v21": "eccc_rdrs_cf_attrs.json",
"casr-v31": "eccc_casr_cf_attrs.json",
"casr-v32": "eccc_casr_cf_attrs.json",
"ORRC-v10": "ouranos_orrc_cf_attrs.json",
"ORRC-v11": "ouranos_orrc_cf_attrs.json",
}
for k, v in CONFIG_FILES.items():
CONFIG_FILES[k] = CONFIG_FOLDER / v
# __all__ = ["convert_rdrs", "rdrs_to_daily"]
# FIXME: Can we use `name_output_file` instead? We already have a better version of this function.
def _get_drop_vars(file: str | os.PathLike[str], *, keep_vars: list[str] | set[str]):
"""
Determine dropped variables.
Parameters
----------
file : str or os.PathLike
The file to check.
keep_vars : list or set of str
The variables to keep.
Returns
-------
list
The dropped variables.
"""
drop_vars = list(xr.open_dataset(file).data_vars)
return list(set(drop_vars) - set(keep_vars))
[docs]
def convert_rdrs(
project: str,
input_folder: str | os.PathLike[str],
output_folder: str | os.PathLike[str],
output_format: str = "zarr",
working_folder: str | os.PathLike[str] | None = None,
overwrite: bool = False,
year_start: int | None = None,
year_end: int | None = None,
cfvariable_list: list | None = None,
**dask_kwargs: dict[str, Any],
) -> None:
r"""
Convert RDRS dataset.
Parameters
----------
project : str
The project name.
input_folder : str or os.PathLike
The input folder.
output_folder : str or os.PathLike
The output folder.
output_format : {"netcdf", "zarr"}
The output format.
working_folder : str or os.PathLike, optional
The working folder.
overwrite : bool
Whether to overwrite existing files. Default: False.
year_start : int, optional
The start year.
If not provided, the minimum year in the dataset will be used.
year_end : int, optional
The end year.
If not provided, the maximum year in the dataset will be used.
cfvariable_list : list, optional
The CF variable list.
**dask_kwargs : dict
Additional keyword arguments passed to the Dask scheduler.
"""
# TODO: This setup configuration is near-universally portable. Should we consider applying it to all conversions?
var_attrs = load_json_data_mappings(project)["variables"]
prefix = load_json_data_mappings(project)["Header"]["_prefix"][project]
if cfvariable_list:
var_attrs = {
v: var_attrs[v] for v in var_attrs if "_cf_variable_name" in var_attrs[v] and var_attrs[v]["_cf_variable_name"] in cfvariable_list
}
freq_dict = dict(h="hr", d="day")
var_name_map = {v: f"{prefix}{v}" for v in var_attrs.keys()}
if isinstance(input_folder, str):
input_folder = Path(input_folder).expanduser()
if isinstance(output_folder, str):
output_folder = Path(output_folder).expanduser()
if isinstance(working_folder, str):
working_folder = Path(working_folder).expanduser()
# FIXME: Do we want to collect everything? Maybe return a dictionary with years and associated files?
gathered = gather_raw_rdrs_by_years(input_folder, project)
for year, ncfiles in gathered[project].items():
if year_start and int(year) < year_start:
continue
if year_end and int(year) > year_end:
continue
if not ncfiles:
continue
ds_allvars = None
for nc in ncfiles:
eng = "h5netcdf" if h5py.is_hdf5(nc) else "netcdf4"
try:
ds1 = xr.open_dataset(nc, chunks="auto", engine=eng, cache=False)
except (OSError, RuntimeError) as e:
msg = f"Failed to open {nc} with engine {eng}. Error: {e}"
logger.error(msg)
raise RuntimeError(msg) from e
if isinstance(ds1.indexes["time"], xr.coding.cftimeindex.CFTimeIndex):
ds1 = ds1.copy()
ds1["time"] = ("time", ds1.indexes["time"].to_datetimeindex())
if ds_allvars is None:
out_freq = None
ds_allvars = ds1
if out_freq is None:
out_freq, meaning = check_time_frequency(ds1)
out_freq = f"{out_freq[0]}{freq_dict[out_freq[1]]}" if meaning == "hour" else freq_dict[out_freq[1]]
ds_allvars.attrs["frequency"] = out_freq
else:
ds_allvars = xr.concat([ds_allvars, ds1], data_vars="minimal", dim="time")
ds_allvars = ds_allvars.sel(time=f"{year}")
# This is the heart of the conversion utility; We could apply this to multiple projects.
for month in unique(ds_allvars.time.dt.month):
ds_month = ds_allvars.sel(time=f"{year}-{str(month).zfill(2)}")
for short_var in var_attrs.keys():
real_var_name = var_name_map[short_var]
drop_vars = _get_drop_vars(ncfiles[0], keep_vars=[real_var_name, "rotated_pole"])
ds_out = ds_month.drop_vars(drop_vars)
ds_out = ds_out.rename({real_var_name: short_var})
ds_out = ds_out.assign_coords(rotated_pole=ds_out["rotated_pole"])
if ds_out[short_var].attrs["units"] == "-":
ds_out[short_var].attrs["units"] = "1"
ds_corr = dataset_conversion(
ds_out,
project=project,
add_version_hashes=False,
overwrite=overwrite,
)
# Code to handle _use_snapshot attribute: will write directly to `day` subfolder
modfreq_flg = False
out_freq1 = None
meaning1 = None
if "_use_snapshot" in var_attrs[short_var]:
# make sure it is not explicitly set to False
if var_attrs[short_var]["_use_snapshot"]:
modfreq_flg = True
out_freq1, meaning1 = check_time_frequency(ds_corr, minimum_continuous_period="1h")
out_freq1 = f"{out_freq1[0]}{freq_dict[out_freq1[1]]}" if meaning1 == "hour" else freq_dict[out_freq1[1].lower()]
if "level" in ds_corr.dims:
ds_corr = ds_corr.squeeze()
ds_corr = ds_corr.drop_vars(["a", "b", "level"])
chunks = fetch_chunk_config(priority="time", freq=out_freq, dims=ds_corr.dims)
chunks["time"] = len(ds_corr.time)
cf_var = var_attrs[short_var]["_cf_variable_name"] if var_attrs[short_var]["_cf_variable_name"] else short_var
outfolder1 = output_folder.joinpath(out_freq1) if modfreq_flg else output_folder.joinpath(out_freq)
write_dataset_dict(
{cf_var: ds_corr},
output_folder=outfolder1,
temp_folder=working_folder,
output_format=output_format,
overwrite=overwrite,
chunks=chunks,
**dask_kwargs,
)
# FIXME: This looks mostly like code to stage writing out files. Should it be moved to an IO module?
[docs]
def rdrs_to_daily(
project: str,
input_folder: str | os.PathLike,
output_folder: str | os.PathLike,
working_folder: str | os.PathLike | None = None,
overwrite: bool = False,
output_format: str = "zarr",
year_start: int | None = None,
year_end: int | None = None,
process_variables: list[str] | None = None,
**dask_kwargs: dict[str, Any],
) -> None:
r"""
Write out RDRS files to daily-timestep files.
Parameters
----------
project : str
The project name.
input_folder : str or os.PathLike
The input folder.
output_folder : str or os.PathLike
The output folder.
working_folder : str or os.PathLike
The working folder.
overwrite : bool
Whether to overwrite existing files. Default: False.
output_format : {"netcdf", "zarr"}
The output format.
year_start : int, optional
The start year.
If not provided, the minimum year in the dataset will be used.
year_end : int, optional
The end year.
If not provided, the maximum year in the dataset will be used.
process_variables : list of str, optional
The variables to process.
If not provided, all variables will be processed.
**dask_kwargs : dict
Additional keyword arguments passed to the Dask scheduler.
"""
if isinstance(input_folder, str):
input_folder = Path(input_folder).expanduser()
if isinstance(output_folder, str):
output_folder = Path(output_folder).expanduser() # noqa
if isinstance(working_folder, str):
working_folder = Path(working_folder).expanduser()
# GATHER ALL RDRS FILES
gathered = gather_eccc_rdrs(project, input_folder, "zarr", "cf")
files = gathered[project] # noqa
if process_variables:
for vv in [f for f in files.keys() if f not in process_variables]:
files.pop(vv)
for zarrs in files.values():
zarrs = sorted(zarrs)
if not year_start:
year_start = xr.open_zarr(zarrs[0]).time.dt.year.min().values
if not year_end:
year_end = xr.open_zarr(zarrs[-1]).time.dt.year.max().values
for year in range(year_start, year_end + 1):
infiles = [z for z in zarrs if f"_{year}" in z.name]
if len(infiles) != 12:
msg = f"Found {len(infiles)} input files for {year}. The year is incomplete."
logger.warning(msg)
out_variables = aggregate(xr.open_mfdataset(infiles, engine="zarr"), freq="day")
dims = set(next(iter(out_variables.values())).dims)
chunks = fetch_chunk_config(priority="time", freq="day", dims=dims)
chunks["time"] = len(out_variables[list(out_variables.keys())[0]].time)
write_dataset_dict(
out_variables,
output_folder=output_folder,
temp_folder=working_folder,
output_format=output_format,
overwrite=overwrite,
chunks=chunks,
**dask_kwargs,
)