# Source code for miranda.ecmwf._tigge

from __future__ import annotations
import functools
import logging
import multiprocessing
import os
from datetime import datetime as dt
from datetime import timedelta as td
from pathlib import Path


# Named logger for this module; handlers and levels are configured by the application.
logger = logging.getLogger("miranda.ecmwf.tigge")


# Public API of this module: only ``request_tigge`` is exported by ``import *``.
__all__ = ["request_tigge"]


def _request_direct_tigge(
    variable_name: str,
    variable_code: int,
    time: str,
    fc_type: str,
    provider: str,
    nums: int | None,
    date: str,
    target_folder: str,
) -> None:
    """
    Launch a single formatted TIGGE request against the ECMWF data server.

    This is a module-level function (not a closure) so that it can be pickled
    and dispatched to ``multiprocessing`` worker processes.

    Parameters
    ----------
    variable_name : str
        Short variable name, used only to build the output file name.
    variable_code : int
        Parameter code sent as ``param`` in the request.
    time : str
        Slash-separated base times, e.g. ``"00/12"``.
    fc_type : str
        Forecast type sent as ``type`` in the request (e.g. ``"pf"`` or ``"cf"``).
    provider : str
        Originating centre sent as ``origin`` in the request.
    nums : int, optional
        Number of ensemble members; when truthy, members ``1..nums`` are requested.
    date : str
        A single date or a ``"start/to/end"`` range.
    target_folder : str
        Folder in which the GRIB2 file is written.

    Raises
    ------
    ModuleNotFoundError
        If the optional ``ecmwfapi`` dependency is not installed.
    """
    try:
        from ecmwfapi import ECMWFDataServer
    except ModuleNotFoundError:
        msg = (
            f"{_request_direct_tigge.__name__} requires additional dependencies. "
            "Please install them with `pip install miranda[remote]`."
        )
        # Same logger object as the module-level ``logger``.
        logging.getLogger("miranda.ecmwf.tigge").error(msg)
        raise

    output_name = (
        f"{variable_name}_{provider}_{'-'.join(time.split('/'))}_tigge_reanalysis_6h_"
        f"{date.split('/')[0]}_{date.split('/')[-1]}.grib2"
    )

    # Note: This is only valid for ECMWF at the moment.
    # Six-hourly steps up to 360; step 0 is dropped for tasmax (121) and tasmin (122).
    first_step = 6 if variable_code in (121, 122) else 0
    steps = "/".join(str(step) for step in range(first_step, 361, 6))

    request = {
        "class": "ti",
        "dataset": "tigge",
        "date": date,
        "expver": "prod",
        "grid": "0.5/0.5",
        "levtype": "sfc",
        "origin": provider,
        "param": variable_code,
        "step": steps,
        "time": time,
        "type": fc_type,
        # Absolute path, so the result does not depend on the working directory.
        "target": str(Path(target_folder) / output_name),
    }
    if nums:
        request["number"] = "/".join(str(n) for n in range(1, nums + 1))

    server = ECMWFDataServer()
    server.retrieve(request)


def request_tigge(
    variables: list[str],
    providers: list[str] | None = None,
    *,
    forecast_type: str = "pf",
    times: list[str] | None = None,
    dates: list[str] | None = None,
    date_start: str | None = None,
    date_end: str | None = None,
    output_folder: os.PathLike | None = None,
    processes: int = 4,
) -> None:
    """
    Request tigge data from ECMWF in grib format.

    One request is issued per combination of variable, time, date and provider;
    the requests are distributed over a single pool of worker processes.

    Parameters
    ----------
    variables : list of str
        Variable names to fetch; one of "uas", "vas", "tas", "tds", "pr", "swe",
        "tasmax", "tasmin". If None, all of them are requested.
    providers : list of str, optional
        Originating centres. Defaults to ``["ecmf"]``.
    forecast_type : {"pf", "cf"}
        Forecast type. For "pf", all ensemble members of each provider are requested.
    times : list of str, optional
        Slash-separated base times. Defaults to ``["00/12"]``.
    dates : list of str, optional
        Dates (or ``"start/to/end"`` ranges) to request.
    date_start : str, optional
        Start of a date range; used only when `date_end` is also given.
    date_end : str, optional
        End of a date range; used only when `date_start` is also given.
    output_folder : os.PathLike, optional
        Destination folder. Defaults to ``./download``. Created if missing.
    processes : int
        Number of worker processes.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If neither `dates` nor both `date_start` and `date_end` are provided.
    KeyError
        If a variable is unknown, or a provider is unknown when `forecast_type` is "pf".

    Notes
    -----
    Files are written to `output_folder` using absolute paths; the current working
    directory of the calling process is left untouched. All arguments are validated
    before any folder is created or any process is started.
    """
    tigge_variables = {
        "uas": 165,
        "vas": 166,
        "tas": 167,
        "tds": 168,
        "pr": 222,
        "swe": 228144,
        "tasmax": 121,
        "tasmin": 122,
    }
    # Number of perturbed-forecast members available from each centre.
    project_members = {
        "dems": 44,
        "kwbc": 26,
        "cwao": 20,
        "ecmf": 50,
        "babj": 30,
        "egrr": 23,
        "rksl": 24,
        "rjtd": 50,
        "edzw": 40,
        "lfpw": 34,
        "ammc": 32,
    }

    # --- Validate and normalise arguments before touching disk or processes. ---
    if variables is None:
        variables = list(tigge_variables)
    unknown_variables = [v for v in variables if v not in tigge_variables]
    if unknown_variables:
        raise KeyError(
            f"Unsupported TIGGE variable(s): {', '.join(map(str, unknown_variables))}."
        )

    # Providers of interest
    if providers is None:
        providers = ["ecmf"]
    if forecast_type == "pf":
        unknown_providers = [p for p in providers if p not in project_members]
        if unknown_providers:
            raise KeyError(
                f"Unknown TIGGE provider(s): {', '.join(map(str, unknown_providers))}."
            )

    # Copy the inputs so that the caller's lists are never mutated.
    request_times = list(times) if times else ["00/12"]
    request_dates = list(dates) if dates else []
    if date_start and date_end:
        request_dates.append(f"{date_start}/to/{date_end}")
    if not request_dates:
        raise ValueError(
            "No dates to request: provide `dates` or both `date_start` and `date_end`."
        )

    if output_folder is None:
        target = Path.cwd() / "download"
    else:
        target = Path(output_folder)
    target = target.resolve()
    target.mkdir(parents=True, exist_ok=True)

    # Positional argument tuples, in the order expected by `_request_direct_tigge`.
    tasks = [
        (
            variable,
            tigge_variables[variable],
            time,
            forecast_type,
            provider,
            project_members[provider] if forecast_type == "pf" else None,
            date,
            str(target),
        )
        for variable in variables
        for time in request_times
        for date in request_dates
        for provider in providers
    ]
    if not tasks:
        return

    logging.getLogger("miranda.ecmwf.tigge").info(
        "Launching %d TIGGE request(s) at %s.",
        len(tasks),
        dt.now().strftime("%Y-%m-%d %X"),
    )
    with multiprocessing.Pool(processes=processes) as pool:
        pool.starmap(_request_direct_tigge, tasks)
        pool.close()
        pool.join()