Source code for miranda.ghcn._ghcn

from __future__ import annotations
import logging
import os
import shutil
from pathlib import Path

import pandas as pd
import requests
import xarray as xr
from numpy import nan
from pandas.errors import EmptyDataError, ParserError

from miranda.convert.utils import (
    _add_coords_to_dataset,
    get_station_meta,
    prj_dict,
    q_flag_dict,
)


all = [
    "create_ghcn_xarray",
    "download_ghcn",
    "get_ghcn_raw",
]

logger = logging.getLogger("miranda.ghcn")


def _process_ghcnd(station_id: Path, variable_meta: dict, station_meta: pd.DataFrame, logger: logging.Logger) -> xr.Dataset | None:
    """
    Process a single GHCN-Daily (ghcnd) station file into an xarray.Dataset.

    Parameters
    ----------
    station_id : Path
        Path to the input CSV file for the station.
    variable_meta : dict
        Variable metadata dictionary.
    station_meta : pd.DataFrame
        DataFrame containing station metadata.
    logger : logging.Logger
        Logger for logging messages.

    Returns
    -------
    xr.Dataset, optional
        The processed dataset, or None if no variables found.
    """
    df = pd.read_csv(station_id)
    df.columns = df.columns.str.lower()
    df.element = df.element.str.lower()
    imask = ~df.q_flag.isin(list(q_flag_dict["ghcnd"].keys()))
    df.loc[imask, "q_flag"] = nan
    varlist = [k for k in variable_meta.keys() if k in df.element.unique()]
    if varlist:
        df["time"] = pd.to_datetime(df["date"], format="%Y%m%d")
        df = df.set_index(["id", "time"])
        dslist = []
        for var in varlist:
            ds1 = df.loc[df.element == var].to_xarray()
            ds1 = ds1.rename({"data_value": var, "id": "station"})
            drop_vars = [v for v in ds1.data_vars if v not in varlist and v not in ["q_flag"]]
            ds1 = ds1.drop_vars(drop_vars)
            ds1 = ds1.rename({v: f"{var}_{v}" for v in ds1.data_vars if "flag" in v})
            dslist.append(ds1)
        ds = xr.merge(dslist)
        del dslist
        df_stat = station_meta[station_meta.station_id == station_id.stem]
        if len(df_stat) != 1:
            raise ValueError(f"Expected a single station metadata for {station_id.stem}.")
        ds = _add_coords_to_dataset(ds, df_stat, float_flag=False)
        return ds
    return None


def _process_ghcnh(station_id: Path, variable_meta: dict, station_meta: pd.DataFrame, logger: logging.Logger) -> xr.Dataset | None:
    """
    Process a single GHCN-Hourly (ghcnh) station file into an xarray.Dataset.

    Parameters
    ----------
    station_id : Path
        Path to the input PSV file for the station.
    variable_meta : dict
        Variable metadata dictionary.
    station_meta : pd.DataFrame
        DataFrame containing station metadata.
    logger : logging.Logger
        Logger for logging messages.

    Returns
    -------
    xr.Dataset, optional
        The processed dataset, or None if no variables found.
    """
    varlist = [k for k in variable_meta.keys()]
    flaglist = [f"{k}_Quality_Code" for k in varlist]
    coordlist_alt = [
        "Station_ID",
        "Station_name",
        "Year",
        "Month",
        "Day",
        "Hour",
        "Latitude",
        "Longitude",
        "Elevation",
    ]
    coordlist = [
        "STATION",
        "Station_name",
        "Year",
        "Month",
        "Day",
        "Hour",
        "LATITUDE",
        "LONGITUDE",
        "ELEVATION",
    ]

    usecols = coordlist + varlist + flaglist
    usecols_alt = coordlist_alt + varlist + flaglist
    try:
        df = pd.read_csv(station_id, delimiter="|", low_memory=False, usecols=usecols)
        df.rename(columns={"STATION": "STATION_ID"}, inplace=True)
    except (EmptyDataError, ParserError, ValueError, OSError) as e:
        msg = f"Failed to read {station_id} with columns {usecols} due to {type(e).__name__}: {e}. Trying alternative column names."
        logger.warning(msg)
        df = pd.read_csv(station_id, delimiter="|", low_memory=False, usecols=usecols_alt)
    df.columns = df.columns.str.lower()
    varlist = [k for k in variable_meta.keys() if k in df.columns]
    if varlist:
        for col in ["year", "month", "day", "hour"]:
            df[col] = pd.to_numeric(df[col], errors="coerce")
            df = df.dropna(subset=[col])
        df["time"] = pd.to_datetime(df[["year", "month", "day", "hour"]])
        df = df.set_index(["station_id", "time"])
        df = df.iloc[~df.index.duplicated()]
        dslist = []
        for var in varlist:
            ds1 = df[[var, f"{var}_quality_code"]].to_xarray()
            ds1[var] = ds1[var].astype("float32")
            ds1 = ds1.rename({"station_id": "station", f"{var}_quality_code": f"{var}_flag"})
            if ds1[f"{var}_flag"].dtype == "float":
                ds1[f"{var}_flag"] = ds1[f"{var}_flag"].round().astype(str)
                ds1[f"{var}_flag"] = ds1[f"{var}_flag"].where(ds1[f"{var}_flag"] != "nan", "")
            else:
                df1 = df[[var, f"{var}_quality_code"]].copy()
                df1["num_str"] = pd.to_numeric(df1[f"{var}_quality_code"], errors="coerce").round().astype(str)
                df1.loc[
                    df1[f"{var}_quality_code"].apply(lambda x: isinstance(x, str)),
                    "num_str",
                ] = df1[df1[f"{var}_quality_code"].apply(lambda x: isinstance(x, str))][f"{var}_quality_code"].astype(str)
                df1 = df1.drop(columns=[f"{var}_quality_code"])
                df1 = df1.rename(columns={"num_str": f"{var}_quality_code"})
                ds1 = df1.to_xarray()
                ds1 = ds1.rename({"station_id": "station", f"{var}_quality_code": f"{var}_flag"})
                ds1[f"{var}_flag"] = ds1[f"{var}_flag"].where(ds1[f"{var}_flag"] != "nan", "")
            dslist.append(ds1)
        ds = xr.merge(dslist)
        del dslist
        df_stat = station_meta[station_meta.station_id == station_id.stem.split("_", 1)[1].split("_por")[0]]
        if len(df_stat) != 1:
            raise ValueError(f"expected a single station metadata for {station_id.stem}")
        for cc in [c for c in df_stat.columns if c not in ["station_id", "geometry", "index_right"]]:
            if cc not in ds.coords:
                ds = ds.assign_coords({cc: xr.DataArray(df_stat[cc].values, coords=ds.station.coords)})
        for vv in ds.data_vars:
            if ds[vv].dtype == "float64":
                ds[vv] = ds[vv].astype("float32")
        return ds
    return None


def _filter_vars_time(ds: xr.Dataset, varlist: list | None, start_date: pd.Timestamp, end_date: pd.Timestamp) -> xr.Dataset | None:
    keep_vars = [vv for vv in ds.data_vars if any([vv.startswith(v) for v in varlist])]
    ds = ds.drop_vars([v for v in ds.data_vars if v not in keep_vars])
    ds = ds.sel(time=slice(str(start_date.year), str(end_date.year)))
    if len(ds.station) == 0 or len(ds.time) == 0:
        return None
    return ds


[docs] def create_ghcn_xarray( in_files: list, variable_meta: dict, station_meta: pd.DataFrame, project: str, start_date: str | pd.Timestamp, end_date: str | pd.Timestamp, varlist: list | None = None, n_workers: int | None = None, ) -> xr.Dataset | None: """ Create a Zarr dump of DWD climate summary data. Parameters ---------- in_files : list A list of input files. variable_meta : dict Variable metadata. station_meta : pd.DataFrame Station metadata. project : str Project name. start_date: str or pd.Timestamp Start date of the data to be processed. end_date : str or pd.Timestamp End date of the data to be processed. varlist : list List of variables to keep, if None, all variables are kept. n_workers : int, optional Number of parallel workers to use. If None or 1, no parallelism is used Returns ------- xr.Dataset, optional Dataset. """ import concurrent.futures def _process_one(station_id: Path) -> xr.Dataset | None: msg = f"Reading {station_id.name}" logger.info(msg) if project not in prj_dict: raise ValueError(f"Unknown project {project}") if project == "ghcnd": return _process_ghcnd(station_id, variable_meta, station_meta, logger) elif project == "ghcnh": return _process_ghcnh(station_id, variable_meta, station_meta, logger) else: raise ValueError(f"Unknown project {project}") station_list = sorted(list(in_files)) data: list[xr.Dataset] = [] if n_workers is not None and n_workers > 1: futures: dict[concurrent.futures.Future, Path] = {} with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers * 2) as executor: for station_id in station_list: futures[executor.submit(_process_one, station_id)] = station_id for fut in concurrent.futures.as_completed(list(futures.keys())): station_id = futures[fut] try: ds = fut.result() ds = _filter_vars_time(ds, varlist, start_date, end_date) if ds is not None: data.append(ds) except (pd.errors.EmptyDataError, xr.MergeError, TypeError, AttributeError, IndexError, OSError, ValueError, KeyError) as e: msg = f"Failed to read data for {station_id.name} : {type(e).__name__}: {e} ... continuing" logger.warning(msg) continue else: for station_id in station_list: try: ds = _process_one(station_id) ds = _filter_vars_time(ds, varlist, start_date, end_date) if ds is not None: data.append(ds) except (pd.errors.EmptyDataError, xr.MergeError, TypeError, AttributeError, IndexError, OSError, ValueError, KeyError) as e: # noqa: PERF203 msg = f"Failed to read data for {station_id.name} : {type(e).__name__}: {e} ... continuing" logger.warning(msg) continue # noqa: PERF203 if len(data) == 0: return None return xr.concat(data, dim="station", join="outer")
[docs] def get_ghcn_raw( station_ids: list, station_type: str, out_folder: Path, timeout: int = 10, update_raw: bool = False, n_workers: int | None = None, ) -> list[str]: """ Download raw GHCN data. Parameters ---------- station_ids : list[str] List of station IDs. station_type : str Station type. out_folder : Path Output folder. timeout : int Request timeout in seconds. Default is 10. update_raw : bool Whether to update raw data. n_workers : int, optional Number of parallel workers to use. If None or 1, no parallelism is used Returns ------- list of str List of station IDs that failed to download. """ if station_ids is None: raise ValueError("station_ids must be provided") if station_type is None: raise ValueError("stationtype must be provided") if out_folder is None: raise ValueError("outfolder must be provided") import concurrent.futures out_folder.mkdir(parents=True, exist_ok=True) errors = [] def download_one_station(station_id): if station_type == "daily": url = f"https://noaa-ghcn-pds.s3.amazonaws.com/csv/by_station/{station_id}.csv" outfile = out_folder / f"{station_id}.csv" elif station_type == "hourly": url = f"https://www.ncei.noaa.gov/oa/global-historical-climatology-network/hourly/access/by-station/GHCNh_{station_id}_por.psv" outfile = out_folder / f"GHCNh_{station_id}_por.psv" else: msg = f"unknown station type : {station_type}" raise ValueError(msg) if outfile.exists() and not update_raw: return None try: msg = f"Downloading {url}" logger.info(msg) with requests.get(url, timeout=(5, timeout)) as r: r.raise_for_status() with Path(outfile.with_suffix(f".tmp{outfile.suffix}")).open("wb") as f: f.write(r.content) shutil.move(outfile.with_suffix(f".tmp{outfile.suffix}"), outfile) return None except OSError: msg = f"Failed to download from URL: {url} .. continuing" logger.info(msg) return station_id if n_workers is not None and n_workers > 1: with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor: results = list(executor.map(download_one_station, station_ids)) errors = [sid for sid in results if sid is not None] else: for station_id in station_ids: err = download_one_station(station_id) if err is not None: errors.append(err) return errors
[docs] def download_ghcn( project: str, working_folder: str | os.PathLike[str] | None = None, lon_bnds: list[float] | None = None, lat_bnds: list[float] | None = None, update_raw: bool = False, timeout: int | None = None, retry: int = 5, n_workers: int | None = None, # FIXME: Not implemented yet ) -> None: """ Download GHCN data. Parameters ---------- project : str Project name. working_folder : str or os.PathLink[str], optional Temporary files folder. lon_bnds : list of float, optional Longitude boundaries. lat_bnds : list of float, optional Latitude boundaries. update_raw : bool Whether to update the raw files or not. timeout : int, optional Request timeout in seconds. retry : int Number of retries. n_workers : int, optional Number of parallel workers to use. If None or 1, no parallelism is used Raises ------ ValueError If the project name is unknown. """ station_df = get_station_meta(project=project, lon_bnds=lon_bnds, lat_bnds=lat_bnds) working_folder.mkdir(parents=True, exist_ok=True) out_folder = working_folder.joinpath("raw") out_folder.mkdir(parents=True, exist_ok=True) # request = NoaaGhcnRequest(parameters=(prj_dict[project], "data"), start_date=start_date, end_date=end_date) station_ids = station_df["station_id"].tolist() errors = 0 for try_iter in range(retry): errors = get_ghcn_raw( station_ids=station_ids, station_type=prj_dict[project]["freq"], out_folder=out_folder, update_raw=update_raw, timeout=timeout, n_workers=n_workers, ) if len(errors) == 0: break else: station_ids = errors msg = f"Failed to download {len(errors)} stations. Retrying ({try_iter + 1}/{retry})" logger.info(msg) else: msg = f"Failed to download stations {errors} after {retry} retries. Skipping." logger.error(msg)