Source code for miranda.ncar._aws_cordex

from __future__ import annotations

import ast
import functools
import json
import logging.config
import warnings
from json.decoder import JSONDecodeError
from pathlib import Path

import schema
import xarray as xr
from dask.diagnostics import ProgressBar
from xclim.core import calendar as xcal  # noqa

from miranda.gis import subsetting_domains
from miranda.scripting import LOGGING_CONFIG

logging.config.dictConfig(LOGGING_CONFIG)

try:
    import intake  # noqa
    import intake_esm  # noqa
    import numcodecs  # noqa
    import s3fs  # noqa
except ImportError:
    warnings.warn(
        f"{__name__} functions require additional dependencies. "
        "Please install them with `pip install miranda[remote]`."
    )


__all__ = [
    "cordex_aws_calendar_correction",
    "cordex_aws_download",
]

_allowed_args = schema.Schema(
    {
        schema.Optional("variable"): schema.Schema(
            schema.Or(
                [
                    "hurs",
                    "huss",
                    "pr",
                    "prec",
                    "ps",
                    "rsds",
                    "sfcWind",
                    "tas",
                    "tasmax",
                    "tasmin",
                    "temp",
                    "tmax",
                    "tmin",
                    "uas",
                    "vas",
                ]
            )
        ),
        schema.Optional("frequency"): "day",
        schema.Optional("scenario"): schema.Schema(
            schema.Or(["eval", "hist", "rcp45", "rcp85", "hist-rcp45", "hist-rcp85"])
        ),
        schema.Optional("grid"): schema.Or(["NAM-22i", "NAM-44i"]),
        schema.Optional("bias_correction"): schema.Or(
            ["raw", "mbcn-Daymet", "mbcn-gridMET"]
        ),
    }
)
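
# Illustrative only (not part of the original module): a minimal ``search``
# mapping that passes validation against ``_allowed_args``. Note that values
# are given as lists (or the literal string "day"), matching the list-based
# sub-schemas above.
_example_search = {
    "variable": ["tasmax", "tasmin", "pr"],
    "frequency": "day",
    "scenario": ["rcp85"],
    "grid": ["NAM-22i"],
    "bias_correction": ["raw"],
}
_allowed_args.validate(_example_search)  # Returns the mapping unchanged when valid.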


def cordex_aws_calendar_correction(ds: xr.Dataset) -> xr.Dataset:
    """Convert AWS-stored CORDEX data back to its original calendar.

    AWS-stored CORDEX datasets are all stored on the standard calendar;
    this converts the data back to the original calendar, removing the NaNs
    that were added in the process.

    Credit: Pascal Bourgault (@aulemahal)
    """
    orig_calendar = ds.attrs.get("original_calendar", "standard")
    if orig_calendar in ["365_day", "360_day"]:
        logging.info(f"Converting calendar to {orig_calendar}")
        ds = xcal.convert_calendar(ds, "noleap")  # Drops February 29th.
        if orig_calendar == "360_day":
            time = xcal.date_range_like(ds.time, calendar="360_day")
            # Drop the 31st of January, March, May, August, and October
            # (days of year 31, 90, 151, 243, and 304 on a "noleap" calendar)
            # to arrive at 360 days per year.
            ds = ds.where(
                ~ds.time.dt.dayofyear.isin([31, 90, 151, 243, 304]), drop=True
            )
            if ds.time.size != time.size:
                raise ValueError("Conversion of dataset to 360_day calendar failed.")
            ds["time"] = time
    return ds
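
# A minimal sketch (illustrative; synthetic data) of the round trip above:
# a standard-calendar year tagged as originally "365_day" loses February 29th.
def _demo_calendar_correction() -> None:
    import numpy as np

    demo = xr.Dataset(
        {"tas": ("time", np.arange(366.0))},
        coords={
            "time": xr.cftime_range("2000-01-01", periods=366, calendar="standard")
        },
    )
    demo.attrs["original_calendar"] = "365_day"
    corrected = cordex_aws_calendar_correction(demo)
    assert corrected.time.size == 365  # Leap day removed by the conversion.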
def cordex_aws_download(
    target_folder: str | Path,
    *,
    search: dict[str, str | list[str]],
    correct_times: bool = False,
    domain: str | None = None,
):
    """Download the CORDEX interpolated grids for North America from Amazon S3."""

    def _subset_preprocess(d: xr.Dataset, dom: str) -> xr.Dataset:
        # Subset each dataset to the requested domain on open, if `clisops`
        # is available; otherwise, pass the dataset through untouched.
        try:
            from clisops.core import subset_bbox

            n, w, s, e = subsetting_domains(dom)
            return subset_bbox(d, lon_bnds=[w, e], lat_bnds=[s, n])
        except ModuleNotFoundError:
            log_msg = (
                "This function requires the `clisops` library, which is not installed. "
                "The domain subsetting step will be skipped."
            )
            warnings.warn(log_msg)
            return d

    target_folder = Path(target_folder)
    _allowed_args.validate(search)

    # Define the catalog description file location.
    catalog_url = (
        "https://ncar-na-cordex.s3-us-west-2.amazonaws.com/catalogs/aws-na-cordex.json"
    )

    # Open the catalog, interpreting the "na-cordex-models" column as a list of values.
    col = intake.open_esm_datastore(
        catalog_url,
        csv_kwargs={"converters": {"na-cordex-models": ast.literal_eval}},
    )
    col_subset = col.search(**search)

    additional_kwargs = dict()
    if domain:
        additional_kwargs["preprocess"] = functools.partial(
            _subset_preprocess, dom=domain
        )

    dsets = col_subset.to_dataset_dict(
        zarr_kwargs={"consolidated": True},
        storage_options={"anon": True},
        **additional_kwargs,
    )
    logging.info(f"\nDataset dictionary keys:\n {dsets.keys()}")

    dds = list()
    for key in dsets.keys():
        logging.info(f"Adding {key} to the list of datasets to fetch.")
        dds.append(dsets[key])

    with ProgressBar():
        for ds in dds:
            scen = ds.attrs["experiment_id"]
            grid = str(ds.attrs["intake_esm_dataset_key"]).split(".")[-2]
            bias_correction = str(ds.attrs["intake_esm_dataset_key"]).split(".")[-1]
            attrs_ref = ds.attrs.copy()

            for i, member in enumerate(ds.member_id):
                # Identify the first requested variable present in the dataset.
                for var in ds.variables:
                    if var in search["variable"]:
                        var_out = var
                        break

                # Remap JSON-encoded, member-keyed global attributes to the
                # values for this particular member.
                new_attrs = dict()
                for key, vals in attrs_ref.items():
                    try:
                        mapped = json.loads(vals)
                        if isinstance(mapped, dict):
                            new_attrs[key] = mapped.get(str(member.values), "")
                    except JSONDecodeError:
                        new_attrs[key] = vals
                    except TypeError:
                        if len(vals) == 1:
                            new_attrs[key] = vals[0]
                        else:
                            raise RuntimeError(
                                f"Unable to remap global attribute `{key}`."
                            )

                if correct_times:
                    try:
                        ds = cordex_aws_calendar_correction(ds)
                    except ValueError:
                        logging.warning(
                            f"Calendar failed to convert for {member.values} "
                            f"and variable {var_out}. Skipping..."
                        )
                        continue

                # Split the member's time series into single years.
                years, datasets = zip(*ds.isel(member_id=i).groupby("time.year"))
                for d in datasets:
                    d.attrs.update(new_attrs)

                out_folder = target_folder.joinpath(f"{member.values}_{scen}")
                out_folder.mkdir(parents=True, exist_ok=True)

                file_name_pattern = (
                    f"{var_out}_{member.values}_day_{scen}_{grid}_{bias_correction}"
                )
                logging.info(f"Writing out files for {file_name_pattern}.")

                # Write one NetCDF file per year, skipping years already on disk.
                paths = [
                    out_folder.joinpath(f"{file_name_pattern}_{y}.nc")
                    for y in years
                    if not out_folder.joinpath(f"{file_name_pattern}_{y}.nc").exists()
                ]
                datasets = [
                    d
                    for y, d in zip(years, datasets)
                    if not out_folder.joinpath(f"{file_name_pattern}_{y}.nc").exists()
                ]
                if len(datasets) == 0:
                    logging.warning(
                        f"All files currently exist for {scen} and {member.values}. Continuing..."
                    )
                    continue
                logging.info(f"Final count of files: {len(datasets)}")

                xr.save_mfdataset(
                    datasets, paths, engine="h5netcdf", format="NETCDF4_CLASSIC"
                )
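
# Example invocation (illustrative; the output path, search criteria, and
# domain key are hypothetical and depend on your setup):
def _example_download() -> None:
    cordex_aws_download(
        Path("~/cordex-na").expanduser(),
        search={
            "variable": ["tasmax"],
            "scenario": ["rcp85"],
            "grid": ["NAM-22i"],
            "bias_correction": ["raw"],
        },
        correct_times=True,
        domain="CAN",  # Hypothetical key; must be understood by `subsetting_domains`.
    )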