# Source code for miranda.preprocess.ecmwf_tigge

"""ECMWF TIGGE Conversion module."""

from __future__ import annotations
import itertools as it
import logging
import multiprocessing
import os
import shutil
import tempfile
from pathlib import Path

import xarray
from dask.diagnostics import ProgressBar


# Names exported by ``from <this module> import *``.
__all__ = ["tigge_convert"]


# FIXME: Is this function still pertinent?
def _tigge_convert(fn: tuple[Path, Path]) -> None:
    """
    Convert a single TIGGE grib2 file to netCDF.

    This worker lives at module level (not nested inside `tigge_convert`)
    because `multiprocessing.Pool` must pickle the callable it dispatches,
    and locally defined functions cannot be pickled.

    Parameters
    ----------
    fn : tuple of (Path, Path)
        The grib2 file to convert and the folder receiving the netCDF file.
    """
    infile, output_folder = fn
    infile = Path(infile)
    output_folder = Path(output_folder)

    # Drop stale index files left next to the grib file so the index is rebuilt.
    # NOTE(review): assumes cfgrib's default `<file>.<hash>.idx` naming - confirm.
    for index_file in infile.parent.glob(f"{infile.name}*.idx"):
        index_file.unlink(missing_ok=True)

    # Write into a closed temporary file first, so that a partially written
    # netCDF never shows up in the output folder.
    fd, tmp_name = tempfile.mkstemp(suffix=".nc")
    os.close(fd)
    tmp_path = Path(tmp_name)

    try:
        with xarray.open_dataset(infile, engine="cfgrib", chunks="auto") as ds:
            encoding = {var: {"zlib": True} for var in ds.data_vars}
            encoding["time"] = {"dtype": "single"}

            msg = f"Converting `{infile.name}`."
            logging.info(msg)
            with ProgressBar():
                ds.to_netcdf(
                    tmp_name,
                    format="NETCDF4",
                    engine="h5netcdf",
                    encoding=encoding,
                )

        # `with_suffix` swaps the real extension: `x.grib2` -> `x.nc`
        # (a plain `.replace(".grib", ".nc")` would produce `x.nc2`).
        destination = output_folder.joinpath(infile.with_suffix(".nc").name)
        shutil.move(tmp_name, destination.as_posix())
    except ValueError as err:
        msg = f"error converting {infile.name} : File may be corrupted : {err}"
        logging.error(msg)
    finally:
        # No-op after a successful move; removes the leftover file on failure.
        tmp_path.unlink(missing_ok=True)


def tigge_convert(
    source: os.PathLike | None = None,
    target: os.PathLike | None = None,
    processes: int = 8,
) -> None:
    """
    Convert TIGGE grib2 file to netCDF format.

    Parameters
    ----------
    source : os.PathLike, optional
        The source directory containing the TIGGE files.
        Defaults to ``./downloaded`` relative to the current working directory.
    target : os.PathLike, optional
        The target directory to save the converted files.
        Defaults to ``./converted`` relative to the current working directory.
        It is created (including parents) if it does not exist.
    processes : int
        The number of processes to use for the conversion.

    Raises
    ------
    FileNotFoundError
        If no ``*.grib2`` files are found in `source`.
    """
    if source is None:
        source = Path.cwd().joinpath("downloaded")

    all_files = sorted(Path(source).glob("*.grib2"))
    if not all_files:
        raise FileNotFoundError("TIGGE files not found.")

    target = Path.cwd().joinpath("converted") if target is None else Path(target)
    target.mkdir(parents=True, exist_ok=True)

    # One (input file, output folder) job per grib2 file.
    jobs = list(zip(all_files, it.repeat(target)))

    with multiprocessing.Pool(processes=processes) as pool:
        pool.map(_tigge_convert, jobs)
        # Let the workers exit cleanly before the context manager terminates the pool.
        pool.close()
        pool.join()