# S.Biner, Ouranos, mai 2019
# methodologie
# 1) on rassemble les fichiers netcdf des differentes eccc en un seul fichier netCDF.
# 2) on scan les fichiers sources annuels en cherchant une variable et on sauve
# ce qu'on trouve dans des fichiers netcdf. On applique aussi les flags
# et on fait les changements d'unites
# obtenu via en cliquant sur 'about the data'
from __future__ import annotations

import contextlib
import functools
import logging
import multiprocessing as mp
import os
import re
import sys
import tempfile
import time
from calendar import monthrange
from datetime import datetime as dt
from logging import config
from pathlib import Path
from urllib.error import HTTPError

import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr
from dask.diagnostics import ProgressBar
from xclim.core.units import convert_units_to

from miranda.archive import group_by_length
from miranda.scripting import LOGGING_CONFIG
from import file_size, report_file_size
from miranda.units import GiB, MiB
from miranda.utils import generic_extract_archive

from ._utils import cf_station_metadata


__all__ = [

TABLE_DATE ="%d %B %Y")

def load_station_metadata(meta: str | os.PathLike) -> xr.Dataset:
    if meta:
        df_inv = pd.read_csv(meta, header=0)
            import geopandas as gpd

            station_metadata_url = ""
            df_inv = gpd.read_file(station_metadata_url)
        except HTTPError as err:
            raise RuntimeError(
                f"Station metadata table unable to be fetched. Considering downloading directly: {err}"
    df_inv["LONGITUDE"] = df_inv.geometry.x
    df_inv["LATITUDE"] = df_inv.geometry.y
    df_inv["ELEVATION"] = df_inv.ELEVATION.astype(float)
    df_inv["CLIMATE_IDENTIFIER"] = df_inv["CLIMATE_IDENTIFIER"].astype(str)

    df_inv = df_inv.drop(["geometry"], axis=1)
    return df_inv.to_xarray()

def _remove_duplicates(ds):
    if any(ds.get_index("time").duplicated()):
            f"Found {ds.get_index('time').duplicated().sum()} duplicated time coordinates "
            f"for station {ds.station_id.values}. Assuming first value."
    return ds.sel(time=~ds.get_index("time").duplicated())

def _convert_station_file(
    fichier: Path,
    output_path: Path,
    errored_files: list[Path],
    mode: str,
    add_offset: float,
    column_dtypes: list[str],
    column_names: list[str],
    long_name: str,
    missing_flags: set[str],
    missing_values: set[str],
    nc_name: str,
    raw_units: str,
    units: str,
    scale_factor: float,
    standard_name: str,
    variable_code: str,
    if mode.lower() in ["h", "hour", "hourly"]:
        num_observations = 24
        column_widths = [7, 4, 2, 2, 3] + [6, 1] * num_observations
    elif mode.lower() in ["d", "day", "daily"]:
        num_observations = 31
        column_widths = [7, 4, 2, 3] + [6, 1] * num_observations
        raise NotImplementedError("`mode` must be 'h'/'hourly or 'd'/'daily'.")

    if not missing_values:
        missing_values = {-9999, "#####"}

    with tempfile.TemporaryDirectory() as temp_folder:
        if fichier.suffix in [".gz", ".tar", ".zip", ".7z"]:
            data_files = generic_extract_archive(fichier, output_dir=temp_folder)
            data_files = [fichier]"Processing file: {fichier}.")

        size_limit = 1 * GiB

        for data in data_files:
            if file_size(data) > size_limit and "dask" in sys.modules:
                    f"File exceeds {report_file_size(size_limit)} - Using dask.dataframes."
                pandas_reader = dd
                using_dask_array = True
                chunks = dict(blocksize=200 * MiB)
                client = ProgressBar
                    f"File below {report_file_size(size_limit)} - Using pandas.dataframes."
                pandas_reader = pd
                chunks = dict()
                using_dask_array = False
                client = contextlib.nullcontext

            with client() as c:
                # Create a dataframe from the files
                    df = pandas_reader.read_fwf(
                            name: data_type
                            for name, data_type in zip(column_names, column_dtypes)
                    if using_dask_array:
                        df = c.persist(df)

                except FileNotFoundError:
                    logging.error(f"File {data} was not found.")

                except (UnicodeDecodeError, Exception) as e:
                        f"File {} was unable to be read. "
                        f"This is probably an issue with the file: {e}"

                # Loop through the station codes
                station_codes = df["code"].unique()
                for code in station_codes:
                    df_code = df[df["code"] == code]

                    # Abort if the variable is not found
                    if using_dask_array:
                        has_variable_codes = (
                            (df_code["code_var"] == variable_code).compute()
                        has_variable_codes = (
                            df_code["code_var"] == variable_code
                    if not has_variable_codes:
                            f"Variable `{nc_name}` not found for station code: {code} in file {data}. Continuing..."

                    # Perform the data treatment
          "Converting `{nc_name}` for station code: {code}")

                    # Dump the data into a DataFrame
                    df_var = df_code[df_code["code_var"] == variable_code].copy()

                    # Mask the data according to the missing values flag
                    df_var = df_var.replace(missing_values, np.nan)

                    # Decode the values and flags
                    dfd = df_var.loc[
                        :, [f"D{i:0n}" for i in range(1, num_observations + 1)]
                    dff = df_var.loc[
                        :, [f"F{i:0n}" for i in range(1, num_observations + 1)]

                    # Remove the "NaN" flag
                    dff = dff.fillna("")

                    # Use the flag to mask the values
                        val = np.asarray(dfd.values, float)
                    except ValueError as e:
                        logging.error(f"{e} raised from {dfd}, continuing...")
                        flag = np.asarray(dff.values, str)
                    except ValueError as e:
                        logging.error(f"{e} raised from {dff}, continuing...")
                    mask = np.isin(flag, missing_flags)
                    val[mask] = np.nan

                    # Treat according to units conversions
                    val = val * scale_factor + add_offset

                    # Create the DataArray
                    date_summations = dict(time=list())
                    if mode == "hourly":
                        for index, row in df_var.iterrows():
                            period = pd.Period(
                                year=row.year, month=row.month,, freq="D"
                            dates = pd.Series(
                        written_values = val.flatten()
                        written_flags = flag.flatten()
                    elif mode == "daily":
                        value_days = list()
                        flag_days = list()
                        for i, (index, row) in enumerate(df_var.iterrows()):
                            period = pd.Period(year=row.year, month=row.month, freq="M")
                            dates = pd.Series(

                                    range(monthrange(int(row.year), int(row.month))[1])
                                    range(monthrange(int(row.year), int(row.month))[1])
                        written_values = value_days
                        written_flags = flag_days

                    ds = xr.Dataset()
                    da_val = xr.DataArray(
                        written_values, coords=date_summations, dims=["time"]

                    if raw_units != units:
                        da_val.attrs["units"] = raw_units
                        da_val = convert_units_to(da_val, units)
                        da_val.attrs["units"] = units

                    da_val = da_val.rename(nc_name)
                    variable_attributes = dict(
                    if "original_units" in kwargs:
                        variable_attributes["original_units"] = kwargs["original_units"]

                    da_flag = xr.DataArray(
                        written_flags, coords=date_summations, dims=["time"]
                    da_flag = da_flag.rename("flag")
                    flag_attributes = dict(
                        long_name="data flag",
                        note="See ECCC technical documentation for details",

                    ds[nc_name] = da_val
                    ds["flag"] = da_flag

                    # save the file in NetCDF format
                    start_year = ds.time.dt.year.values[0]
                    end_year = ds.time.dt.year.values[-1]

                    station_folder = output_path.joinpath(str(code))
                    station_folder.mkdir(parents=True, exist_ok=True)

                    f_nc = (
                        f"{start_year if start_year == end_year else '_'.join([str(start_year), str(end_year)])}.nc"

                    if station_folder.joinpath(f_nc).exists():
                        logging.warning(f"File `{f_nc}` already exists. Continuing...")

                    history = (
                        f"{'%Y-%m-%d %X')} converted from flat station file "
                        f"(`{}`) to n-dimensional array."

                    # TODO: This info should eventually be sourced from a JSON definition
                    global_attrs = dict(
                        comment="Acquired on demand from data specialists at "
                        "ECCC Climate Services / Services Climatiques.",
                        contact="John Richard",
                    if mode == "hourly":
                    elif mode == "daily":
                            internal_comment=f"Converted by {os.environ.get('USER', os.environ.get('USERNAME'))}.",
                            redistribution="Redistribution permitted.",
                            title="Environment and Climate Change Canada (ECCC) weather station observations",
                            usage="The original data is owned by the Government of Canada (Environment and Climate "
                            "Change Canada), and falls under the licence agreement for use of Environment and "
                            "Climate Change Canada data",
                            version=f"v{'%Y.%m.%V')}",  # Year.Month.Week

          "Exporting to: {station_folder.joinpath(f_nc)}")
                    del ds
                    del val
                    del mask
                    del flag
                    del da_val
                    del da_flag
                    del dfd
                    del dff
                    del written_values
                    del written_flags
                    del date_summations

                del df

        if os.listdir(temp_folder):
            for temporary_file in Path(temp_folder).glob("*"):
                if temporary_file in data_files:

[docs] def convert_flat_files( source_files: str | os.PathLike, output_folder: str | os.PathLike | list[str | int], variables: str | int | list[str | int], mode: str = "hourly", n_workers: int = 4, ) -> None: """ Parameters ---------- source_files: str or Path output_folder: str or Path variables: str or List[str] mode: {"hourly", "daily"} n_workers: int Returns ------- None """ func_time = time.time() if mode.lower() in ["h", "hour", "hourly"]: num_observations = 24 column_names = ["code", "year", "month", "day", "code_var"] column_dtypes = [str, float, float, float, str] elif mode.lower() in ["d", "day", "daily"]: num_observations = 31 column_names = ["code", "year", "month", "code_var"] column_dtypes = [str, float, float, str] else: raise NotImplementedError("`mode` must be 'h'/'hourly or 'd'/'daily'.") # Preparing the data column headers for i in range(1, num_observations + 1): data_entry, flag_entry = f"D{i:0n}", f"F{i:0n}" column_names.append(data_entry) column_names.append(flag_entry) column_dtypes.extend([str, str]) if isinstance(variables, (str, int)): variables = [variables] for variable_code in variables: variable_code = str(variable_code).zfill(3) metadata = cf_station_metadata(variable_code) nc_name = metadata["nc_name"] rep_nc = Path(output_folder).joinpath(nc_name) rep_nc.mkdir(parents=True, exist_ok=True) # Loop on the files f"Collecting files for variable '{metadata['standard_name']}' " f"(filenames containing '{metadata['_table_name']}')." ) list_files = list() if isinstance(source_files, list) or Path(source_files).is_file(): list_files.append(source_files) else: glob_patterns = [g for g in metadata["_table_name"]] for pattern in glob_patterns: list_files.extend( [f for f in Path(source_files).rglob(f"{pattern}*") if f.is_file()] ) manager = mp.Manager() errored_files = manager.list() converter_func = functools.partial( _convert_station_file, output_path=rep_nc, errored_files=errored_files, mode=mode, variable_code=variable_code, column_names=column_names, column_dtypes=column_dtypes, **metadata, ) with mp.Pool(processes=n_workers) as pool:, list_files) pool.close() pool.join() if errored_files: logging.warning( "Some files failed to be properly parsed:\n", ", ".join(errored_files) ) logging.warning(f"Process completed in {time.time() - func_time:.2f} seconds")
[docs] def aggregate_stations( source_files: str | os.PathLike | None = None, output_folder: str | os.PathLike | None = None, time_step: str = None, variables: str | int | list[str | int] | None = None, include_flags: bool = True, groupings: int | None = None, mf_dataset_freq: str | None = None, temp_directory: str | os.PathLike | None = None, n_workers: int = 1, ) -> None: """ Parameters ---------- source_files: str or Path output_folder: str or Path variables: str or int or list of str or int, optional time_step: {"hourly", "daily"} include_flags: bool groupings: int The number of files in each group used for converting to multi-file Datasets. mf_dataset_freq: str, optional Resampling frequency for creating output multi-file Datasets. E.g. 'YS': 1 year per file, '5YS': 5 years per file. temp_directory: str or Path, optional Use another temporary directory location in case default location is not spacious enough. n_workers: int Returns ------- None """ func_time = time.time() if isinstance(source_files, str): source_files = Path(source_files) if time_step.lower() in ["h", "hour", "hourly"]: mode = "hourly" elif time_step.lower() in ["d", "day", "daily"]: mode = "daily" else: raise ValueError("Time step must be `h` / `hourly` or `d` / `daily`.") if isinstance(variables, list): pass elif isinstance(variables, (str, int)): variables = [variables] # TODO: have the variable gathered from a JSON file elif variables is None: if mode == "hourly": variables = [ 89, 94, 123, ] variables.extend(range(76, 81)) variables.extend(range(262, 281)) elif mode == "daily": variables = [1, 2, 3] variables.extend(range(10, 26)) else: raise NotImplementedError() for variable_code in variables: info = cf_station_metadata(variable_code) variable_name = info["nc_name"]"Merging `{variable_name}` using `{time_step}` time step.") # Only perform aggregation on available data with corresponding metadata"Performing glob and sort.") nc_list = [str(nc) for nc in source_files.joinpath(variable_name).rglob("*.nc")] if not groupings: groupings = max(n_workers**2, 4) if nc_list: nc_lists = group_by_length(nc_list, groupings) with tempfile.TemporaryDirectory( prefix="eccc", dir=temp_directory ) as temp_dir: combinations = sorted( (ii, nc, temp_dir, len(nc_lists)) for ii, nc in enumerate(nc_lists) ) with mp.Pool(processes=n_workers) as pool: pool.starmap(_tmp_zarr, combinations) pool.close() pool.join() zarrs_found = [f for f in Path(temp_dir).glob("*.zarr")] f"Found {len(zarrs_found)} intermediary aggregation files." ) ds = xr.open_mfdataset( zarrs_found, engine="zarr", combine="nested", concat_dim={"station"}, ) if ds: station_file_codes = [Path(x).name.split("_")[0] for x in nc_list] if not include_flags: drop_vars = [vv for vv in ds.data_vars if "flag" in vv] ds = ds.drop_vars(drop_vars) ds = ds.sortby(ds.station_id, "time") # Rearrange column order to have lon, lat, elev first # # FIXME: This doesn't work as intended - Assign coordinates instead # cols = meta.columns.tolist() # cols1 = [ # "latitude", # "longitude", # "elevation", # ] # for rr in cols1: # cols.remove(rr) # cols1.extend(cols) # meta = meta[cols1] # meta.index.rename("station", inplace=True) # meta = meta.to_xarray() # meta.sortby(meta["climate_identifier"]) # meta = meta.assign({"station": ds.station.values}) # np.testing.assert_array_equal( # sorted(meta["climate_identifier"].values), sorted(ds.station_id.values) # ) # for vv in meta.data_vars: # ds = ds.assign_coords({vv: meta[vv]}) # ds = xr.merge([ds, meta]) # ds.attrs = attrs1 # export done within tmddir context otherwise data is erased before final export!! valid_stations = list(sorted(ds.station_id.values)) valid_stations_count = len(valid_stations)"Processing stations for variable `{variable_name}`.") if len(station_file_codes) == 0: logging.error( f"No stations were found containing variable filename `{variable_name}`. Exiting." ) return f"Files exist for {len(station_file_codes)} ECCC stations. " f"Metadata found for {valid_stations_count} stations. " ) # FIXME: Is this still needed? #"Preparing the NetCDF time period.") # Create the time period timestamps # year_start = ds.time.dt.year.min().values # year_end = ds.time.dt.year.max().values # Calculate the time index dimensions of the output NetCDF # time_index = pd.date_range( # start=f"{year_start}-01-01", # end=f"{year_end + 1}-01-01", # freq=mode[0].capitalize(), # )[:-1] # # f"Number of ECCC stations: {valid_stations_count}, time steps: {time_index.size}." # ) Path(output_folder).mkdir(parents=True, exist_ok=True) file_out = Path(output_folder).joinpath(f"{variable_name}_eccc_{mode}") ds = ds.assign_coords(station=range(0, len(ds.station))).sortby("time") if mf_dataset_freq is not None: # output mf_dataset using resampling frequency _, datasets = zip(*ds.resample(time=mf_dataset_freq)) else: datasets = [ds] paths = [ f"{file_out}_{data.time.dt.year.min().values}-{data.time.dt.year.max().values}.nc" for data in datasets ] # FIXME: chunks need to be dealt with # chunks = [1, len(ds.time)] # comp = dict(zlib=True, complevel=5) # , chunk sizes=chunks) with ProgressBar(): # FIXME: looping seems to cause increasing memory over time use a pool of one or 2?? # for dataset, path in zip(datasets, paths): # _export_agg_nc(dataset,path) combs = zip(datasets, paths) pool = mp.Pool(2), combs) pool.close() pool.join() ds.close() del ds else:"No files found for variable: `{variable_name}`.") runtime = f"Process completed in {time.time() - func_time:.2f} seconds" logging.warning(runtime)
def _export_agg_nc(args): dataset, path = args comp = dict(zlib=True, complevel=5) encoding = {var: comp for var in dataset.data_vars} dataset.load().to_netcdf( path, engine="h5netcdf", format="NETCDF4_CLASSIC", encoding=encoding, ) dataset.close() del dataset def _tmp_zarr( iterable: int, nc: list[str | os.PathLike], tempdir: str | os.PathLike, group: int | None = None, ) -> None: f"Processing batch of files {iterable + 1}" f"{' of ' + str(group) if group is not None else ''}." ) station_file_codes = [Path(x).name.split("_")[0] for x in nc] try: ds = xr.open_mfdataset( nc, combine="nested", concat_dim={"station"}, preprocess=_remove_duplicates ) except ValueError as e: errored_nc_files = ", ".join([Path(f).name for f in nc]) logging.error( f"Issues found with the following files: [{errored_nc_files}]: {e}" ) return ds = ds.assign_coords( station_id=xr.DataArray(station_file_codes, dims="station").astype(str) ) if "flag" in ds.data_vars: ds1 = ds.drop_vars("flag").copy(deep=True) ds1["flag"] = ds.flag.astype(str) ds = ds1 with ProgressBar(): ds.load().to_zarr( Path(tempdir).joinpath(f"{str(iterable).zfill(4)}.zarr"), ) del ds def _combine_years( station_folder: str, varia: str, out_folder: str | os.PathLike, meta_file: str | os.PathLike, rejected: list[str], _verbose: bool = False, ) -> None: nc_files = sorted(list(Path(station_folder).glob("*.nc"))) if len(nc_files): f"Found {len(nc_files)} files for station code {Path(station_folder).name}." ) else: logging.warning( f"No readings found for station code {Path(station_folder).name}. Continuing..." ) return # Remove range files if years are all present, otherwise default to range_file. years_found = dict() range_files_found = dict() years_parsed = True for f in nc_files: groups = re.findall(r"_\d{4}", f.stem) if len(groups) == 1: year = int(groups[0].strip("_")) years_found[year] = f elif len(groups) == 2: year_start, year_end = int(groups[0].strip("_")), int(groups[1].strip("_")) range_files_found[f] = set(range(year_start, year_end)) else: logging.warning( "Years unable to be effectively parsed from series. Continuing with xarray solver..." ) years_parsed = False break if years_parsed: if len(range_files_found) > 0: logging.warning( f"Overlapping single-year and multi-year files found for station code {station_folder}. " "Removing overlaps." ) for ranged_file, years in range_files_found.items(): if years.issubset(years_found.values()): nc_files.remove(ranged_file) else: for y in years: try: nc_files.remove(years_found[y]) except (KeyError, ValueError): continue year_range = min(years_found.keys()), max(years_found.keys()) "Year(s) covered: " f"{year_range[0]}{'-' + str(year_range[1]) if year_range[0] != year_range[1] else ''}. " ) if _verbose:"Opening: {', '.join([ for p in nc_files])}") ds = xr.open_mfdataset(nc_files, combine="nested", concat_dim={"time"}) outfile = Path(out_folder).joinpath( f'{nc_files[0].name.split(f"_{varia}_")[0]}_{varia}_' f"{ds.time.dt.year.min().values}-{ds.time.dt.year.max().values}.nc" ) df_inv = xr.open_dataset(meta_file) station_id = ds.attrs["member"] meta = df_inv.isel(index=df_inv.CLIMATE_IDENTIFIER == station_id) meta = meta.rename({"index": "station", "CLIMATE_IDENTIFIER": "station_id"}) try: meta = meta.assign_coords(station=[0]) except ValueError: rejected.append(Path(station_folder).name) logging.error( f"Something went wrong at the assign_coords step for station {station_folder}. Continuing..." ) return if len(meta.indexes) > 1: raise ValueError("Found more than 1 station.") elif len(meta.indexes) == 0: rejected.append(Path(station_folder).name) logging.warning( f"No metadata found for station code {station_folder}. Continuing..." ) return keep_coords = [ "time", "station", "station_id", "latitude", "longitude", "elevation", ] for vv in meta.data_vars: if vv.lower() not in keep_coords: continue ds = ds.assign_coords({vv.lower(): meta[vv]}) for vv in ds.data_vars: if ds[vv].dtype == "O": ds[vv] = ds[vv].astype(str) if not outfile.exists():"Merging to {}") comp = dict(zlib=True, complevel=5) encoding = {data_var: comp for data_var in ds.data_vars} encoding["time"] = dict(dtype="single") with ProgressBar(): ds.to_netcdf( outfile, engine="h5netcdf", format="NETCDF4_CLASSIC", encoding=encoding, ) else:"Files exist for {}. Continuing...")
[docs] def merge_converted_variables( source_files: str | os.PathLike, output_folder: str | os.PathLike, variables: str | int | list[str | int] | None = None, station_metadata: str | os.PathLike | None = None, overwrite: bool = False, n_workers: int = 1, ) -> None: """ Parameters ---------- source_files: str, Path output_folder: str, Path variables: str or int or list of str or int, optional station_metadata: str or Path, optional overwrite: bool n_workers: int Returns ------- None """ meta = load_station_metadata(station_metadata) metadata_file = Path(tempfile.NamedTemporaryFile(suffix=".nc", delete=False).name) meta.to_netcdf(metadata_file) if isinstance(source_files, str): source_files = Path(source_files) if isinstance(output_folder, str): output_folder = Path(output_folder) selected_variables = list() if variables is not None: if not isinstance(variables, list): variables = [variables] for var in variables: selected_variables.append(cf_station_metadata(var)) variables_found = [ for x in source_files.iterdir() if x.is_dir()] if selected_variables: variables_found = [ x for x in variables_found if x in [item["nc_name"] for item in selected_variables] ] for variable in variables_found:"Merging files found for variable: `{variable}`.") station_dirs = [ x for x in source_files.joinpath(variable).iterdir() if x.is_dir() ]"Number of stations found: {len(station_dirs)}.") output_rep = output_folder.joinpath(variable) Path(output_rep).mkdir(parents=True, exist_ok=True) if ( len(list(output_rep.iterdir())) >= (len(meta.CLIMATE_IDENTIFIER) * 0.75) ) and not overwrite: logging.warning( f"Variable {variable} appears to have already been converted. Will be skipped. " f"To force conversion of this variable, set `overwrite=True`." ) continue manager = mp.Manager() rejected_stations = manager.list() combine_func = functools.partial( _combine_years, varia=variable, out_folder=output_rep, meta_file=metadata_file, rejected=rejected_stations, ) with mp.Pool(processes=n_workers) as pool:, station_dirs) pool.close() pool.join() if rejected_stations: logging.warning( f"Rejected station codes are the following: {', '.join(rejected_stations)}." )