from __future__ import annotations
import importlib.util as ilu
import logging
import multiprocessing as mp
import os
import re
import warnings
from functools import partial
from os import PathLike
from pathlib import Path
from types import GeneratorType
import h5netcdf
import pandas as pd
import schema
import xarray as xr
import zarr
from pandas._libs.tslibs import NaTType # noqa
from miranda.convert.utils import date_parser, find_version_hash # noqa
from miranda.cv import VALIDATION_ENABLED
from miranda.units import check_time_frequency
from ._time import TIME_UNITS_TO_FREQUENCY, TIME_UNITS_TO_TIMEDELTA, DecoderError
HAS_NETCDF4 = bool(ilu.find_spec("netCDF4"))
if VALIDATION_ENABLED:
from miranda.cv import INSTITUTIONS, PROJECT_MODELS
from miranda.validate import FACETS_SCHEMA # noqa
logger = logging.getLogger("miranda.decode.decoder")
__all__ = [
"Decoder",
"guess_project",
]
[docs]
def guess_project(file: os.PathLike | str) -> str:
"""
Guess the name of the project
Parameters
----------
file : str or os.PathLike
Returns
-------
str
"""
file_name = Path(file).stem
potential_names = file_name.split("_")
if VALIDATION_ENABLED:
for project, models in PROJECT_MODELS.items():
if any([model in potential_names for model in models]):
return project
raise DecoderError(f"Unable to determine project from file name: '{file_name}'.")
raise DecoderError("Project determination requires pyessv-archive source files.")
[docs]
class Decoder:
project = None
guess = False
_file_facets = dict()
def __init__(self, project: str | None):
self.project = project
@staticmethod
def _decoder(
d: dict,
fail_early: bool,
proj: str,
guess: bool,
lock: mp.Lock,
file: str | Path,
) -> None:
if proj is None:
if guess:
try:
proj = guess_project(file)
except DecoderError:
print(f"Unable to determine 'activity': Signature for 'activity' must be set manually for file: {file}.")
if fail_early:
raise
else:
proj = "converted"
decode_function_name = f"decode_{proj.lower().replace('-', '_')}"
try:
with lock:
_deciphered = getattr(Decoder, decode_function_name)(Path(file))
if fail_early:
if VALIDATION_ENABLED:
FACETS_SCHEMA.validate(_deciphered)
else:
print("Validation requires pyessv-archive source files. Skipping validation checks.")
print(f"Deciphered the following from {Path(file).name}:\n{_deciphered.items()}")
d[file] = _deciphered
except (AttributeError, NotImplementedError):
print(f"Unable to read data from {Path(file)}. Ensure pathname is correct.")
raise
except schema.SchemaError as e:
print(f"Decoded facets from {Path(file).name} are not valid: {e}")
[docs]
def decode(
self,
files: os.PathLike | str | list[str | os.PathLike] | GeneratorType,
chunks: int | None = None,
raise_error: bool = False,
) -> None:
"""
Decode facets from file or list of files.
Parameters
----------
files : str or Path or list of str or Path or generator
The files to decode.
chunks : int, optional
The chunk size used when processing files. Not to be confused with xarray chunks for dimensions.
raise_error : bool
Whether to raise an error if a file cannot be decoded.
"""
if isinstance(files, (str, os.PathLike)):
files = [files]
if chunks is None and isinstance(files, list):
if len(files) >= 10:
chunk_size = 10
elif 1 <= len(files) < 10:
chunk_size = len(files)
else:
raise ValueError("No file entries found.")
elif isinstance(files, GeneratorType):
chunk_size = 10
else:
chunk_size = chunks
if self.project is None:
warnings.warn("The decoder 'project' is not set; Decoding step will be much slower.", stacklevel=2)
else:
msg = f"Deciphering metadata with project = '{self.project}'"
logger.info(msg)
with mp.Manager() as manager:
_file_facets = manager.dict()
lock = manager.Lock()
func = partial(self._decoder, _file_facets, raise_error, self.project, self.guess, lock)
with mp.Pool() as pool:
pool.imap(func, files, chunksize=chunk_size)
pool.close()
pool.join()
self._file_facets.update(_file_facets)
[docs]
def facets_table(self):
raise NotImplementedError()
[docs]
def file_facets(self) -> dict[os.PathLike, dict]:
return self._file_facets
@classmethod
def _from_dataset(cls, file: Path | str) -> tuple[str, str, dict]:
file_path = Path(file)
try:
variable_name = str(cls._decode_primary_variable(file_path))
except DecoderError:
msg = f"Unable to open dataset: {file_path.name}"
logger.error(msg)
raise
datetimes = file_path.name.split("_")[-1]
data = {}
if file_path.is_file() and file_path.suffix in [".nc", ".nc4"]:
if HAS_NETCDF4:
import netCDF4
with netCDF4.Dataset(file, mode="r") as ds:
for k in ds.ncattrs():
data[k] = getattr(ds, k)
else:
with h5netcdf.File(file, mode="r") as ds:
for k in ds.attrs:
data[k] = ds.attrs[k]
elif file_path.is_dir() and file_path.suffix == ".zarr":
with zarr.open_group(store=file_path, mode="r") as ds:
data.update(ds.attrs.asdict())
else:
raise DecoderError(f"Unable to read dataset: `{file_path.name}`.")
return variable_name, datetimes, data
@staticmethod
def _decode_primary_variable(file: Path) -> str | None:
"""
Attempts to find the primary variable of a netCDF
Parameters
----------
file: Path
Returns
-------
str
"""
dimsvar_dict = {}
coords = (
"height",
"lat",
"latitude",
"lev",
"level",
"lon",
"longitude",
"rlat",
"rlon",
"rotated_pole",
"time",
)
try:
if file.is_file() and file.suffix in [".nc", ".nc4"]:
if HAS_NETCDF4:
import netCDF4
with netCDF4.Dataset(file, mode="r") as ds:
for var_name, var_attrs in ds.variables.items():
dimsvar_dict[var_name] = {k: var_attrs.getncattr(k) for k in var_attrs.ncattrs()}
else:
with h5netcdf.File(file, mode="r") as ds:
for var_name, var_attrs in ds.variables.items():
dimsvar_dict[var_name] = {k: var_attrs.attrs[k] for k in var_attrs.attrs}
for k in dimsvar_dict.keys():
if not str(k).startswith(coords) and k in file.stem:
return str(k)
elif file.is_dir() and file.suffix == ".zarr":
with zarr.open_group(file, mode="r") as ds:
for k in ds.array_keys():
if not str(k).startswith(coords) and k in file.stem:
return str(k)
else:
msg = "File format is not supported."
raise NotImplementedError(msg)
except ValueError as err:
msg = f"Unable to open dataset: {file.name}"
raise DecoderError(msg) from err
@staticmethod
def _decode_hour_of_day_info(
file: PathLike | str,
) -> dict:
"""
Decode hour of day information.
Parameters
----------
file : Path or str
Returns
-------
dict
"""
file_path = Path(file)
if file_path.is_file() and file_path.suffix in [".nc", ".nc4"]:
if HAS_NETCDF4:
import netCDF4
with netCDF4.Dataset(file, mode="r") as ds:
if "time" in ds.variables.keys():
hour = netCDF4.num2date(ds["time"][0], ds["time"].units, ds["time"].calendar).hour
else:
hour = None
return dict(hour_of_day=hour)
else:
warnings.warn("This is not currently implemented for h5netcdf. Install netCDF4.", stacklevel=2)
return dict()
elif file_path.is_dir() and file_path.suffix == ".zarr":
warnings.warn("This is not currently implemented for zarr. Install netCDF4.", stacklevel=2)
return dict()
else:
raise NotImplementedError()
@staticmethod
def _decode_time_info( # noqa: C901
file: PathLike | str | list[str] | None = None,
data: dict | None = None,
term: str | None = None,
*,
field: str | None = None,
) -> str | pd.Timedelta | NaTType:
"""
Decode time information.
Parameters
----------
file : os.PathLike or str, optional
data : dict, optional
term : str
field : {"timedelta", "frequency"}
Returns
-------
str or pd.Timedelta or NaTType
"""
if not file and not data and not term:
raise ValueError("Nothing passed to parse time info from.")
if field == "frequency":
time_dictionary = TIME_UNITS_TO_FREQUENCY
elif field == "timedelta":
time_dictionary = TIME_UNITS_TO_TIMEDELTA
else:
raise NotImplementedError()
if term:
if term in ["fx", "fixed"]:
if field == "timedelta":
return pd.NaT
return "fx"
return pd.to_timedelta(time_dictionary[term])
if data and not file:
potential_time = data.get("frequency")
if not potential_time:
if hasattr(data, "time"):
time_units = data["time"].units
potential_time = time_units.split()[0]
else:
msg = f"Could not find `frequency` or `time` for {Path(file).name}. Assuming `fx`."
logger.warning(msg)
potential_time = "fx"
if potential_time in ["ymon", "yseas", "fixed", "fx"]:
msg = f"Found `{potential_time}`. Frequency is likely `fx`."
logger.warning(msg)
if field == "frequency":
return "fx"
if field == "timedelta":
return pd.NaT
raise ValueError()
if field == "timedelta":
if potential_time in ["fx", "fixed"]:
return pd.NaT
return pd.to_timedelta(time_dictionary[potential_time])
return time_dictionary[potential_time]
if file and not data:
for delimiter in ["_", "."]:
file_parts = Path(file).stem.split(delimiter)
potential_times = [segment for segment in file_parts if segment in time_dictionary.keys()]
if potential_times:
if potential_times[0] in ["fx", "fixed"]:
if field == "frequency":
return "fx"
if field == "timedelta":
return pd.NaT
raise ValueError(f"Field `{field}` not supported.")
if field == "timedelta":
return pd.to_timedelta(time_dictionary[potential_times[0]])
return time_dictionary[potential_times[0]]
if file and data:
for delimiter in ["_", "."]:
file_parts = Path(file).stem.split(delimiter)
potential_times = [segment for segment in file_parts if segment in time_dictionary.keys()]
potential_time = data.get("frequency", "")
if potential_time == "":
if hasattr(data, "time"):
time_units = data["time"].units
potential_time = time_units.split()[0]
else:
msg = f"Could not find `frequency` or `time` for {Path(file).name}. Assuming `fx`."
logger.warning(msg)
potential_time = "fx"
if potential_time in ["ymon", "yseas", "fixed", "fx"]:
msg = f"Found `{potential_time}`. Frequency is likely `fx`."
logger.warning(msg)
if "fx" in file_parts or "fixed" in file_parts:
if field == "frequency":
return "fx"
if field == "timedelta":
return pd.NaT
raise ValueError(f"Field `{field}` not supported.")
if potential_time in potential_times:
return time_dictionary[potential_time]
elif potential_times:
break
msg = (
f"Frequency from metadata (`{potential_time}`) not found in filename (`{Path(file).name}`): "
"Performing more rigorous frequency checks."
)
logger.warning(msg)
if Path(file).is_file() and Path(file).suffix in [".nc", ".nc4"]:
engine = "h5netcdf"
elif Path(file).is_dir() and Path(file).suffix == ".zarr":
engine = "zarr"
else:
raise DecoderError(f"File is not valid netcdf or zarr: {Path(file).name}")
_ds = xr.open_dataset(
file,
engine=engine,
drop_variables="time_bnds",
)
if not hasattr(_ds, "time"):
logger.warning("Dataset does not contain time array. Assuming fixed variable.")
if field == "frequency":
return "fx"
if field == "timedelta":
return pd.NaT
raise ValueError(f"Field `{field}` not supported.")
else:
_, found_freq = check_time_frequency(_ds.time)
if found_freq in potential_times:
msg = (
"Time frequency found in dataset on analysis was found in filename. "
f"Metadata for `{Path(file).name} is probably incorrect. "
f"Basing fields on `{found_freq}`."
)
logger.warning(msg)
return time_dictionary[found_freq]
elif found_freq in ["month", "mon"]:
for f in ["Amon", "Omon", "monC", "monthly", "months", "mon"]:
if f in potential_times:
msg = f"Month-like time frequency found in dataset on analysis was found in filename. Basing fields on `{f}`."
logger.warning(msg)
return time_dictionary[f]
else:
msg = f"Time frequency found in dataset on analysis was not found in filename. Basing fields on `{found_freq}`."
logger.warning(msg)
return time_dictionary[found_freq]
raise DecoderError(f"Time frequency indiscernible for file `{file}`.")
@staticmethod
def _decode_version(file: PathLike | str, data: dict) -> dict:
"""
Decode version information.
Parameters
----------
file : os.PathLike or str
data : dict
Returns
-------
dict
"""
version_info = dict()
try:
version_info["version"] = data["version"]
except KeyError:
possible_version = Path(file).parent
if re.match(r"^[vV]\d+", possible_version.name):
version_info["version"] = possible_version.name
else:
possible_version_signature = possible_version.glob(f"{Path(file).stem}.v*")
for sig in possible_version_signature:
found_version = re.match(r"([vV]\d+)$", sig.suffix)
if found_version:
version_info["version"] = found_version.group()
version_info["sha256sum"] = sig.open().read()
break
else:
version_info["version"] = "vNotFound"
return version_info
[docs]
@classmethod
def decode_converted(cls, file: PathLike | str) -> dict:
"""
Decode converted data.
Parameters
----------
file : os.PathLike or str
Returns
-------
dict
"""
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets.update(data)
del facets["history"]
facets["date"] = date
file_format = data.get("output_format")
if file_format:
facets["format"] = file_format
elif "format" in data:
facets["format"] = data["format"]
elif Path(file).suffix in [".nc", ".nc4"]:
facets["format"] = "nc"
elif Path(file).suffix in [".zarr"]:
facets["format"] = "zarr"
facets["variable"] = variable
facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
if "frequency" not in facets:
facets["timedelta"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@staticmethod
def decode_eccc_obs(self, file: PathLike | str) -> dict:
raise NotImplementedError()
[docs]
@staticmethod
def decode_ahccd_obs(self, file: PathLike | str) -> dict:
raise NotImplementedError()
[docs]
@staticmethod
def decode_melcc_obs(self, file: PathLike | str) -> dict:
raise NotImplementedError()
[docs]
@classmethod
def decode_pcic_candcs_u6(cls, file: PathLike | str) -> dict:
if "Derived" in Path(file).parents:
raise NotImplementedError("Derived CanDCS-U6 variables are not supported.")
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["activity"] = data["activity_id"]
facets["mip_era"] = data["project_id"]
facets["bias_adjust_institution"] = "PCIC"
facets["date"] = date
facets["domain"] = data["domain"]
facets["experiment"] = str(data["GCM__experiment_id"]).replace(",", "-")
facets["format"] = "netcdf"
facets["institution"] = data["GCM__institution_id"]
facets["member"] = (
f"r{data['GCM__realization_index']}i{data['GCM__initialization_index']}p{data['GCM__physics_index']}f{data['GCM__forcing_index']}"
)
facets["processing_level"] = "biasadjusted"
facets["bias_adjust_project"] = "CanDCS-U6"
facets["source"] = data["GCM__source_id"]
facets["type"] = "simulation"
facets["variable"] = variable
facets["version"] = f"v{data.get('GCM__data_specs_version')}"
if facets["version"] is None:
facets.update(find_version_hash(file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@classmethod
def decode_cmip6(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["activity"] = data["activity_id"]
facets["date"] = date
facets["domain"] = "global"
facets["experiment"] = data["experiment_id"]
facets["format"] = "netcdf"
facets["grid_label"] = data["grid_label"]
facets["institution"] = data["institution_id"]
facets["member"] = data["variant_label"]
facets["modeling_realm"] = data["realm"]
facets["processing_level"] = "raw"
facets["mip_era"] = data["mip_era"]
facets["source"] = data["source_id"]
facets["type"] = "simulation"
facets["variable"] = variable
facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@classmethod
def decode_cmip5(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["activity"] = "CMIP"
facets["date"] = date
facets["domain"] = "global"
facets["experiment"] = data["experiment_id"]
facets["format"] = "netcdf"
facets["institution"] = data["institute_id"]
facets["member"] = data["parent_experiment_rip"]
facets["modeling_realm"] = data["modeling_realm"]
facets["processing_level"] = "raw"
facets["mip_era"] = data["project_id"]
facets["source"] = data["model_id"]
facets["type"] = "simulation"
facets["variable"] = variable
facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@classmethod
def decode_cordex(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return dict()
# FIXME: What to do about our internal data that breaks all established conventions?
facets["activity"] = "CORDEX"
if data.get("project_id") == "" or data.get("project_id") is None:
facets["mip_era"] = "internal"
elif data.get("project_id") == "CORDEX":
facets["mip_era"] = "CMIP5"
if date == "r0i0p0":
facets["date"] = "fx"
else:
facets["date"] = date
domain = data.get("CORDEX_domain")
if domain:
facets["domain"] = domain.strip()
else:
domain = data.get("ouranos_domain_name")
if domain:
facets["domain"] = domain.strip()
else:
msg = f"File {Path(file).name} has a nonstandard domain name."
logger.error(msg)
raise NotImplementedError(msg)
# CORDEX-NAM on AWS mis-attributes the domain (22/44 should be 22i/44i)
aws_keys = data.get("intake_esm_dataset_key")
if aws_keys:
facets["domain"] = aws_keys.split(".")[3]
title = data.get("title")
if title:
regridded_domain_found = re.search(r"\w{3}-\d{2}i", title)
if regridded_domain_found:
facets["domain"] = regridded_domain_found.group()
# The logic here is awful, but the information is bad to begin with.
driving_model = ""
driving_institution = ""
driving_institution_parts = str(data["driving_model_id"]).split("-")
if VALIDATION_ENABLED:
if driving_institution_parts[0] in INSTITUTIONS:
driving_institution = driving_institution_parts[0]
elif "-".join(driving_institution_parts[:2]) in INSTITUTIONS:
driving_institution = "-".join(driving_institution_parts[:2])
elif "-".join(driving_institution_parts[:3]) in INSTITUTIONS:
driving_institution = "-".join(driving_institution_parts[:3])
else:
logger.warning("CORDEX Metadata validation checks require PyESSV. Driving institution cannot be determined.")
driving_model = data["driving_model_id"]
if data["driving_model_id"].startswith("GFDL"):
driving_institution = "NOAA-GFDL"
driving_model = f"NOAA-GFDL-{data['driving_model_id']}"
elif data["driving_model_id"].startswith("MPI-ESM"):
driving_institution = "MPI-M"
driving_model = f"MPI-M-{data['driving_model_id']}"
elif data["driving_model_id"].startswith("HadGEM2"):
driving_institution = "MOHC"
driving_model = f"MOHC-{data['driving_model_id']}"
elif data["driving_model_id"].startswith("CNRM-CM5"):
driving_institution = "CNRM-CERFACS"
driving_model = f"CNRM-CERFACS-{data['driving_model_id']}"
elif VALIDATION_ENABLED and not driving_institution:
raise DecoderError(f"driving_institution (from driving_model_id: `{data['driving_model_id']}`) is not valid.")
facets["driving_institution"] = driving_institution.strip()
if driving_model:
facets["driving_model"] = driving_model.strip()
else:
facets["driving_model"] = str(data["driving_model_id"]).strip()
facets["format"] = "netcdf"
if data["institute_id"].strip() == "Our.":
facets["institution"] = "Ouranos"
else:
facets["institution"] = data["institute_id"].strip()
facets["processing_level"] = "raw"
facets["source"] = data["model_id"]
facets["type"] = "simulation"
facets["variable"] = variable
facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
try:
facets["experiment"] = data["experiment_id"].strip()
except KeyError:
facets["experiment"] = data["driving_experiment_name"].strip()
try:
for potential_member in ["parent_experiment_rip", "parent_experiment"]:
facets["member"] = data.get(potential_member)
if facets["member"] == "N/A":
raise KeyError()
else:
break
if facets["member"] is None:
raise KeyError()
except KeyError:
facets["member"] = data["driving_model_ensemble_member"].strip()
return facets
[docs]
@classmethod
def decode_isimip_ft(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["activity"] = "ISIMIP"
facets["mip_era"] = data["project_id"]
facets["date"] = date
facets["domain"] = "global"
facets["co2_forcing_id"] = data["co2_forcing_id"]
facets["experiment"] = data["experiment_id"]
facets["format"] = "netcdf"
facets["impact_model"] = data["impact_model_id"]
facets["institution"] = data["institute_id"]
facets["member"] = data["driving_model_ensemble_member"]
facets["modeling_realm"] = data["modeling_realm"]
facets["social_forcing_id"] = data["social_forcing_id"]
facets["source"] = data["model_id"]
facets["type"] = "simulation"
facets["variable"] = variable
facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@classmethod
def decode_nex_gddp_cmip6(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["experiment"] = data["scenario"]
facets["activity"] = "CMIP" if facets["experiment"] == "historical" else "ScenarioMIP"
facets["institution"] = data["cmip6_institution_id"]
facets["member"] = data["variant_label"]
facets["processing_level"] = "biasadjusted"
facets["bias_adjust_project"] = "NEX-GDDP-CMIP6"
facets["bias_adjust_institution"] = "NASA"
facets["mip_era"] = "CMIP6"
facets["source"] = data["cmip6_source_id"]
facets["type"] = "simulation"
facets["variable"] = variable
facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@classmethod
def decode_espo_g6_r2(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["bias_adjust_project"] = "ESPO-G6-R2"
facets["processing_level"] = "biasadjusted"
facets["version"] = "1.0.0"
facets["domain"] = "NAM"
for f in [
"experiment",
"activity",
"institution",
"member",
"bias_adjust_institution",
"mip_era",
"source",
"type",
]:
facets[f] = data[f"cat:{f}"]
facets["variable"] = variable
# facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets
[docs]
@classmethod
def decode_espo_g6_e5l(cls, file: PathLike | str) -> dict:
facets = dict()
try:
variable, date, data = cls._from_dataset(file=file)
except DecoderError:
return facets
facets["bias_adjust_project"] = "ESPO-G6-E5L"
facets["processing_level"] = "biasadjusted"
facets["version"] = "1.0.0"
facets["domain"] = "NAM"
for f in [
"experiment",
"activity",
"institution",
"member",
"bias_adjust_institution",
"mip_era",
"source",
"type",
]:
facets[f] = data[f"cat:{f}"]
facets["variable"] = variable
# facets.update(cls._decode_version(data=data, file=file))
facets.update(cls._decode_hour_of_day_info(file=file))
try:
facets["frequency"] = cls._decode_time_info(data=data, file=file, field="frequency")
facets["timedelta"] = cls._decode_time_info(term=facets["frequency"], field="timedelta")
facets["date_start"] = date_parser(date)
facets["date_end"] = date_parser(date, end_of_period=True)
except DecoderError: # noqa: S110
pass
return facets