Source code for miranda.treatments._variables
from __future__ import annotations
import logging
import xarray as xr
import xclim.core.units
from xclim.core import units
from miranda.treatments.utils import (
_get_section_entry_key, # noqa
_iter_entry_key, # noqa
)
from miranda.units import check_time_frequency
# Module-level logger used by every treatment function in this file.
logger = logging.getLogger("miranda.treatments.variables")

# Names exported by ``from ... import *``; one entry per public treatment function.
__all__ = [
    "cf_units_conversion",
    "clip_values",
    "correct_unit_names",
    "invert_value_sign",
    "transform_values",
    "variable_conversion",
]
[docs]
def correct_unit_names(d: xr.Dataset, p: str, m: dict) -> xr.Dataset:
    """
    Overwrite the ``units`` attribute of variables flagged for correction.

    Parameters
    ----------
    d : xarray.Dataset
        Dataset whose variables are updated in place.
    p : str
        Dataset project name, forwarded to the metadata lookup.
    m : dict
        Metadata definition dictionary. The ``_corrected_units`` entry of each
        variable listed under ``m["variables"]`` is consulted.

    Returns
    -------
    xarray.Dataset
        The same dataset object, with corrected ``units`` attributes and one
        ``history`` entry prepended per corrected variable.
    """
    for name, fixed_units in _iter_entry_key(d, m, "variables", "_corrected_units", p):
        # Falsy entries (missing, None, False, empty string) mean "leave as is".
        if not fixed_units:
            continue

        d[name].attrs["units"] = fixed_units

        # Newest entry goes first; any earlier history is kept after it.
        earlier = d.attrs.get("history", "")
        d.attrs["history"] = f"Corrected units name for variable `{name}` to `{fixed_units}`. {earlier}"

    return d
# for de-accumulation or conversion to flux
[docs]
def transform_values(d: xr.Dataset, p: str, m: dict) -> xr.Dataset:
    """
    Transform dataset values according to operation listed.

    The ``_transformation`` entry of each variable in ``m["variables"]`` selects
    the treatment:

    - ``"deaccumulate"``: difference along ``time`` followed by ``amount2rate``.
    - ``"amount2rate"``: ``amount2rate`` applied directly.
    - ``"op <operator> <value>"``: arithmetic with a pint-parsable ``<value>``,
      where ``<operator>`` is a single character among ``+ - * /``. When
      ``<value>`` starts with ``attrs``, the text after the following separator
      character names an attribute of the variable holding the value.
    - ``False``: explicitly no transformation (logged).
    - missing / ``None`` / other falsy: variable is passed through untouched.

    Parameters
    ----------
    d : xarray.Dataset
        Input dataset. It is not modified; a new dataset is returned.
    p : str
        Dataset project name, forwarded to the metadata lookups.
    m : dict
        Metadata definition dictionary for project and variable(s).

    Returns
    -------
    xarray.Dataset
        New dataset carrying the transformed variables, every untransformed
        variable copied as-is, and one ``history`` entry prepended per variable
        actually transformed.

    Raises
    ------
    NotImplementedError
        If a string transformation is unknown or uses an unsupported operator.
    TypeError
        Re-raised from ``check_time_frequency`` when the time frequency cannot
        be parsed during de-accumulation.
    """
    key = "_transformation"
    d_out = xr.Dataset(coords=d.coords, attrs=d.attrs)
    converted = []
    # Time-frequency information is looked up lazily, once, on the first de-accumulation.
    offset, offset_meaning = None, None

    time_freq = {}
    expected_period = _get_section_entry_key(m, "dimensions", "time", "_ensure_correct_time", p)
    if isinstance(expected_period, str):
        time_freq["expected_period"] = expected_period

    for vv, trans in _iter_entry_key(d, m, "variables", key, p):
        if trans is False:
            msg = f"No transformations needed for `{vv}` (Explicitly set to False)."
            logger.info(msg)
            continue
        if not trans:
            # Nothing configured for this variable: no transformation, no history entry.
            continue

        if trans == "deaccumulate":
            # Time-step accumulated total to time-based flux (de-accumulation)
            if offset is None and offset_meaning is None:
                try:
                    offset, offset_meaning = check_time_frequency(d, **time_freq)
                except TypeError:
                    logger.error("Unable to parse the time frequency. Verify data integrity before retrying.")
                    raise
            msg = f"De-accumulating units for variable `{vv}`."
            logger.info(msg)
            with xr.set_options(keep_attrs=True):
                out = d[vv].diff(dim="time")
                # Keep the original value where the time component equals the
                # first offset element; use the step difference elsewhere.
                out = d[vv].where(
                    getattr(d[vv].time.dt, offset_meaning) == offset[0],
                    out.broadcast_like(d[vv]),
                )
                out = units.amount2rate(out)
            d_out[vv] = out
        elif trans == "amount2rate":
            # NOTE: This treatment is no longer needed in xclim v0.43.0+ but is kept for backwards compatibility
            # frequency-based totals to time-based flux
            msg = f"Performing amount-to-rate units conversion for variable `{vv}`."
            logger.info(msg)
            with xr.set_options(keep_attrs=True):
                out = units.amount2rate(d[vv])
            d_out[vv] = out
        elif isinstance(trans, str):
            if not trans.startswith("op "):
                raise NotImplementedError(f"Unknown transformation: {trans}")
            # Slice instead of index so a bare "op " reports an unsupported
            # operator rather than raising IndexError.
            op = trans[3:4]
            value = trans[4:].strip()
            if value.startswith("attrs"):
                value = units.str2pint(d[vv].attrs[value[6:]])
            else:
                value = units.str2pint(value)
            with xr.set_options(keep_attrs=True):
                if op == "+":
                    value = units.convert_units_to(value, d[vv])
                    d_out[vv] = d[vv] + value
                elif op == "-":
                    value = units.convert_units_to(value, d[vv])
                    d_out[vv] = d[vv] - value
                elif op == "*":
                    d_out[vv] = units.pint_multiply(d[vv], value)
                elif op == "/":
                    d_out[vv] = units.pint_multiply(d[vv], 1 / value)
                else:
                    raise NotImplementedError(f"Op transform doesn't implement the «{op}» operator.")
        else:
            # Truthy but not a string: nothing can be applied, so do not claim a transformation.
            msg = f"Ignoring unsupported transformation `{trans}` for variable `{vv}`."
            logger.warning(msg)
            continue

        converted.append(vv)

        # Read the running history from d_out (its attrs are independent of d's)
        # so that entries for several variables accumulate instead of overwriting.
        prev_history = d_out.attrs.get("history", "")
        history = f"Transformed variable `{vv}` values using method `{trans}`. {prev_history}"
        d_out.attrs["history"] = history

    # Copy unconverted variables
    for vv in d.data_vars:
        if vv not in converted:
            d_out[vv] = d[vv]
    return d_out
[docs]
def invert_value_sign(d: xr.Dataset, p: str, m: dict) -> xr.Dataset:
    """
    Flip value of DataArray.

    Variables whose ``_invert_sign`` metadata entry is truthy are negated.

    Parameters
    ----------
    d : xarray.Dataset
        Input dataset. It is not modified; a new dataset is returned.
    p : str
        Dataset project name, forwarded to the metadata lookup.
    m : dict
        Metadata definition dictionary for project and variable(s).

    Returns
    -------
    xarray.Dataset
        New dataset with the flagged variables negated, all other variables
        copied as-is, and one ``history`` entry prepended per inverted variable.
    """
    key = "_invert_sign"
    d_out = xr.Dataset(coords=d.coords, attrs=d.attrs)
    converted = []
    for vv, inv_sign in _iter_entry_key(d, m, "variables", key, p):
        if inv_sign is False:
            msg = f"No sign inversion needed for `{vv}` in `{p}` (Explicitly set to False)."
            logger.info(msg)
            continue
        if not inv_sign:
            # Nothing configured for this variable: no inversion, no history entry.
            continue

        msg = f"Inverting sign for `{vv}` (switching direction of values)."
        logger.info(msg)
        with xr.set_options(keep_attrs=True):
            d_out[vv] = -d[vv]
        converted.append(vv)

        # Read the running history from d_out (its attrs are independent of d's)
        # so that entries for several variables accumulate instead of overwriting.
        prev_history = d_out.attrs.get("history", "")
        history = f"Inverted sign for variable `{vv}` (switched direction of values). {prev_history}"
        d_out.attrs["history"] = history

    # Copy unconverted variables
    for vv in d.data_vars:
        if vv not in converted:
            d_out[vv] = d[vv]
    return d_out
# For converting variable units to standard workflow units
[docs]
def cf_units_conversion(d: xr.Dataset, m: dict) -> xr.Dataset:
    """
    Perform pint-based units-conversion.

    Parameters
    ----------
    d : xarray.Dataset
        Dataset whose variables are converted in place.
    m : dict
        Metadata definition dictionary. ``m["dimensions"]`` must exist; the
        target units are read from the ``units`` entry of each variable under
        ``m["variables"]``, with an optional ``_units_context``.

    Returns
    -------
    xarray.Dataset
        The same dataset object, with converted variables and one ``history``
        entry prepended per converted variable.
    """
    dimensions = m["dimensions"]
    if "time" in dimensions:
        time_units = dimensions["time"].get("units")
        if time_units:
            # NOTE(review): this is item assignment on the DataArray returned by
            # d["time"], not on its attrs/encoding — confirm it has the intended
            # effect on `d`.
            d["time"]["units"] = time_units

    # The project argument is None here: the lookup is not project-specific.
    for name, target_units in _iter_entry_key(d, m, "variables", "units", None):
        units_context = m["variables"][name].get("_units_context", None)
        if not target_units:
            continue

        with xr.set_options(keep_attrs=True):
            d[name] = units.convert_units_to(d[name], target_units, context=units_context)

        # Newest entry goes first; any earlier history is kept after it.
        earlier = d.attrs.get("history", "")
        d.attrs["history"] = f"Converted variable `{name}` to CF-compliant units (`{target_units}`). {earlier}"

    return d
# For clipping variable values to an established maximum/minimum
[docs]
def clip_values(d: xr.Dataset, p: str, m: dict) -> xr.Dataset:
    """
    Clip values to an appropriate range.

    The ``_clip_values`` metadata entry of a variable is expected to be a
    mapping with optional ``min``, ``max`` and ``context`` keys. Bounds are
    converted to the variable's units before clipping.

    Parameters
    ----------
    d : xarray.Dataset
        Input dataset. It is not modified; a new dataset is returned.
    p : str
        Dataset project name, forwarded to the metadata lookup.
    m : dict
        Metadata definition dictionary for project and variable(s).

    Returns
    -------
    xarray.Dataset
        New dataset with the configured variables clipped, all other variables
        copied as-is, and one ``history`` entry prepended per clipped variable.
    """
    key = "_clip_values"
    d_out = xr.Dataset(coords=d.coords, attrs=d.attrs)
    converted = []
    for vv in d.data_vars:
        if vv not in m["variables"]:
            continue

        clip_vals = _get_section_entry_key(m, "variables", vv, key, p)
        # Compare the looked-up value, not the function object `clip_values`,
        # so that an explicit False is actually recognised.
        if clip_vals is False:
            msg = f"No clipping of values needed for `{vv}` in `{p}` (Explicitly set to False)."
            logger.info(msg)
            continue
        if not clip_vals:
            msg = f"Unknown clipping values for `{vv}` in `{p}`."
            logger.info(msg)
            continue

        # Gather unit conversion context, if applicable
        context = clip_vals.get("context", None)
        min_value, max_value = None, None
        if "min" in clip_vals:
            min_value = xclim.core.units.convert_units_to(clip_vals["min"], d[vv], context)
        if "max" in clip_vals:
            max_value = xclim.core.units.convert_units_to(clip_vals["max"], d[vv], context)

        msg = f"Clipping min/max values for `{vv}` ({min_value}/{max_value})."
        logger.info(msg)
        with xr.set_options(keep_attrs=True):
            d_out[vv] = d[vv].clip(min_value, max_value)
        converted.append(vv)

        # Read the running history from d_out (its attrs are independent of d's)
        # so that entries for several variables accumulate instead of overwriting.
        prev_history = d_out.attrs.get("history", "")
        history = f"Clipped variable `{vv}` with `min={min_value}` and `max={max_value}`. {prev_history}"
        d_out.attrs["history"] = history

    # Copy unconverted variables
    for vv in d.data_vars:
        if vv not in converted:
            d_out[vv] = d[vv]
    return d_out
# For attaching variable metadata and renaming variables to their CF names
[docs]
def variable_conversion(d: xr.Dataset, p: str | None, m: dict) -> xr.Dataset:
    """
    Add variable metadata and remove nonstandard entries.

    Parameters
    ----------
    d : xarray.Dataset
        Dataset with variable(s) to be updated.
    p : str or None
        Dataset project name.
    m : dict
        Metadata definition dictionary for project and variable(s).
        It is read only; the correction directives are filtered out of a copy
        so the same dictionary can be reused for further datasets.

    Returns
    -------
    xarray.Dataset
        Dataset with metadata attributes attached to each described variable
        and with variables renamed to their ``_cf_variable_name`` where one is
        defined. The original name is kept in the ``original_variable``
        attribute.
    """
    var_descriptions = m["variables"]
    # Internal treatment directives that must not end up as variable attributes.
    var_correction_fields = {
        "_clip_values",
        "_corrected_units",
        "_invert_sign",
        "_offset_time",
        "_transformation",
        "_use_snapshot",
    }

    for var in d.variables:
        if var in var_descriptions:
            # Build a filtered copy instead of deleting keys from `m` in place.
            attrs = {
                field: value
                for field, value in var_descriptions[var].items()
                if field not in var_correction_fields
            }
            d[var].attrs.update(attrs)

    # Rename data variables
    for orig_var_name, cf_name in _iter_entry_key(d, m, "variables", "_cf_variable_name", p):
        if cf_name is not None:
            d = d.rename({orig_var_name: cf_name})
            d[cf_name].attrs.update(dict(original_variable=orig_var_name))
            # The directive was copied into attrs by the loop above; drop it.
            d[cf_name].attrs.pop("_cf_variable_name", None)

    return d