Source code for miranda.preprocess._eccc_obs

"""Specialized conversion tools for Environment and Climate Change Canada / Meteorological Service of Canada data."""

from __future__ import annotations
import functools
import logging
import multiprocessing as mp
import os
import re
import tempfile
import time
from datetime import datetime as dt
from pathlib import Path
from typing import Any

import dask.dataframe as dd
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar

from miranda.preprocess._metadata import eccc_variable_metadata, obs_column_definitions
from miranda.treatments import find_project_variable_codes, load_json_data_mappings
from miranda.vocabularies.eccc import obs_vocabularies


__all__ = [
    "convert_station",
    "merge_converted_variables",
    "merge_stations",
]
TABLE_DATE = dt.now().strftime("%d %B %Y")


def _remove_duplicates(ds):
    if any(ds.get_index("time").duplicated()):
        msg = f"Found {ds.get_index('time').duplicated().sum()} duplicated time coordinates for station {ds.station_id.values}. Assuming first value."
        logging.info(msg)
    return ds.sel(time=~ds.get_index("time").duplicated())


def convert_observation(
    data_source: str | Path | list[str | Path],
    output_dir: str | Path,
    variable: str,
    *,
    generation: int | None = None,
    merge: bool = False,
    overwrite: bool = False,
):
    """Convert a single station's data from the fixed-width format to a netCDF file."""
    output_dir = Path(output_dir).resolve().joinpath(variable)
    output_dir.mkdir(parents=True, exist_ok=True)

    code = find_project_variable_codes(variable, "eccc-obs")
    var_meta, global_attrs = eccc_variable_metadata(code, "eccc-obs", generation)
    (
        column_names,
        column_spaces,
        column_dtypes,
        header_row,
    ) = obs_column_definitions(code)

    archives = list()
    if isinstance(data_source, list) or Path(data_source).is_file():
        archives.append(data_source)
    else:
        tables = [str(repository.keys()) for repository in obs_vocabularies if code in repository.values()]
        msg = f"Collecting files for variable '{variable}'. Filename patterns containing variable code '{code}: {', '.join(tables)}'."
        logging.info(msg)
        for table in tables:
            archives.extend([f for f in Path(data_source).rglob(f"{table}*.gz")])

    # Create the output directory
    output_variable_dir = Path(output_dir).joinpath(variable)
    output_variable_dir.mkdir(parents=True, exist_ok=True)

    # Loop on the files
    errored_files = []
    for _file in archives:
        # FIXME: convert the file using the appropriate function
        pass

    if errored_files:
        msg = "Some files failed to be properly parsed:\n", ", ".join(errored_files)
        logging.warning(msg)


[docs] def convert_station( data: str | os.PathLike, variable: str, mode: str, # missing_flags: set[str], # missing_values: set[str], using_dask_array: bool = False, *, client: Any, **kwargs, ): """Convert a single station's data from the fixed-width format to a netCDF file.""" data = Path(data) variable_code = find_project_variable_codes(variable, "eccc-obs") column_names, column_widths, column_dtypes, header = obs_column_definitions(mode) # if not missing_values: # missing_values = {-9999, "#####"} if using_dask_array: pandas_reader = dd # set the block size to 200 MB chunks = dict(blocksize=200 * 2**20) else: pandas_reader = pd chunks = dict() using_dask_array = False # Create a dataframe from the files try: df = pandas_reader.read_fwf( data, widths=column_widths, names=column_names, dtype={name: data_type for name, data_type in zip(column_names, column_dtypes, strict=False)}, assume_missing=True, **chunks, ) if using_dask_array: df = client.persist(df) except FileNotFoundError as err: msg = f"File {data} was not found." logging.error(msg) raise FileNotFoundError(msg) from err except UnicodeDecodeError as e: msg = f"File {data.name} was unable to be read. This is probably an issue with the file: {e}" logging.error(msg) raise # Loop through the station codes station_codes = df["code"].unique() for code in station_codes: df_code = df[df["code"] == code] # Abort if the variable is not found if using_dask_array: has_variable_codes = ((df_code["code_var"] == variable_code).compute()).any() else: has_variable_codes = (df_code["code_var"] == variable_code).any() if not has_variable_codes: msg = f"Variable `{variable}` not found for station code: {code} in file {data}. Continuing..." logging.info(msg) continue
# # Perform the data treatment # logging.info(f"Converting `{variable}` for station code: {code}") # # # Dump the data into a DataFrame # df_var = df_code[df_code["code_var"] == variable_code].copy() # # # Mask the data according to the missing values flag # df_var = df_var.replace(missing_values, np.nan) # # # Decode the values and flags # dfd = df_var.loc[:, [f"D{i:0n}" for i in range(1, num_observations + 1)]] # dff = df_var.loc[:, [f"F{i:0n}" for i in range(1, num_observations + 1)]] # # # Remove the "NaN" flag # dff = dff.fillna("") # # # Use the flag to mask the values # try: # val = np.asarray(dfd.values, float) # except ValueError as e: # logging.error(f"{e} raised from {dfd}, continuing...") # continue # try: # flag = np.asarray(dff.values, str) # except ValueError as e: # logging.error(f"{e} raised from {dff}, continuing...") # continue # mask = np.isin(flag, missing_flags) # val[mask] = np.nan # # # Treat according to units conversions # val = val * scale_factor + add_offset # Create the DataArray # date_summations = dict(time=list()) # if mode == "hourly": # for index, row in df_var.iterrows(): # period = pd.Period( # year=row.year, month=row.month, day=row.day, freq="D" # ) # dates = pd.Series( # pd.date_range( # start=period.start_time, # end=period.end_time, # freq="H", # ) # ) # date_summations["time"].extend(dates) # written_values = val.flatten() # written_flags = flag.flatten() # elif mode == "daily": # value_days = list() # flag_days = list() # for i, (index, row) in enumerate(df_var.iterrows()): # period = pd.Period(year=row.year, month=row.month, freq="M") # dates = pd.Series( # pd.date_range( # start=period.start_time, # end=period.end_time, # freq="D", # ) # ) # date_summations["time"].extend(dates) # # value_days.extend( # val[i][range(monthrange(int(row.year), int(row.month))[1])] # ) # flag_days.extend( # flag[i][range(monthrange(int(row.year), int(row.month))[1])] # ) # written_values = value_days # written_flags = flag_days # # ds = xr.Dataset() # da_val = xr.DataArray(written_values, coords=date_summations, dims=["time"]) # # if raw_units != units: # da_val.attrs["units"] = raw_units # da_val = convert_units_to(da_val, units) # else: # da_val.attrs["units"] = units # # da_val = da_val.rename(nc_name) # variable_attributes = dict( # variable_code=variable_code, # standard_name=standard_name, # long_name=long_name, # ) # if "original_units" in kwargs: # variable_attributes["original_units"] = kwargs["original_units"] # da_val.attrs.update(variable_attributes) # # da_flag = xr.DataArray(written_flags, coords=date_summations, dims=["time"]) # da_flag = da_flag.rename("flag") # flag_attributes = dict( # long_name="data flag", # note="See ECCC technical documentation for details", # ) # da_flag.attrs.update(flag_attributes) # # ds[nc_name] = da_val # ds["flag"] = da_flag # # # save the file in NetCDF format # start_year = ds.time.dt.year.values[0] # end_year = ds.time.dt.year.values[-1] # # station_folder = output_path.joinpath(str(code)) # station_folder.mkdir(parents=True, exist_ok=True) # # f_nc = ( # f"{code}_{variable_code}_{nc_name}_" # f"{start_year if start_year == end_year else '_'.join([str(start_year), str(end_year)])}.nc" # ) # # if station_folder.joinpath(f_nc).exists(): # logging.warning(f"File `{f_nc}` already exists. Continuing...") # # history = ( # f"{dt.now().strftime('%Y-%m-%d %X')} converted from flat station file " # f"(`{file.name}`) to n-dimensional array." # ) # # # TODO: This info should eventually be sourced from a JSON definition # global_attrs = dict( # Conventions="CF-1.8", # comment="Acquired on demand from data specialists at " # "ECCC Climate Services / Services Climatiques.", # contact="John Richard", # contact_email="climatcentre-climatecentral@ec.gc.ca", # domain="CAN", # ) # if mode == "hourly": # global_attrs.update(dict(frequency="1hr")) # elif mode == "daily": # global_attrs.update(dict(frequency="day")) # global_attrs.update( # dict( # history=history, # internal_comment=f"Converted by {os.environ.get('USER', os.environ.get('USERNAME'))}.", # institution="ECCC", # license="https://climate.weather.gc.ca/prods_servs/attachment1_e.html", # member=code, # processing_level="raw", # redistribution="Redistribution permitted.", # references="https://climate.weather.gc.ca/doc/Technical_Documentation.pdf", # source="historical-station-records", # table_date=TABLE_DATE, # title="Environment and Climate Change Canada (ECCC) weather station observations", # type="station-obs", # usage="The original data is owned by the Government of Canada (Environment and Climate " # "Change Canada), and falls under the licence agreement for use of Environment and " # "Climate Change Canada data", # variable=str(nc_name), # version=f"v{dt.now().strftime('%Y.%m.%V')}", # Year.Month.Week # ) # ) # ds.attrs.update(global_attrs) # # logging.info(f"Exporting to: {station_folder.joinpath(f_nc)}") # ds.to_netcdf(station_folder.joinpath(f_nc)) # del ds # del val # del mask # del flag # del da_val # del da_flag # del dfd # del dff # del written_values # del written_flags # del date_summations # # del df
[docs] def merge_stations( source_files: str | os.PathLike | None = None, output_folder: str | os.PathLike | None = None, *, time_step: str, variables: str | int | list[str | int] | None = None, include_flags: bool = True, groupings: int | None = None, mf_dataset_freq: str | None = None, temp_directory: str | os.PathLike | None = None, n_workers: int = 1, ) -> None: """ Merge stations. Parameters ---------- source_files : str or Path Source files to be aggregated. output_folder : str or Path Output folder for the aggregated files. variables : str or int or list of str or int, optional The variable codes to be aggregated. time_step : {"hourly", "daily"} The time step to be used for aggregation. include_flags : bool Include flags in the output files. groupings : int The number of files in each group used for converting to multi-file Datasets. mf_dataset_freq : str, optional Resampling frequency for creating output multi-file Datasets. E.g. 'YS': 1 year per file, '5YS': 5 years per file. temp_directory : str or Path, optional Use another temporary directory location in case default location is not spacious enough. n_workers : int The number of workers to use. Returns ------- None """ func_time = time.time() if isinstance(source_files, str): source_files = Path(source_files) if time_step.lower() in ["h", "hour", "hourly"]: mode = "hourly" elif time_step.lower() in ["d", "day", "daily"]: mode = "daily" else: raise ValueError("Time step must be `h` / `hourly` or `d` / `daily`.") if isinstance(variables, list): pass elif isinstance(variables, (str, int)): variables = [variables] # TODO: have the variable gathered from a JSON file elif variables is None: if mode == "hourly": variables = [ 89, 94, 123, ] variables.extend(range(76, 81)) variables.extend(range(262, 281)) elif mode == "daily": variables = [1, 2, 3] variables.extend(range(10, 26)) else: raise NotImplementedError() for variable_code in variables: info = load_json_data_mappings("eccc-obs")["variables"][variable_code] variable_name = info["cf_variable_name"] msg = f"Merging `{variable_name}` using `{time_step}` time step." logging.info(msg) # Only perform aggregation on available data with corresponding metadata logging.info("Performing glob and sort.") nc_list = [str(nc) for nc in source_files.joinpath(variable_name).rglob("*.nc")] if not groupings: groupings = max(n_workers**2, 4) if nc_list: with tempfile.TemporaryDirectory(prefix="eccc", dir=temp_directory) as temp_dir: combinations = sorted((ii, nc, temp_dir, len(nc_list)) for ii, nc in enumerate(nc_list)) with mp.Pool(processes=n_workers) as pool: pool.starmap(_tmp_zarr, combinations) pool.close() pool.join() zarrs_found = [f for f in Path(temp_dir).glob("*.zarr")] msg = f"Found {len(zarrs_found)} intermediary aggregation files." logging.info(msg) ds = xr.open_mfdataset( zarrs_found, engine="zarr", combine="nested", concat_dim={"station"}, ) if ds: station_file_codes = [Path(x).name.split("_")[0] for x in nc_list] if not include_flags: drop_vars = [vv for vv in ds.data_vars if "flag" in vv] ds = ds.drop_vars(drop_vars) ds = ds.sortby(ds.station_id, "time") # Rearrange column order to have lon, lat, elev first # # FIXME: This doesn't work as intended - Assign coordinates instead # cols = meta.columns.tolist() # cols1 = [ # "latitude", # "longitude", # "elevation", # ] # for rr in cols1: # cols.remove(rr) # cols1.extend(cols) # meta = meta[cols1] # meta.index.rename("station", inplace=True) # meta = meta.to_xarray() # meta.sortby(meta["climate_identifier"]) # meta = meta.assign({"station": ds.station.values}) # np.testing.assert_array_equal( # sorted(meta["climate_identifier"].values), sorted(ds.station_id.values) # ) # for vv in meta.data_vars: # ds = ds.assign_coords({vv: meta[vv]}) # ds = xr.merge([ds, meta]) # ds.attrs = attrs1 # export done within tmddir context otherwise data is erased before final export!! valid_stations = list(sorted(ds.station_id.values)) valid_stations_count = len(valid_stations) msg = f"Processing stations for variable `{variable_name}`." logging.info(msg) if len(station_file_codes) == 0: msg = f"No stations were found containing variable filename `{variable_name}`. Exiting." logging.error(msg) return msg = f"Files exist for {len(station_file_codes)} ECCC stations. Metadata found for {valid_stations_count} stations. " logging.info(msg) # FIXME: Is this still needed? # logging.info("Preparing the NetCDF time period.") # Create the time period timestamps # year_start = ds.time.dt.year.min().values # year_end = ds.time.dt.year.max().values # Calculate the time index dimensions of the output NetCDF # time_index = pd.date_range( # start=f"{year_start}-01-01", # end=f"{year_end + 1}-01-01", # freq=mode[0].capitalize(), # )[:-1] # logging.info( # f"Number of ECCC stations: {valid_stations_count}, time steps: {time_index.size}." # ) Path(output_folder).mkdir(parents=True, exist_ok=True) file_out = Path(output_folder).joinpath(f"{variable_name}_eccc_{mode}") ds = ds.assign_coords(station=range(0, len(ds.station))).sortby("time") if mf_dataset_freq is not None: # output mf_dataset using resampling frequency _, datasets = zip(*ds.resample(time=mf_dataset_freq), strict=False) else: datasets = [ds] paths = [f"{file_out}_{data.time.dt.year.min().values}-{data.time.dt.year.max().values}.nc" for data in datasets] # FIXME: chunks need to be dealt with # chunks = [1, len(ds.time)] # comp = dict(zlib=True, complevel=5) # , chunk sizes=chunks) with ProgressBar(): # FIXME: looping seems to cause increasing memory over time use a pool of one or 2?? # for dataset, path in zip(datasets, paths): # _export_agg_nc(dataset,path) combs = zip(datasets, paths, strict=False) pool = mp.Pool(2) pool.map(_export_agg_nc, combs) pool.close() pool.join() ds.close() del ds else: msg = f"No files found for variable: `{variable_name}`." logging.info(msg) runtime = f"Process completed in {time.time() - func_time:.2f} seconds." logging.warning(runtime)
def _export_agg_nc(args): dataset, path = args comp = dict(zlib=True, complevel=5) encoding = {var: comp for var in dataset.data_vars} dataset.load().to_netcdf( path, engine="h5netcdf", format="NETCDF4_CLASSIC", encoding=encoding, ) dataset.close() del dataset def _tmp_zarr( iterable: int, nc: list[str | os.PathLike], tempdir: str | os.PathLike, group: int | None = None, ) -> None: msg = f"Processing batch of files {iterable + 1}{' of ' + str(group) if group is not None else ''}." logging.info(msg) station_file_codes = [Path(x).name.split("_")[0] for x in nc] try: ds = xr.open_mfdataset(nc, combine="nested", concat_dim="station", preprocess=_remove_duplicates) except ValueError as e: errored_nc_files = ", ".join([Path(f).name for f in nc]) msg = f"Issues found with the following files: [{errored_nc_files}]: {e}" logging.error(msg) return ds = ds.assign_coords(station_id=xr.DataArray(station_file_codes, dims="station").astype(str)) if "flag" in ds.data_vars: ds1 = ds.drop_vars("flag").copy(deep=True) ds1["flag"] = ds.flag.astype(str) ds = ds1 with ProgressBar(): ds.load().to_zarr( Path(tempdir).joinpath(f"{str(iterable).zfill(4)}.zarr"), ) del ds def _combine_years( station_folder: str, varia: str, out_folder: str | os.PathLike, meta_file: str | os.PathLike, rejected: list[str], _verbose: bool = False, ) -> None: nc_files = sorted(list(Path(station_folder).glob("*.nc"))) if len(nc_files): msg = f"Found {len(nc_files)} files for station code {Path(station_folder).name}." logging.info(msg) else: msg = f"No readings found for station code {Path(station_folder).name}. Continuing..." logging.warning(msg) return # Remove range files if years are all present, otherwise default to range_file. years_found = dict() range_files_found = dict() years_parsed = True for f in nc_files: groups = re.findall(r"_\d{4}", f.stem) if len(groups) == 1: year = int(groups[0].strip("_")) years_found[year] = f elif len(groups) == 2: year_start, year_end = int(groups[0].strip("_")), int(groups[1].strip("_")) range_files_found[f] = set(range(year_start, year_end)) else: msg = "Years unable to be effectively parsed from series. Continuing with xarray solver..." logging.warning(msg) years_parsed = False break if years_parsed: if len(range_files_found) > 0: msg = f"Overlapping single-year and multi-year files found for station code {station_folder}. Removing overlaps." logging.warning(msg) for ranged_file, years in range_files_found.items(): if years.issubset(years_found.values()): nc_files.remove(ranged_file) else: missing_years = [] for y in years: try: nc_files.remove(years_found[y]) except (KeyError, ValueError): # noqa: PERF203 missing_years.append(str(y)) continue if missing_years: msg = f"Missing years {', '.join(missing_years)} from multi-year file {ranged_file}. " logging.warning(msg) year_range = min(years_found.keys()), max(years_found.keys()) msg = f"Year(s) covered: {year_range[0]}{'-' + str(year_range[1]) if year_range[0] != year_range[1] else ''}. " logging.info(msg) if _verbose: msg = f"Opening: {', '.join([p.name for p in nc_files])}" logging.info(msg) ds = xr.open_mfdataset(nc_files, combine="nested", concat_dim="time") outfile = Path(out_folder).joinpath( f"{nc_files[0].name.split(f'_{varia}_')[0]}_{varia}_{ds.time.dt.year.min().values}-{ds.time.dt.year.max().values}.nc" ) df_inv = xr.open_dataset(meta_file) station_id = ds.attrs["member"] meta = df_inv.isel(index=df_inv.CLIMATE_IDENTIFIER == station_id) meta = meta.rename({"index": "station", "CLIMATE_IDENTIFIER": "station_id"}) try: meta = meta.assign_coords(station=[0]) except ValueError: rejected.append(Path(station_folder).name) msg = f"Something went wrong at the assign_coords step for station {station_folder}. Continuing..." logging.error(msg) return if len(meta.indexes) > 1: raise ValueError("Found more than 1 station.") elif len(meta.indexes) == 0: rejected.append(Path(station_folder).name) msg = f"No metadata found for station code {station_folder}. Continuing..." logging.warning(msg) return keep_coords = [ "time", "station", "station_id", "latitude", "longitude", "elevation", ] for vv in meta.data_vars: if str(vv).lower() not in keep_coords: continue ds = ds.assign_coords({str(vv).lower(): meta[vv]}) for vv in ds.data_vars: if ds[vv].dtype == "O": ds[vv] = ds[vv].astype(str) if not outfile.exists(): msg = f"Merging to {outfile.name}." logging.info(msg) comp = dict(zlib=True, complevel=5) encoding = {data_var: comp for data_var in ds.data_vars} encoding["time"] = {"dtype": "single"} with ProgressBar(): ds.to_netcdf( outfile, engine="h5netcdf", format="NETCDF4_CLASSIC", encoding=encoding, ) else: msg = f"Files exist for {outfile.name}. Continuing..." logging.info(msg)
[docs] def merge_converted_variables( source_files: str | os.PathLike, output_folder: str | os.PathLike, variables: str | int | list[str | int] | None = None, overwrite: bool = False, n_workers: int = 1, ) -> None: """ Merge converted variables into a single file per variable. Parameters ---------- source_files : str, Path output_folder : str, Path variables : str or int or list of str or int, optional overwrite : bool n_workers : int Returns ------- None """ meta = load_json_data_mappings("eccc-obs") metadata_file = Path(tempfile.NamedTemporaryFile(suffix=".nc", delete=False).name) meta.to_netcdf(metadata_file) if isinstance(source_files, str): source_files = Path(source_files) if isinstance(output_folder, str): output_folder = Path(output_folder) selected_variables = list() if variables is not None: if not isinstance(variables, list): variables = [variables] selected_variables.extend(meta[var] for var in variables) variables_found = [x.name for x in source_files.iterdir() if x.is_dir()] if selected_variables: variables_found = [x for x in variables_found if x in [item["nc_name"] for item in selected_variables]] for variable in variables_found: msg = f"Merging files found for variable: `{variable}`." logging.info(msg) station_dirs = [x for x in source_files.joinpath(variable).iterdir() if x.is_dir()] msg = f"Number of stations found: {len(station_dirs)}." logging.info(msg) output_rep = output_folder.joinpath(variable) Path(output_rep).mkdir(parents=True, exist_ok=True) if (len(list(output_rep.iterdir())) >= (len(meta.CLIMATE_IDENTIFIER) * 0.75)) and not overwrite: msg = ( f"Variable {variable} appears to have already been converted. Will be skipped. " f"To force conversion of this variable, set `overwrite=True`." ) logging.warning(msg) continue manager = mp.Manager() rejected_stations = manager.list() combine_func = functools.partial( _combine_years, varia=variable, out_folder=output_rep, meta_file=metadata_file, rejected=rejected_stations, ) with mp.Pool(processes=n_workers) as pool: pool.map(combine_func, station_dirs) pool.close() pool.join() if rejected_stations: msg = f"Rejected station codes are the following: {', '.join(rejected_stations)}." logging.warning(msg)