Source code for miranda.eccc._homogenized

"""Adjusted and Homogenized Canadian Clime Data module."""
from __future__ import annotations

import calendar
import logging.config
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar

from miranda.scripting import LOGGING_CONFIG

from ._utils import cf_ahccd_metadata

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.Logger("miranda")

__all__ = ["convert_ahccd", "convert_ahccd_fwf_files"]


[docs] def convert_ahccd( data_source: str | Path, output_dir: str | Path, variable: str, generation: int | None = None, ) -> None: """Convert Adjusted and Homogenized Canadian Climate Dataset files. Parameters ---------- data_source: str or Path output_dir: str or Path variable: str generation: int, optional Returns ------- None """ output_dir = Path(output_dir).resolve().joinpath(variable) output_dir.mkdir(parents=True, exist_ok=True) code = dict(tasmax="dx", tasmin="dn", tas="dm", pr="dt", prsn="ds", prlp="dr").get( variable ) var, col_names, col_spaces, header_row, global_attrs = cf_ahccd_metadata( code, generation ) gen = {2: "Second", 3: "Third"}.get(generation) if generation == 3 and code in {"dx", "dn", "dm"}: meta = "ahccd_gen3_temperature.csv" elif generation == 2 and code in {"dt", "ds", "dr"}: meta = "ahccd_gen2_precipitation.csv" else: raise NotImplementedError(f"Code '{code} for generation {gen}.") metadata_source = Path(__file__).resolve().parent.joinpath("data").joinpath(meta) if "tas" in variable: metadata = pd.read_csv(metadata_source, header=2) metadata.columns = col_names.keys() cols_specs = col_spaces elif "pr" in variable: metadata = pd.read_csv(metadata_source, header=3) metadata.columns = col_names.keys() cols_specs = col_spaces for index, row in metadata.iterrows(): if isinstance(row["stnid"], str): metadata.loc[index, "stnid"] = metadata.loc[index, "stnid"].replace( " ", "" ) else: raise KeyError(f"{variable} does not include 'pr' or 'tas'.") # Convert station .txt files to netcdf for ff in Path(data_source).glob("*d*.txt"): outfile = output_dir.joinpath(ff.name.replace(".txt", ".nc")) if not outfile.exists(): logger.info(ff.name) stid = ff.name.replace(code, "").split(".txt")[0] try: metadata_st = metadata[metadata["stnid"] == int(stid)] except ValueError: metadata_st = metadata[metadata["stnid"] == stid] if len(metadata_st) == 1: ds_out = convert_ahccd_fwf_files( ff, metadata_st, variable, generation, cols_specs, var ) ds_out.attrs = global_attrs ds_out.to_netcdf(outfile, engine="h5netcdf") else: logger.warning( f"metadata info for station {ff.name} not found : skipping" ) # merge individual stations to single .nc file # variable ncfiles = list(output_dir.glob("*.nc")) outfile = output_dir.parent.joinpath( "merged_stations", f"ahccd_gen{generation}_{variable}.nc" ) if not outfile.exists(): logger.info("merging stations :", variable) with ProgressBar(): ds_ahccd = xr.open_mfdataset( ncfiles, concat_dim="station", combine="nested" ).load() for coord in ds_ahccd.coords: # xarray object datatypes mix string and int (e.g. stnid) convert to string for merged nc files # Do not apply to datetime object if coord != "time" and ds_ahccd[coord].dtype == "O": ds_ahccd[coord] = ds_ahccd[coord].astype(str) for v in ds_ahccd.data_vars: # xarray object datatypes mix string and int (e.g. stnid) convert to string for merged nc files # Do not apply to flag timeseries if ds_ahccd[v].dtype == "O" and "flag" not in v: logger.info(v) ds_ahccd[v] = ds_ahccd[v].astype(str) ds_ahccd[f"{variable}_flag"].attrs[ "long_name" ] = f"{ds_ahccd[f'{variable}'].attrs['long_name']} flag" ds_ahccd.lon.attrs["units"] = "degrees_east" ds_ahccd.lon.attrs["long_name"] = "longitude" ds_ahccd.lat.attrs["units"] = "degrees_north" ds_ahccd.lat.attrs["long_name"] = "latitude" for clean_name, orig_name in col_names.items(): if clean_name in ["lat", "long"]: continue ds_ahccd[clean_name].attrs["long_name"] = orig_name outfile.parent.mkdir(parents=True, exist_ok=True) ds_ahccd.to_netcdf( outfile, engine="h5netcdf", format="NETCDF4_CLASSIC", mode="w" ) del ds_ahccd for nc in outfile.parent.glob("*.nc"): logger.info(nc) ds = xr.open_dataset(nc) logger.info(ds)
[docs] def convert_ahccd_fwf_files( ff: Path | str, metadata: pd.DataFrame, variable: str, generation: int = None, cols_specs: list[tuple[int, int]] | None = None, attrs: dict | None = None, ) -> xr.Dataset: """Convert AHCCD fixed-width files. Parameters ---------- ff: str or Path metadata: pandas.DataFrame variable: str generation cols_specs attrs Returns ------- xarray.Dataset """ code = dict(tasmax="dx", tasmin="dn", tas="dm", pr="dt", prsn="ds", prlp="dr").get( variable ) if attrs is None: attrs, _, _, _, _ = cf_ahccd_metadata(code, generation) if cols_specs is None: _, _, cols_specs, _, _ = cf_ahccd_metadata(code, generation) _, _, _, nhead, _ = cf_ahccd_metadata(code, generation) df = pd.read_fwf(ff, header=nhead, colspecs=cols_specs) if "pr" in variable: cols = list(df.columns[0:3]) cols = cols[0::2] cols.extend(list(df.columns[4::2])) flags = list(df.columns[5::2]) dfflags = df[flags] else: cols = [c for c in df.columns if "Unnamed" not in c] flags = [c for c in df.columns if "Unnamed" in c] dfflags = df[flags[2:]] df = df[cols] df.replace(attrs["NaN_value"], np.NaN, inplace=True) for i, j in enumerate(["Year", "Month"]): df = df.rename(columns={df.columns[i]: j}) start_date = f"{df['Year'][0]}-{str(df['Month'][0]).zfill(2)}-01" _, ndays = calendar.monthrange(df["Year"].iloc[-1], df["Month"].iloc[-1]) end_date = f"{df['Year'].iloc[-1]}-{str(df['Month'].iloc[-1]).zfill(2)}-{str(ndays).zfill(2)}" time1 = pd.date_range(start=start_date, end=end_date) index = pd.MultiIndex.from_arrays([df["Year"], df["Month"]]) df.index = index dfflags.index = index cols = [c for c in df.columns if "Year" not in c and "Month" not in c] df = df[cols] df.columns = np.arange(1, 32) dfflags.columns = np.arange(1, 32) ds = df.stack().to_frame() ds = ds.rename(columns={0: variable}) ds_flag = dfflags.stack().to_frame() ds_flag = ds_flag.rename(columns={0: "flag"}) ds.index.names = ["Year", "Month", "Day"] ds_flag.index.names = ["Year", "Month", "Day"] ds[f"{variable}_flag"] = ds_flag["flag"] del ds_flag # find invalid dates for y in time1.year.unique(): for m in ( ds[ds.index.get_level_values("Year") == y] .index.get_level_values("Month") .unique() ): _, exp_ndays = calendar.monthrange(y, m) ndays = ( (ds.index.get_level_values("Year") == y) & (ds.index.get_level_values("Month") == m) ).sum() if ndays > np.int(exp_ndays): print(f"year {y}, month {m}, ndays={ndays}, exp_ndays={exp_ndays}") raise RuntimeError("Unknown days present.") time_ds = pd.DataFrame( { "year": ds.index.get_level_values("Year"), "month": ds.index.get_level_values("Month"), "day": ds.index.get_level_values("Day"), } ) ds.index = pd.to_datetime(time_ds) ds = ds.to_xarray().rename({"index": "time"}) ds_out = xr.Dataset(coords={"time": time1}) for v in ds.data_vars: ds_out[v] = ds[v] ds_out[variable].attrs = attrs # ds_out metadata = metadata.to_xarray().rename({"index": "station"}).drop_vars("station") metadata = metadata.assign_coords( { "stnid": metadata["stnid"].astype(str), "station_name": metadata["station_name"], } ) # ds_out = ds_out.assign_coords({'lon': metadata['long'], 'lat': metadata['lat'], 'elevation': metadata['elev']}) # ds_out = ds_out.assign_coords(station=metadata.stnid) metadata = metadata.drop_vars(["stnid", "station_name"]) ds_out["lon"] = metadata["long"] ds_out["lon"].attrs["units"] = "degrees_east" ds_out["lat"] = metadata["lat"] ds_out["lat"].attrs["units"] = "degrees_north" ds_out["elev"] = metadata["elev"] ds_out["elev"].attrs["units"] = "m" metadata = metadata.drop_vars(["long", "lat", "elev"]) for vv in metadata.data_vars: if metadata[vv].dtype == "O" and (variable not in vv): ds_out[vv] = metadata[vv].astype(str) else: ds_out[vv] = metadata[vv] return ds_out