from __future__ import annotations

import json
import logging.config
import os
import shutil
import time
from collections.abc import Hashable, Mapping, Sequence
from pathlib import Path

import xarray as xr
from xarray.core.utils import Frozen

from miranda.scripting import LOGGING_CONFIG

from ._input import discover_data
from .utils import delayed_write, get_global_attrs, get_time_attrs, sort_variables
logging.config.dictConfig(LOGGING_CONFIG)

__all__ = [
    "fetch_chunk_config",
    "prepare_chunks_for_ds",
    "rechunk_files",
    "translate_time_chunk",
]

# Folder shipped with the package that holds the default chunking presets.
_data_folder = Path(__file__).parent / "data"

# Load the default chunk configuration once at import time.
# Path.read_text() avoids the leaked file handle left by a bare
# `json.load(open(...))` (the file object was never closed).
chunk_configurations = json.loads(
    (_data_folder / "ouranos_chunk_config.json").read_text(encoding="utf-8")
)
def prepare_chunks_for_ds(
    ds: xr.Dataset, chunks: dict[str, str | int]
) -> dict[str, int]:
    """Prepare the chunks to be used to write Dataset.

    This includes translating the time chunks, making sure chunks are not too small,
    and removing -1.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset that we want to write with the chunks.
    chunks : dict
        Desired chunks in human-readable format (with "4 years" and -1).

    Returns
    -------
    dict
        Chunks in a format that is ready to be used to write to disk.
    """
    # Work on a copy so the caller's mapping is never mutated.
    chunks = dict(chunks)

    # Check if any of the chunks are strings and need to be translated
    # (e.g. "4 years" -> number of time steps).
    if any(isinstance(x, str) for x in chunks.values()):
        calendar, size = get_time_attrs(ds)
        chunks = translate_time_chunk(chunks, calendar, size)

    # Replace -1 (which netCDF can't handle) and clamp chunks that are
    # larger than the dimension itself.
    for dim, c in chunks.copy().items():
        # `.sizes` is the canonical dim-name -> length mapping
        # (`Dataset.dims` is deprecated for this use in recent xarray).
        length = ds.sizes[dim]
        if c == -1 or c > length:
            logging.info(f"Chunk was changed from {c} to {length} for dimension {dim}.")
            chunks[dim] = length
    return chunks
# Shamelessly stolen and modified from xscen
def translate_time_chunk(chunks: dict, calendar: str, timesize: int) -> dict:
    """Translate chunk specification for time into a number.

    Parameters
    ----------
    chunks : dict
        Chunk specification; values may be nested dicts, -1, or "N years" strings.
    calendar : str
        CF calendar name used to pick the number of days per year.
    timesize : int
        Length of the time dimension; substituted for -1.

    Returns
    -------
    dict
        A new dict with the time entries translated to integers.

    Notes
    -----
    -1 translates to `timesize`.
    'N years' translates to N times the number of days in a year of calendar `calendar`.
    """
    # Work on a copy: the input may be a shared, module-level configuration
    # dict (see `fetch_chunk_config`), which must not be mutated in place.
    chunks = dict(chunks)
    for k, v in chunks.items():
        if isinstance(v, dict):
            # Nested per-variable specification; translate recursively.
            chunks[k] = translate_time_chunk(v, calendar, timesize)
        elif k == "time" and v is not None:
            if isinstance(v, str) and v.endswith("years"):
                n = int(str(v).strip("years").strip())
                days_per_year = {"noleap": 365, "360_day": 360, "all_leap": 366}.get(
                    calendar, 365.25
                )
                chunks[k] = int(n * days_per_year)
            elif v == -1:
                chunks[k] = timesize
    return chunks
def fetch_chunk_config(
    priority: str,
    freq: str,
    dims: Sequence[str] | dict[str, int] | Frozen | tuple[Hashable],
    default_config: dict | None = None,
) -> dict[str, int]:
    """Fetch a chunking configuration matching the given priority, frequency, and dimensions.

    Parameters
    ----------
    priority : {"time", "files"}
        Specifies whether the chunking regime should prioritize file granularity ("files") or time series ("time").
    freq : {"1hr", "day", "month"}
        The time frequency of the input data.
    dims : sequence of str
        The dimension names that will be used for chunking.
    default_config : dict, optional
        The dictionary to use for determining the chunking configuration.
        Defaults to the module-level `chunk_configurations`.

    Returns
    -------
    dict[str, int]

    Raises
    ------
    KeyError
        If `priority`, `freq`, or a configuration matching `dims` is not found.
    """
    # Resolve the default lazily instead of binding the mutable module-level
    # dict in the signature at definition time.
    if default_config is None:
        default_config = chunk_configurations

    # Accept any mapping of dim name -> size (dict, xarray's Frozen, etc.)
    # and reduce it to the set of dimension names.
    if isinstance(dims, Mapping):
        dims = set(dims)

    if priority not in default_config:
        raise KeyError(f"Priority regime `{priority}` configuration not found.")
    if freq not in default_config[priority]:
        raise KeyError(f"Frequency `{freq}` configuration not found.")

    for config in default_config[priority][freq]:
        if set(default_config[priority][freq][config]).issubset(dims):
            # Return a copy so callers cannot mutate the shared configuration.
            return dict(default_config[priority][freq][config])
    raise KeyError(f"Chunk dimensions `{dims}` configuration not found.")
def rechunk_files(
    input_folder: str | os.PathLike,
    output_folder: str | os.PathLike,
    project: str | None = None,
    time_step: str | None = None,
    chunking_priority: str = "auto",
    target_chunks: dict[str, int] | None = None,
    variables: Sequence[str] | None = None,
    suffix: str = "nc",
    output_format: str = "netcdf",
    overwrite: bool = False,
) -> None:
    """Rechunk datasets for better loading/reading performance.

    Warnings
    --------
    Globbing assumes that target datasets to be rechunked have been saved in NetCDF format.
    File naming requires the following order of facets: `{variable}_{time_step}_{institute}_{project}_reanalysis_*.nc`.
    Chunking dimensions are assumed to be CF-Compliant (`lat`, `lon`, `rlat`, `rlon`, `time`).

    Parameters
    ----------
    input_folder : str or os.PathLike
        Folder to be examined. Performs globbing.
    output_folder : str or os.PathLike
        Target folder.
    project : str, optional
        Supported projects. Used for determining chunk dictionary. Superseded if `target_chunks` is set.
    time_step : {"1hr", "day"}, optional
        Time step of the input data. Parsed from dataset attrs if not set. Superseded if `target_chunks` is set.
    chunking_priority : {"time", "files", "auto"}
        The chunking regime to use. Default: "auto".
    target_chunks : dict, optional
        Must include "time", optionally "lat" and "lon", depending on dataset structure.
    variables : Sequence[str], optional
        If no variables set, will attempt to process all variables supported based on project name.
    suffix : {"nc", "zarr"}
        Suffix used to identify data files. Default: "nc".
    output_format : {"netcdf", "zarr"}
        Default: "netcdf".
    overwrite : bool
        Will overwrite files. For zarr, existing folders will be removed before writing.

    Returns
    -------
    None
    """
    if isinstance(input_folder, str):
        input_folder = Path(input_folder).expanduser()
    if isinstance(output_folder, str):
        output_folder = Path(output_folder).expanduser()
    output_folder.mkdir(exist_ok=True)

    # When chunks or frequency are unknown, inspect one file's global
    # attributes to resolve `project` and/or `time_step`.
    if target_chunks is None or time_step is None:
        test_file = next(input_folder.glob(f"*.{suffix}"))
        if project is None:
            logging.warning(
                "`project` and `target_chunks` not set. Attempting to find `project` from attributes"
            )
            project = get_global_attrs(test_file).get("project")
            if not project:
                raise ValueError(
                    "`project` not found. Must pass either `project` or `target_chunks`."
                )
        if time_step is None:
            time_step = get_global_attrs(test_file).get("frequency")
            if not time_step:
                raise ValueError("Frequency not found in file attributes.")

    # Resolve output suffix and the "auto" chunking regime per format:
    # netCDF favours file granularity, zarr favours time-series access.
    if output_format == "netcdf":
        output_suffix = "nc"
        if chunking_priority == "auto":
            chunking_priority = "files"
    elif output_format == "zarr":
        output_suffix = "zarr"
        if chunking_priority == "auto":
            chunking_priority = "time"
    else:
        raise NotImplementedError(f"Output format: `{output_format}`.")

    files_found = discover_data(input_folder, suffix=suffix)
    variable_sorted = sort_variables(files_found, variables)

    errored = []
    start_all = time.perf_counter()
    for variable, files in variable_sorted.items():
        start_var = time.perf_counter()
        for file in files:
            start = time.perf_counter()
            # Write into a scratch subfolder first; move to the final
            # destination only after a successful write.
            output_path = output_folder / "temp"
            output_path.mkdir(exist_ok=True)
            out = output_path / f"{file.stem}.{output_suffix}"

            if out.is_dir() or out.is_file():
                if overwrite:
                    if out.is_dir() and output_format == "zarr":
                        logging.warning(f"Removing existing zarr files for {out.name}.")
                        shutil.rmtree(out)
                else:
                    logging.info(f"Files exist: {file.name}")
                    continue

            ds = xr.open_dataset(file, chunks={"time": -1})

            # Resolve target chunks once from the first file's dimensions;
            # reused for the remaining files.
            if target_chunks is None:
                chunk_config = fetch_chunk_config(chunking_priority, time_step, ds.dims)
                calendar, size = get_time_attrs(ds)
                target_chunks = translate_time_chunk(chunk_config, calendar, size)

            try:
                delayed_write(
                    ds,
                    out,
                    output_format=output_format,
                    overwrite=overwrite,
                    target_chunks=target_chunks,
                ).compute()
            except KeyError:
                logging.warning(f"{file} has chunking errors. Verify data manually.")
                errored.append(file)
                continue
            logging.info(f"Done for {file.stem} in {time.perf_counter() - start:.2f} s")

            logging.info(f"Moving {file.name} to {output_folder}.")
            shutil.move(out, output_folder)

        logging.info(
            f"{variable} rechunked in {(time.perf_counter() - start_var) / 3600:.2f} h"
        )

    logging.info(
        f"All variables in {input_folder} rechunked in {time.perf_counter() - start_all:.2f} s"
    )
    if errored:
        errored_filenames = "\n".join(map(str, errored))
        logging.warning(f"Errored files are as follows: \n{errored_filenames}")