Source code for miranda.eccc._homogenized

"""Adjusted and Homogenized Canadian Clime Data module."""

from __future__ import annotations
import calendar
import logging
import os
import shutil
from pathlib import Path

import numpy as np
import pandas as pd
import requests
import xarray as xr
from dask.diagnostics import ProgressBar

from miranda.convert.utils import _add_coords_to_dataset
from miranda.eccc._utils import cf_ahccd_metadata


logger = logging.getLogger("miranda")

__all__ = [
    "convert_ahccd",
    "convert_ahccd_fwf_files",
    "create_canhomt_xarray",
    "download_canhomt",
]


[docs] def download_canhomt( project: str, working_folder: str | os.PathLike[str] | None = None, update_raw: bool = False, timeout: int | None = None, retry: int = 5, n_workers: int | None = None, # FIXME: Not implemented yet ) -> None: """ Download CanHomT data. Parameters ---------- project : {"canhomt_dly"} Project name. working_folder : str or os.PathLink[str], optional Temporary files folder. update_raw : bool Whether to update the raw files or not. timeout : int, optional Request timeout in seconds. retry : int Number of retries. n_workers : int, optional Number of workers to use. Not implemented. Raises ------ ValueError If the project name is unknown. OSError If there is an error downloading the data. """ if update_raw and working_folder.joinpath("raw").exists(): shutil.rmtree(working_folder.joinpath("raw")) working_folder.mkdir(parents=True, exist_ok=True) out_folder = working_folder.joinpath("raw") out_folder.mkdir(parents=True, exist_ok=True) if project in ["canhomt_dly"]: url = "https://crd-data-donnees-rdc.ec.gc.ca/CDAS/products/CanHomTV4/CanHomT_dlyV4.tar.gz" out_file = out_folder / "CanHomT_dlyV4.tar.gz" else: msg = f"unknown project type : {project}. Supported projects are ['canhomt_dly']" raise ValueError(msg) for _ in range(retry): if out_file.exists() and not update_raw: break try: msg = f"Downloading {url}" logging.info(msg) with requests.get(url, timeout=(5, timeout)) as r: r.raise_for_status() with Path(out_file.with_suffix(f".tmp{out_file.suffix}")).open("wb") as f: f.write(r.content) shutil.move(out_file.with_suffix(f".tmp{out_file.suffix}"), out_file) break except OSError: raise shutil.unpack_archive(out_file, out_folder, "gztar")
[docs] def create_canhomt_xarray(in_files: list, variable_meta: dict, station_meta: pd.DataFrame, project: str) -> xr.Dataset | None: """ Create a xarray dataset from CanHomT raw .csv files. Parameters ---------- in_files : list A list of input files. variable_meta : dict Variable metadata. station_meta : pd.DataFrame Station metadata. project : str Project name. Returns ------- xr.Dataset, optional Dataset. """ data = [] for cc in ["beginyear", "endyear"]: station_meta[cc] = pd.to_datetime(station_meta[cc], format="%Y-%m") for station_id in station_meta.station_id: msg = f"Reading {station_id}" logging.info(msg) statfile = [f for f in in_files if f.name == f"{station_id}.csv"] if statfile == []: msg = f"Station {station_id} not found in input files" logging.warning(msg) continue df = pd.read_csv(statfile[0]) df.columns = df.columns.str.lower() df["station"] = station_id varlist = [k for k in variable_meta.keys() if k in df.columns] if varlist: df["time"] = pd.to_datetime(df["time"], format="%Y-%m-%d") df = df.set_index(["station", "time"]) ds = df.to_xarray() drop_vars = [v for v in ds.data_vars if v not in varlist and "flag" not in v] ds = ds.drop_vars(drop_vars) df_stat = station_meta[station_meta.station_id == station_id] if len(df_stat) != 1: raise ValueError(f"expected a single station metadata for {station_id.stem}") ds = _add_coords_to_dataset(ds, df_stat) ds = ds.rename({f"{v}_flag": f"{v}_q_flag" for v in ds.data_vars if "flag" not in v}) data.append(ds) if len(data) == 0: return None return xr.concat(data, dim="station")
[docs] def convert_ahccd( data_source: str | Path, output_dir: str | Path, variable: str, generation: int | None = None, ) -> None: """ Convert Adjusted and Homogenized Canadian Climate Dataset files. Parameters ---------- data_source: str or Path output_dir: str or Path variable: str generation: int, optional Returns ------- None """ output_dir = Path(output_dir).resolve().joinpath(variable) output_dir.mkdir(parents=True, exist_ok=True) code = dict(tasmax="dx", tasmin="dn", tas="dm", pr="dt", prsn="ds", prlp="dr").get(variable) var, col_names, col_spaces, header_row, global_attrs = cf_ahccd_metadata(code, generation) gen = {2: "Second", 3: "Third"}.get(generation) if generation == 3 and code in {"dx", "dn", "dm"}: meta = "ahccd_gen3_temperature.csv" elif generation == 2 and code in {"dt", "ds", "dr"}: meta = "ahccd_gen2_precipitation.csv" else: raise NotImplementedError(f"Code '{code} for generation {gen}.") metadata_source = Path(__file__).resolve().parent.joinpath("data").joinpath(meta) if "tas" in variable: metadata = pd.read_csv(metadata_source, header=2) metadata.columns = col_names.keys() cols_specs = col_spaces elif "pr" in variable: metadata = pd.read_csv(metadata_source, header=3) metadata.columns = col_names.keys() cols_specs = col_spaces for index, row in metadata.iterrows(): if isinstance(row["stnid"], str): metadata.loc[index, "stnid"] = metadata.loc[index, "stnid"].replace(" ", "") else: raise KeyError(f"{variable} does not include 'pr' or 'tas'.") # Convert station .txt files to netcdf for ff in Path(data_source).glob("*d*.txt"): outfile = output_dir.joinpath(ff.name.replace(".txt", ".nc")) if not outfile.exists(): logger.info(ff.name) stid = ff.name.replace(code, "").split(".txt")[0] try: metadata_st = metadata[metadata["stnid"] == int(stid)] except ValueError: metadata_st = metadata[metadata["stnid"] == stid] if len(metadata_st) == 1: ds_out = convert_ahccd_fwf_files(ff, metadata_st, variable, generation, cols_specs, var) ds_out.attrs = global_attrs ds_out.to_netcdf(outfile, engine="h5netcdf") else: msg = f"metadata info for station {ff.name} not found : skipping" logger.warning(msg) # merge individual stations to single .nc file # variable ncfiles = list(output_dir.glob("*.nc")) outfile = output_dir.parent.joinpath("merged_stations", f"ahccd_gen{generation}_{variable}.nc") if not outfile.exists(): logger.info("merging stations :", variable) with ProgressBar(): ds_ahccd = xr.open_mfdataset(ncfiles, concat_dim="station", combine="nested").load() for coord in ds_ahccd.coords: # xarray object datatypes mix string and int (e.g. stnid) convert to string for merged nc files # Do not apply to datetime object if coord != "time" and ds_ahccd[coord].dtype == "O": ds_ahccd[coord] = ds_ahccd[coord].astype(str) for v in ds_ahccd.data_vars: # xarray object datatypes mix string and int (e.g. stnid) convert to string for merged nc files # Do not apply to flag timeseries if ds_ahccd[v].dtype == "O" and "flag" not in v: logger.info(v) ds_ahccd[v] = ds_ahccd[v].astype(str) ds_ahccd[f"{variable}_flag"].attrs["long_name"] = f"{ds_ahccd[f'{variable}'].attrs['long_name']} flag" ds_ahccd.lon.attrs["units"] = "degrees_east" ds_ahccd.lon.attrs["long_name"] = "longitude" ds_ahccd.lat.attrs["units"] = "degrees_north" ds_ahccd.lat.attrs["long_name"] = "latitude" for clean_name, orig_name in col_names.items(): if clean_name in ["lat", "long"]: continue ds_ahccd[clean_name].attrs["long_name"] = orig_name outfile.parent.mkdir(parents=True, exist_ok=True) ds_ahccd.to_netcdf(outfile, engine="h5netcdf", format="NETCDF4_CLASSIC", mode="w") del ds_ahccd for nc in outfile.parent.glob("*.nc"): logger.info(nc) ds = xr.open_dataset(nc) logger.info(ds)
[docs] def convert_ahccd_fwf_files( ff: Path | str, metadata: pd.DataFrame, variable: str, generation: int | None = None, cols_specs: list[tuple[int, int]] | None = None, attrs: dict | None = None, ) -> xr.Dataset: """ Convert AHCCD fixed-width files. Parameters ---------- ff: str or Path metadata: pandas.DataFrame variable: str generation cols_specs attrs Returns ------- xarray.Dataset """ code = dict(tasmax="dx", tasmin="dn", tas="dm", pr="dt", prsn="ds", prlp="dr").get(variable) if attrs is None: attrs, _, _, _, _ = cf_ahccd_metadata(code, generation) if cols_specs is None: _, _, cols_specs, _, _ = cf_ahccd_metadata(code, generation) _, _, _, nhead, _ = cf_ahccd_metadata(code, generation) df = pd.read_fwf(ff, header=nhead, colspecs=cols_specs) if "pr" in variable: cols = list(df.columns[0:3]) cols = cols[0::2] cols.extend(list(df.columns[4::2])) flags = list(df.columns[5::2]) dfflags = df[flags] else: cols = [c for c in df.columns if "Unnamed" not in c] flags = [c for c in df.columns if "Unnamed" in c] dfflags = df[flags[2:]] df = df[cols] df.replace(attrs["NaN_value"], np.NaN, inplace=True) for i, j in enumerate(["Year", "Month"]): df = df.rename(columns={df.columns[i]: j}) start_date = f"{df['Year'][0]}-{str(df['Month'][0]).zfill(2)}-01" _, ndays = calendar.monthrange(df["Year"].iloc[-1], df["Month"].iloc[-1]) end_date = f"{df['Year'].iloc[-1]}-{str(df['Month'].iloc[-1]).zfill(2)}-{str(ndays).zfill(2)}" time1 = pd.date_range(start=start_date, end=end_date) index = pd.MultiIndex.from_arrays([df["Year"], df["Month"]]) df.index = index dfflags.index = index cols = [c for c in df.columns if "Year" not in c and "Month" not in c] df = df[cols] df.columns = np.arange(1, 32) dfflags.columns = np.arange(1, 32) ds = df.stack().to_frame() ds = ds.rename(columns={0: variable}) ds_flag = dfflags.stack().to_frame() ds_flag = ds_flag.rename(columns={0: "flag"}) ds.index.names = ["Year", "Month", "Day"] ds_flag.index.names = ["Year", "Month", "Day"] ds[f"{variable}_flag"] = ds_flag["flag"] del ds_flag # find invalid dates for y in time1.year.unique(): for m in ds[ds.index.get_level_values("Year") == y].index.get_level_values("Month").unique(): _, exp_ndays = calendar.monthrange(y, m) ndays = ((ds.index.get_level_values("Year") == y) & (ds.index.get_level_values("Month") == m)).sum() if ndays > np.int(exp_ndays): print(f"year {y}, month {m}, ndays={ndays}, exp_ndays={exp_ndays}") raise RuntimeError("Unknown days present.") time_ds = pd.DataFrame( { "year": ds.index.get_level_values("Year"), "month": ds.index.get_level_values("Month"), "day": ds.index.get_level_values("Day"), } ) ds.index = pd.to_datetime(time_ds) ds = ds.to_xarray().rename({"index": "time"}) ds_out = xr.Dataset(coords={"time": time1}) for v in ds.data_vars: ds_out[v] = ds[v] ds_out[variable].attrs = attrs # ds_out metadata = metadata.to_xarray().rename({"index": "station"}).drop_vars("station") metadata = metadata.assign_coords( { "stnid": metadata["stnid"].astype(str), "station_name": metadata["station_name"], } ) # ds_out = ds_out.assign_coords({'lon': metadata['long'], 'lat': metadata['lat'], 'elevation': metadata['elev']}) # ds_out = ds_out.assign_coords(station=metadata.stnid) metadata = metadata.drop_vars(["stnid", "station_name"]) ds_out["lon"] = metadata["long"] ds_out["lon"].attrs["units"] = "degrees_east" ds_out["lat"] = metadata["lat"] ds_out["lat"].attrs["units"] = "degrees_north" ds_out["elev"] = metadata["elev"] ds_out["elev"].attrs["units"] = "m" metadata = metadata.drop_vars(["long", "lat", "elev"]) for vv in metadata.data_vars: if metadata[vv].dtype == "O" and (variable not in vv): ds_out[vv] = metadata[vv].astype(str) else: ds_out[vv] = metadata[vv] return ds_out