Source code for miranda.ncar._aws_cordex

from __future__ import annotations
import ast
import functools
import json
import logging
import warnings
from json.decoder import JSONDecodeError
from pathlib import Path

import schema
import xarray as xr
from dask.diagnostics import ProgressBar
from xclim.core import calendar as xcal  # noqa

from miranda.gis import subsetting_domains


try:
    import intake
    import intake_esm  # noqa: F401
    import numcodecs  # noqa: F401
    import s3fs  # noqa: F401
except ImportError:
    intake = None
    warnings.warn(f"{__name__} functions require additional dependencies. Please install them with `pip install miranda[remote]`.", stacklevel=2)


__all__ = [
    "cordex_aws_calendar_correction",
    "cordex_aws_download",
]

_allowed_args = schema.Schema(
    {
        schema.Optional("variable"): schema.Schema(
            schema.Or(
                [
                    "hurs",
                    "huss",
                    "pr",
                    "prec",
                    "ps",
                    "rsds",
                    "sfcWind",
                    "tas",
                    "tasmax",
                    "tasmin",
                    "temp",
                    "tmax",
                    "tmin",
                    "uas",
                    "vas",
                ]
            )
        ),
        schema.Optional("frequency"): "day",
        schema.Optional("scenario"): schema.Schema(schema.Or(["eval", "hist", "rcp45", "rcp85", "hist-rcp45", "hist-rcp85"])),
        schema.Optional("grid"): schema.Or(["NAM-22i", "NAM-44i"]),
        schema.Optional("bias_correction"): schema.Or(["raw", "mbcn-Daymet", "mbcn-gridMET"]),
    }
)


[docs] def cordex_aws_calendar_correction(ds) -> xr.Dataset | None: """ AWS-stored CORDEX datasets are all on the same standard calendar, this converts the data back to the original calendar, removing added NaNs. Credit: Pascal Bourgault (@aulemahal) """ orig_calendar = ds.attrs.get("original_calendar", "standard") if orig_calendar in ["365_day", "360_day"]: msg = f"Converting calendar to {orig_calendar}." logging.info(msg) ds = xcal.convert_calendar(ds, "noleap") # drops Feb 29th if orig_calendar == "360_day": time = xcal.date_range_like(ds.time, calendar="360_day") ds = ds.where(~ds.time.dt.dayofyear.isin([31, 90, 151, 243, 304]), drop=True) if ds.time.size != time.size: raise ValueError("Conversion of dataset to 360_day calendar failed.") ds["time"] = time return ds
[docs] def cordex_aws_download( target_folder: str | Path, *, search: dict[str, str | list[str]], correct_times: bool = False, domain: str | None = None, ): """Download CORDEX interpolated grid for North America from Amazon S3.""" if intake is None: msg = f"{__name__} functions require additional dependencies. Please install them with `pip install miranda[remote]`." raise ImportError(msg) def _subset_preprocess(d: xr.Dataset, dom: list[float]) -> xr.Dataset: try: from clisops.core import subset_bbox n, w, s, e = subsetting_domains(dom) return subset_bbox(d, lon_bnds=[w, e], lat_bnds=[s, n]) except ModuleNotFoundError: msg = "This function requires the `clisops` library which is not installed. Domain subsetting step will be skipped." warnings.warn(msg, stacklevel=2) return d schema.Schema(_allowed_args).validate(search) # Define the catalog description file location. catalog_url = "https://ncar-na-cordex.s3-us-west-2.amazonaws.com/catalogs/aws-na-cordex.json" # Interpret the "na-cordex-models" column as a list of values. col = intake.open_esm_datastore( catalog_url, csv_kwargs={"converters": {"na-cordex-models": ast.literal_eval}}, ) col_subset = col.search(**search) additional_kwargs = dict() if domain: additional_kwargs["preprocess"] = functools.partial(_subset_preprocess, dom=domain) dsets = col_subset.to_dataset_dict( zarr_kwargs={"consolidated": True}, storage_options={"anon": True}, **additional_kwargs, ) msg = f"\nDataset dictionary keys:\n {dsets.keys()}" logging.info(msg) dds = list() for key in list(dsets.keys()): msg = f"Adding {key} to the search criteria." logging.info(msg) dds.append(dsets[key]) with ProgressBar(): for ds in dds: scen = ds.attrs["experiment_id"] grid = str(ds.attrs["intake_esm_dataset_key"]).split(".")[-2] bias_correction = str(ds.attrs["intake_esm_dataset_key"]).split(".")[-1] attrs_ref = ds.attrs.copy() for i, member in enumerate(ds.member_id): for var in ds.variables: if var in search["variable"]: var_out = var break new_attrs = dict() for key, vals in attrs_ref.items(): try: mapped = json.loads(vals) if isinstance(mapped, dict): new_attrs[key] = mapped.get(str(member.values), "") except JSONDecodeError: # noqa: PERF203 new_attrs[key] = vals except TypeError as err: if len(vals) == 1: new_attrs[key] = vals[0] else: raise RuntimeError("Attribute mapping failed.") from err if correct_times: try: ds = cordex_aws_calendar_correction(ds) except ValueError: msg = f"Calendar failed to convert for {member.values} and variable {var_out}. Skipping..." logging.warning(msg) continue years, datasets = zip(*ds.isel(member_id=i).groupby("time.year"), strict=False) for d in datasets: d.attrs.update(new_attrs) out_folder = target_folder.joinpath(f"{member.values}_{scen}") out_folder.mkdir(exist_ok=True) file_name_pattern = f"{var_out}_{member.values}_day_{scen}_{grid}_{bias_correction}" msg = f"Writing out files for {file_name_pattern}." logging.info() paths = [ out_folder.joinpath(f"{file_name_pattern}_{y}.nc") for y in years if not out_folder.joinpath(f"{file_name_pattern}_{y}.nc").exists() ] datasets = [d for y, d in zip(years, datasets, strict=False) if not out_folder.joinpath(f"{file_name_pattern}_{y}.nc").exists()] if len(datasets) == 0: msg = f"All files currently exist for {scen} and {member.name}. Continuing..." logging.warning(msg) continue msg = f"Final count of files: {len(datasets)}" logging.info(msg) xr.save_mfdataset(datasets, paths, engine="h5netcdf", format="NETCDF4_CLASSIC")