Source code for miranda.convert.eccc_rdrs

import logging.config
import os
from pathlib import Path
from typing import List, Optional, Set, Union

import xarray as xr
from numpy import unique

from miranda.io import fetch_chunk_config, write_dataset_dict
from miranda.scripting import LOGGING_CONFIG
from miranda.units import get_time_frequency

from ._aggregation import aggregate
from ._data_corrections import dataset_conversion, load_json_data_mappings
from ._data_definitions import gather_raw_rdrs_by_years, gather_rdrs

logging.config.dictConfig(LOGGING_CONFIG)


__all__ = ["convert_rdrs", "rdrs_to_daily"]


# FIXME: Can we use `name_output_file` instead? We already have a better version of this function.


def _get_drop_vars(
    file: Union[str, os.PathLike], *, keep_vars: Union[List[str], Set[str]]
) -> List[str]:
    """Return the data variables found in `file` that are not listed in `keep_vars`."""
    drop_vars = list(xr.open_dataset(file).data_vars)
    return list(set(drop_vars) - set(keep_vars))
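

# A minimal illustration of `_get_drop_vars` (the file and variable names here
# are hypothetical, not part of the module): for a file whose dataset contains
# the data variables {"tas", "pr", "rotated_pole"},
#
#     _get_drop_vars("rdrs_198001.nc", keep_vars={"tas", "rotated_pole"})
#
# returns ["pr"], i.e. everything that `convert_rdrs` passes to
# `Dataset.drop_vars` so that only the kept variable (plus the grid mapping)
# remains.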


def convert_rdrs(
    project: str,
    input_folder: Union[str, os.PathLike],
    output_folder: Union[str, os.PathLike],
    output_format: str = "zarr",
    working_folder: Optional[Union[str, os.PathLike]] = None,
    overwrite: bool = False,
    **dask_kwargs,
):
    """Convert gathered RDRS netCDF files to the target output format.

    Parameters
    ----------
    project : str
        Project name used to look up variable mappings and chunking configurations.
    input_folder : str or os.PathLike
        Folder containing the raw RDRS netCDF files.
    output_folder : str or os.PathLike
        Folder where converted datasets are written; per-frequency subfolders are created.
    output_format : str
        Output format passed to the writer. Default: "zarr".
    working_folder : str or os.PathLike, optional
        Temporary folder used while writing.
    overwrite : bool
        Whether existing converted datasets should be overwritten.
    **dask_kwargs
        Additional keyword arguments passed to the dask-backed writer.

    Returns
    -------
    None
    """
    # TODO: This setup configuration is near-universally portable. Should we consider applying it to all conversions?
    var_attrs = load_json_data_mappings(project=project)["variables"]
    freq_dict = dict(h="hr", d="day")

    if isinstance(input_folder, str):
        input_folder = Path(input_folder).expanduser()
    if isinstance(output_folder, str):
        output_folder = Path(output_folder).expanduser()
    if isinstance(working_folder, str):
        working_folder = Path(working_folder).expanduser()

    # FIXME: Do we want to collect everything? Maybe return a dictionary with years and associated files?
    gathered = gather_raw_rdrs_by_years(input_folder)
    for year, ncfiles in gathered[project].items():
        ds_allvars = None
        if len(ncfiles) >= 28:
            for nc in ncfiles:
                ds1 = xr.open_dataset(nc, chunks="auto")
                if ds_allvars is None:
                    ds_allvars = ds1
                    outfreq, meaning = get_time_frequency(ds1)
                    outfreq = (
                        f"{outfreq[0]}{freq_dict[outfreq[1]]}"
                        if meaning == "hour"
                        else freq_dict[outfreq[1]]
                    )
                    ds_allvars.attrs["frequency"] = outfreq
                else:
                    ds_allvars = xr.concat(
                        [ds_allvars, ds1], data_vars="minimal", dim="time"
                    )
            ds_allvars = ds_allvars.sel(time=f"{year}")
            # This is the heart of the conversion utility; we could apply this to multiple projects.
            for month in unique(ds_allvars.time.dt.month):
                ds_month = ds_allvars.sel(time=f"{year}-{str(month).zfill(2)}")
                for var_attr in var_attrs.keys():
                    drop_vars = _get_drop_vars(
                        ncfiles[0], keep_vars=[var_attr, "rotated_pole"]
                    )
                    ds_out = ds_month.drop_vars(drop_vars)
                    ds_out = ds_out.assign_coords(rotated_pole=ds_out["rotated_pole"])
                    ds_corr = dataset_conversion(
                        ds_out,
                        project=project,
                        add_version_hashes=False,
                        overwrite=overwrite,
                    )
                    chunks = fetch_chunk_config(project=project, freq=outfreq)
                    chunks["time"] = len(ds_corr.time)
                    write_dataset_dict(
                        {var_attrs[var_attr]["_cf_variable_name"]: ds_corr},
                        output_folder=output_folder.joinpath(outfreq),
                        temp_folder=working_folder,
                        output_format=output_format,
                        overwrite=False,
                        chunks=chunks,
                        **dask_kwargs,
                    )
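

# A minimal usage sketch (illustrative, not executed on import): the paths are
# hypothetical, and the project name is an assumption mirroring the "rdrs-v21"
# key hard-coded in `rdrs_to_daily` below.
#
#     convert_rdrs(
#         project="rdrs-v21",
#         input_folder="~/rdrs/raw",
#         output_folder="~/rdrs/converted",
#         output_format="zarr",
#         working_folder="~/scratch/rdrs",
#     )
#
# Hourly outputs land in per-frequency subfolders of `output_folder`
# (e.g. "1hr"), since the writer above joins `output_folder` with `outfreq`.

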
# FIXME: This looks mostly like code to stage writing out files. Should it be moved to an IO module?
def rdrs_to_daily(
    project: str,
    input_folder: Union[str, os.PathLike],
    output_folder: Union[str, os.PathLike],
    working_folder: Optional[Union[str, os.PathLike]] = None,
    overwrite: bool = False,
    output_format: str = "zarr",
    year_start: Optional[int] = None,
    year_end: Optional[int] = None,
    process_variables: Optional[List[str]] = None,
    **dask_kwargs,
):
    """Aggregate previously-converted hourly RDRS data to daily frequency.

    Parameters
    ----------
    project : str
        Project name used to look up chunking configurations.
    input_folder : str or os.PathLike
        Folder containing the converted zarr datasets.
    output_folder : str or os.PathLike
        Folder where the daily datasets are written.
    working_folder : str or os.PathLike, optional
        Temporary folder used while writing.
    overwrite : bool
        Whether existing daily datasets should be overwritten.
    output_format : str
        Output format passed to the writer. Default: "zarr".
    year_start : int, optional
        First year to process. Inferred from the earliest input file if not set.
    year_end : int, optional
        Last year to process. Inferred from the latest input file if not set.
    process_variables : list of str, optional
        If given, restrict processing to these variables.
    **dask_kwargs
        Additional keyword arguments passed to the dask-backed writer.

    Returns
    -------
    None
    """
    if isinstance(input_folder, str):
        input_folder = Path(input_folder).expanduser()
    if isinstance(output_folder, str):
        output_folder = Path(output_folder).expanduser()  # noqa
    if isinstance(working_folder, str):
        working_folder = Path(working_folder).expanduser()

    # Gather all RDRS files.
    gathered = gather_rdrs(project, input_folder, "zarr", "cf")
    files = gathered["rdrs-v21"]  # noqa
    if process_variables:
        for vv in [f for f in files.keys() if f not in process_variables]:
            files.pop(vv)
    for vv, zarrs in files.items():
        zarrs = sorted(zarrs)
        if not year_start:
            year_start = xr.open_zarr(zarrs[0]).time.dt.year.min().values
        if not year_end:
            year_end = xr.open_zarr(zarrs[-1]).time.dt.year.max().values
        for year in range(year_start, year_end + 1):
            # Expect one zarr store per month of the given year.
            infiles = [z for z in zarrs if f"_{year}" in z.name]
            if len(infiles) != 12:
                raise ValueError(f"Found {len(infiles)} input files. Expected 12.")
            out_variables = aggregate(
                xr.open_mfdataset(infiles, engine="zarr"), freq="day"
            )
            chunks = fetch_chunk_config(project=project, freq="day")
            chunks["time"] = len(out_variables[list(out_variables.keys())[0]].time)
            write_dataset_dict(
                out_variables,
                output_folder=output_folder,
                temp_folder=working_folder,
                output_format=output_format,
                overwrite=overwrite,
                chunks=chunks,
                **dask_kwargs,
            )
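

# A companion usage sketch (illustrative; the paths and variable names are
# hypothetical, and `input_folder` is assumed to point at the hourly zarr
# outputs produced by `convert_rdrs`):
#
#     rdrs_to_daily(
#         project="rdrs-v21",
#         input_folder="~/rdrs/converted/1hr",
#         output_folder="~/rdrs/converted/day",
#         process_variables=["tas", "pr"],  # hypothetical variable names
#     )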