Source code for miranda.preprocess._eccc_ahccd

"""Adjusted and Homogenized Canadian Clime Data module."""

from __future__ import annotations
import calendar
import logging
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr

from miranda.io import write_dataset
from miranda.io.utils import name_output_file
from miranda.preprocess._metadata import (
    eccc_variable_metadata,
    homogenized_column_definitions,
)
from miranda.treatments import find_project_variable_codes, load_json_data_mappings


logger = logging.Logger("miranda")

__all__ = ["convert_ahccd", "convert_ahccd_fwf_file", "merge_ahccd"]


[docs] def convert_ahccd_fwf_file( ff: Path | str, metadata: pd.DataFrame, variable: str, *, generation: int, ) -> xr.Dataset: """ Convert AHCCD fixed-width files. Parameters ---------- ff: str or Path metadata: pandas.DataFrame variable: str generation: int Returns ------- xarray.Dataset """ configuration = load_json_data_mappings("eccc-ahccd") code = find_project_variable_codes(variable, configuration) variable_meta, global_attrs = eccc_variable_metadata(code, "eccc-ahccd", generation, configuration) column_names, column_spaces, column_dtypes, header = homogenized_column_definitions(code) df = pd.read_fwf(ff, header=header, colspecs=column_spaces, dtype=column_dtypes) # Handle different variable types if "pr" in variable: cols = list(df.columns[0:3]) cols = cols[0::2] cols.extend(list(df.columns[4::2])) flags = list(df.columns[5::2]) dfflags = df[flags] elif "tas" in variable: cols = [c for c in df.columns if "Unnamed" not in c] flags = [c for c in df.columns if "Unnamed" in c] dfflags = df[flags[2:]] else: raise NotImplementedError(f"Variable `{variable}` not supported.") # Extract relevant columns df = df[cols] df.replace(variable_meta[variable]["NaN_value"], np.NaN, inplace=True) for i, j in enumerate(["Year", "Month"]): df = df.rename(columns={df.columns[i]: j}) start_date = f"{df['Year'][0]}-{str(df['Month'][0]).zfill(2)}-01" _, ndays = calendar.monthrange(df["Year"].iloc[-1], df["Month"].iloc[-1]) end_date = f"{df['Year'].iloc[-1]}-{str(df['Month'].iloc[-1]).zfill(2)}-{str(ndays).zfill(2)}" time1 = pd.date_range(start=start_date, end=end_date) index = pd.MultiIndex.from_arrays([df["Year"], df["Month"]]) df.index = index cols = [c for c in df.columns if "Year" not in c and "Month" not in c] df = df[cols] df.columns = np.arange(1, 32) ds = df.stack().to_frame() ds = ds.rename(columns={0: variable}) ds.index.names = ["Year", "Month", "Day"] dfflags.index = index dfflags.columns = np.arange(1, 32) ds_flag = dfflags.stack().to_frame() ds_flag = ds_flag.rename(columns={0: "flag"}) ds_flag.index.names = ["Year", "Month", "Day"] ds[f"{variable}_flag"] = ds_flag["flag"] del ds_flag # find invalid dates for y in time1.year.unique(): for m in ds[ds.index.get_level_values("Year") == y].index.get_level_values("Month").unique(): _, exp_ndays = calendar.monthrange(y, m) ndays = ((ds.index.get_level_values("Year") == y) & (ds.index.get_level_values("Month") == m)).sum() if ndays > np.int(exp_ndays): print(f"year {y}, month {m}, ndays={ndays}, exp_ndays={exp_ndays}") raise RuntimeError("Unknown days present.") time_ds = pd.DataFrame( { "year": ds.index.get_level_values("Year"), "month": ds.index.get_level_values("Month"), "day": ds.index.get_level_values("Day"), } ) ds.index = pd.to_datetime(time_ds) # noqa ds = ds.to_xarray().rename({"index": "time"}) ds_out = xr.Dataset(coords={"time": time1}) for v in ds.data_vars: ds_out[v] = ds[v] ds_out[variable].attrs = variable_meta[variable] metadata = metadata.to_xarray().rename({"index": "station"}).drop_vars("station") metadata = metadata.assign_coords(dict(station_name=metadata["station_name"])) ds_out = ds_out.assign_coords(station=metadata.stnid.astype(str)) metadata = metadata.drop_vars(["stnid", "station_name"]) ds_out[f"{variable}_flag"].attrs["long_name"] = variable_meta[variable]["long_name"] ds_out["lon"] = metadata["long"] ds_out.lon.attrs["units"] = "degrees_east" ds_out.lon.attrs["axis"] = "X" ds_out["lat"] = metadata["lat"] ds_out.lat.attrs["units"] = "degrees_north" ds_out.lat.attrs["axis"] = "Y" ds_out["elev"] = metadata["elev"] ds_out.elev.attrs["units"] = "meters" ds_out.elev.attrs["positive"] = "up" ds_out.elev.attrs["axis"] = "Z" metadata = metadata.drop_vars(["long", "lat", "elev"]) for vv in metadata.data_vars: if metadata[vv].dtype == "O" and (variable not in vv): ds_out[vv] = metadata[vv].astype(str) else: ds_out[vv] = metadata[vv] return ds_out
[docs] def convert_ahccd( data_source: str | Path, output_dir: str | Path, variable: str, *, generation: int, merge: bool = False, overwrite: bool = False, ) -> None: """ Convert Adjusted and Homogenized Canadian Climate Dataset files. Parameters ---------- data_source: str or Path output_dir: str or Path variable: str generation: int merge: bool overwrite: bool Returns ------- None """ configuration = load_json_data_mappings("eccc-ahccd") output_dir = Path(output_dir).resolve().joinpath(variable) output_dir.mkdir(parents=True, exist_ok=True) code = find_project_variable_codes(variable, configuration) variable_meta, global_attrs = eccc_variable_metadata(code, "eccc-ahccd", generation, configuration) ( column_names, column_spaces, column_dtypes, header_row, ) = homogenized_column_definitions(code) gen = {2: "Second", 3: "Third"}.get(generation) if generation == 3 and code in {"dx", "dn", "dm"}: station_meta = "ahccd_gen3_temperature.csv" elif generation == 2 and code in {"dt", "ds", "dr"}: station_meta = "ahccd_gen2_precipitation.csv" else: raise NotImplementedError(f"Code '{code} for generation {gen}.") metadata_source = Path(__file__).resolve().parent.joinpath("configs").joinpath(station_meta) if "tas" in variable: metadata = pd.read_csv(metadata_source, header=2) metadata.columns = column_names.keys() elif "pr" in variable: metadata = pd.read_csv(metadata_source, header=3) metadata.columns = column_names.keys() for index, row in metadata.iterrows(): if isinstance(row["stnid"], str): metadata.loc[index, "stnid"] = metadata.loc[index, "stnid"].replace(" ", "") else: raise KeyError(f"{variable} does not include 'pr' or 'tas'.") # Convert station .txt files to netcdf for ff in Path(data_source).glob(f"{code}*.txt"): output_name = ff.name.replace(".txt", ".nc") if not output_dir.joinpath(output_name).exists() or overwrite: logger.info(ff.name) station_id = ff.stem[2:] metadata_st = metadata[metadata["stnid"] == station_id] if len(metadata_st) == 1: ds_out = convert_ahccd_fwf_file(ff, metadata_st, variable, generation=generation) ds_out.attrs = global_attrs write_dataset( ds_out, output_dir, output_format="netcdf", output_name=output_name, overwrite=overwrite, compute=True, ) else: msg = f"Metadata info for station {ff.name} not found: Skipping..." logger.warning(msg) else: msg = f"{output_name} already exists: Skipping..." logger.info(msg) if merge: merge_ahccd(data_source, output_dir, variable) return
[docs] def merge_ahccd( data_source: str | Path, output_dir: str | Path | None = None, variable: str | None = None, overwrite: bool = False, ) -> None: """Merge Adjusted and Homogenized Canadian Climate Dataset files.""" configuration = load_json_data_mappings("eccc-ahccd") if variable: code = find_project_variable_codes(variable, configuration) glob_pattern = f"{code}*.nc" output_dir = Path(output_dir).resolve().joinpath(variable) else: glob_pattern = "*.nc" output_dir = Path(output_dir).resolve() output_dir.mkdir(parents=True, exist_ok=True) # Merge individual stations to single .nc file ds_ahccd = xr.open_mfdataset(list(data_source.glob(glob_pattern)), concat_dim="station", combine="nested") for coord in ds_ahccd.coords: # xarray object datatypes mix string and int (e.g. station) convert to string for merged nc files # Do not apply to datetime object if coord != "time" and ds_ahccd[coord].dtype == "O": ds_ahccd[coord] = ds_ahccd[coord].astype(str) variables_found = set() for v in ds_ahccd.data_vars: # xarray object datatypes mix string and int (e.g. station) convert to string for merged nc files # Do not apply to flag timeseries if ds_ahccd[v].dtype == "O" and "flag" not in v: ds_ahccd[v] = ds_ahccd[v].astype(str) try: variables_found.add(find_project_variable_codes(str(v), configuration)) except NotImplementedError: msg = f"Variable {v} not found in metadata." logging.info(msg) pass # Name output file ds_ahccd.attrs["variable"] = ", ".join(variables_found) if len(variables_found) > 1: variables = "-".join(variables_found) msg = f"Many variables found. Merging station and variables files in {data_source}." logger.info(msg) else: variables = variables_found.pop() output_name = name_output_file(ds_ahccd, "netcdf", variables) try: msg = f"Writing merged file to: {output_dir}." logger.info(msg) write_dataset( ds_ahccd, output_dir, output_format="netcdf", output_name=output_name, overwrite=overwrite, compute=True, ) del ds_ahccd except FileExistsError: logger.info("Merged file already exists. Use overwrite=`True` to overwrite.")