Source code for miranda.convert.melcc

"""MELCC (Québec) Weather Stations data conversion module."""

from __future__ import annotations
import datetime as dt
import logging
import os
import re
import subprocess  # noqa: S404
import warnings
from argparse import ArgumentParser
from collections import defaultdict
from collections.abc import Sequence
from io import StringIO
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd
import xarray
import xarray as xr
from xclim.core.formatting import update_history
from xclim.core.units import convert_units_to, pint_multiply, str2pint

from miranda import __version__

from ._data_corrections import dataset_corrections, load_json_data_mappings, metadata_conversion


logger = logging.getLogger("miranda.convert.melcc")


frequencies = {
    "a": ("min", "1 min"),  # T
    "b": ("point", ""),  # "Élémentaire"...
    "c": ("12hr", "12 h"),  # 12H
    "f": ("point", ""),  # Instantaneous??
    "h": ("hr", "1 hr"),  # H
    "q": ("day", "1 d"),  # D
    "z": ("4min", "4 min"),  # 4T
}

__all__ = [
    "concat",
    "convert_mdb",
    "convert_melcc_obs",
    "convert_snow_table",
    "list_tables",
    "parse_var_code",
    "read_definitions",
    "read_stations",
    "read_table",
]


[docs] def parse_var_code(vcode: str) -> dict[str, Any]: """ Parse variable code to generate metadata. Parameters ---------- vcode : str The variable code. Returns ------- dict[str, Any] The metadata dictionary. """ match = re.match(r"(\D*)(\d*)([abcfhqz])", vcode) if match is None: raise ValueError(f"Failed to parse variable {vcode}.") short_code, instrument, freq = match.groups() melcc_code = vcode[:2].upper() + vcode[2:-1].lower() + vcode[-1].upper() short_code = short_code[:2].upper() + short_code[2:].lower() return { "freq": frequencies[freq][0], "sampling": frequencies[freq][1], # PI has different meanings, it's not just a different instrument. "var_name": short_code if short_code != "PI" else melcc_code[:-1], "instrument": np.int32(instrument), "melcc_code": melcc_code, }
def _validate_db_file(db_file: str) -> str: """ Validate the database file and ensure that input is trustworthy. Parameters ---------- db_file : str The database file. Returns ------- str The database file. """ if len(db_file) > 1: raise ValueError("Only one database file can be processed at a time.") if not Path(db_file).is_file(): raise FileNotFoundError(f"File {db_file} not found.") return db_file
[docs] def list_tables(db_file: str | os.PathLike[str]) -> list[str]: """ List the tables of an MDB file. Parameters ---------- db_file : str or os.PathLike The database file. Returns ------- list of str The list of tables. """ try: res = subprocess.run( # noqa: S603 ["mdb-tables", _validate_db_file(db_file)], capture_output=True, encoding="utf-8", ) except subprocess.CalledProcessError as err: raise ValueError(f"Calling mdb-tables on {db_file} failed with code {err.returncode}: {err.stderr}") from err if res.returncode != 0: raise ValueError(f"Calling mdb-tables on {db_file} failed with code {res.returncode}: {res.stderr}") return res.stdout.lower().strip().split()
[docs] def read_table(db_file: str | os.PathLike[str], table: str | os.PathLike) -> xarray.Dataset: """ Read a MySQL table into an xarray object. Parameters ---------- db_file : str or os.PathLike The database file. table : str or os.PathLike The table to read. Returns ------- xarray.Dataset An xarray Dataset with the table data. """ try: res = subprocess.run( # noqa: S603 [ "mdb-export", "-T", "%Y-%m-%dT%H:%M:%S", _validate_db_file(db_file), table.upper(), ], capture_output=True, encoding="utf-8", check=True, ) except subprocess.CalledProcessError as err: msg = f"Calling mdb-export on {db_file} failed with code {err.returncode}: {err.stderr}" raise ValueError(msg) from err df = ( pd.read_csv( StringIO(res.stdout), parse_dates=["DATE"], infer_datetime_format=True, date_parser=lambda d: pd.to_datetime(d, errors="coerce"), ) .rename( columns={ "NO_SEQ_STATION": "station", "DATE": "time", "CODE_STATUT_DONNEE": f"{table}_flag", "VALEUR_DONNEE": table, } ) .drop(columns=["STATUT_APPROBATION"]) # I don't know what this is... .set_index(["station", "time"]) ) nat = df.index.get_level_values("time").isnull().sum() if nat > 0: msg = f"{nat} values dropped because of unparsable timestamps (probably data from before 1900)." logger.warning(msg) df = df[df.index.get_level_values("time").notnull()] return df[~df.index.duplicated()].to_xarray()
[docs] def read_stations(db_file: str | os.PathLike) -> pd.DataFrame: """ Read station file using mdbtools. Parameters ---------- db_file : str or os.PathLike The database file. Returns ------- pandas.DataFrame A Pandas DataFrame with the station information. """ try: res = subprocess.run( # noqa: S603 [ "mdb-export", "-T", "%Y-%m-%dT%H:%M:%S", _validate_db_file(db_file), "STATIONs", ], capture_output=True, encoding="utf-8", check=True, ) except subprocess.CalledProcessError as err: msg = f"Calling mdb-export on {db_file} failed with code {err.returncode}: {err.stderr}" raise ValueError(msg) from err df = pd.read_csv( StringIO(res.stdout), parse_dates=["Date_Ouverture", "Date_Fermeture"], infer_datetime_format=True, date_parser=lambda d: pd.to_datetime(d, errors="coerce"), ).rename( columns={ "NO_STATION_CLIMATO": "station_id", "NOM_STATION": "station_name", "Latitude": "lat", "Longitude": "lon", "Altitude": "elevation", "Date_Ouverture": "station_opening", "Date_Fermeture": "station_closing", "Type_Poste": "station_type", "CODE_TYPE_POSTE": "station_type", "No_Seq_Station": "station", }, errors="ignore", ) ds = df.set_index("station").to_xarray() da = ds.set_coords(ds.data_vars.keys()).station da.lat.attrs.update(units="degree_north", standard_name="latitude") da["lon"] = -da.lon da.lon.attrs.update(units="degree_east", standard_name="longitude") da.elevation.attrs.update(units="m", standard_name="height") da["station_id"] = da["station_id"].astype(str) da["station_name"] = da["station_name"].astype(str) da["station_type"] = da["station_type"].astype(str) da.station_opening.attrs.update(description="Date of station creation.") da.station_closing.attrs.update(description="Date of station closure.") return da.isel(station=~da.indexes["station"].duplicated())
[docs] def read_definitions(db_file: str) -> pd.DataFrame: """ Read variable definition file using mdbtools. Parameters ---------- db_file : str The database file. Returns ------- pandas.DataFrame The variable definitions. """ try: res = subprocess.run( # noqa: S603 ["mdb-export", _validate_db_file(db_file), "DDs"], capture_output=True, encoding="utf-8", check=True, ) except subprocess.CalledProcessError as err: msg = f"Calling mdb-export on {db_file} failed with code {err.returncode}: {err.stderr}" raise ValueError(msg) from err definitions = ( pd.read_csv(StringIO(res.stdout)) .rename( columns={ "GROUPE": "melcc_category", "NOM_DD": "vcode", "PRIORITE": "priority", "UNITE": "units", "Description": "description_fr", } ) .drop(columns=["IND_TRANSFO"]) ) # TODO: Was ist das? definitions["vcode"] = definitions.vcode.str.lower() definitions["units"] = definitions.units.fillna("").replace("mm+cm", "mm") definitions["priority"] = definitions.priority.astype(np.int32) return definitions.set_index("vcode")
[docs] def convert_mdb( database: str | Path, stations: xr.Dataset, definitions: xr.Dataset, output: str | Path, overwrite: bool = True, ) -> dict[tuple[str, str], Path]: """ Convert microsoft databases of MELCCFP observation data to xarray objects. Parameters ---------- database : str or Path The database file. stations : xr.Dataset The station list. definitions : xr.Dataset The variable definitions. output : str or Path The output folder. overwrite : bool Whether to overwrite existing files. Default: True. Returns ------- dict[tuple[str, str], Path] The converted files. """ outs = {} tables = list_tables(database) for table in tables: if table.startswith("gdb") or table.startswith("~"): continue msg = f"Parsing {database}:{table}." logger.info(msg) meta = parse_var_code(table) code = meta["melcc_code"] existing = list(output.glob(f"*_{code}_MELCC_{meta['freq']}_*.nc")) if existing and not overwrite: if len(existing) > 1: raise ValueError(f"Found more than one existing file for table {code}!") file = existing[0] vname = file.stem.split(code)[0][:-1] msg = f"File already exists {file}, skipping." logger.info(msg) outs[(vname, code)] = file continue raw = read_table(database, table) if np.prod(list(raw.dims.values())) == 0: # If any dimension is 0. logger.warning("The table is empty.") continue vv = meta.pop("var_name") raw = raw.rename({table: vv, f"{table}_flag": f"{vv}_flag"}) raw.attrs["frequency"] = meta.pop("freq") raw[vv].attrs.update(**meta) if table.lower() not in definitions.index: warnings.warn(f"The {code} variable wasn't defined in the definition table.", stacklevel=2) else: dd = definitions.loc[table] raw[vv].attrs.update(**dd) raw[f"{vv}_flag"] = raw[f"{vv}_flag"].fillna(0).astype(np.int32) raw[f"{vv}_flag"].attrs.update( standard_name="status_flag", flag_values=np.array([0, 1, 3, 5, 7], dtype="int32"), flag_meanings="nodata good estimated forced trace", flag_meanings_fr="sansdonnée correcte estimée forcée trace", long_name=f"Quality flag for {code}.", ) try: stat = stations.sel(station=raw.station) except KeyError: extra_stations = set(raw.station.values) - set(stations.station.values) msg = f"The following station number were not found in the station list : {sorted(list(extra_stations))}" logger.error(msg) raw = xr.merge([raw, xr.Dataset().assign(station=stations)]).sel(station=raw.station) else: raw = raw.assign_coords(station=stat) ds = dataset_corrections(raw, "melcc-obs") new_var_name = list(filter(lambda k: not k.endswith("_flag"), ds.data_vars.keys()))[0] ds.attrs["history"] = f"[{dt.datetime.now():%Y-%m-%d %H:%M:%S}] Conversion from {database.name}:{table} to netCDF." date = "-".join(ds.indexes["time"][[0, -1]].strftime("%Y%m")) outs[(new_var_name, code)] = output / f"{new_var_name}_{code}_MELCC_{raw.attrs['frequency']}_{date}.nc" ds.to_netcdf(outs[(new_var_name, code)]) return outs
[docs] def convert_melcc_obs( metafile: str | Path, folder: str | Path, output: str | Path | None = None, overwrite: bool = True, ) -> dict[tuple[str, str], Path]: """ Convert MELCCFP observation data to xarray data objects, returning paths. Parameters ---------- metafile : str or Path The metadata file. folder : str or Path The folder containing the MDB files. output : str or Path, optional The output folder. Default: None. overwrite : bool Whether to overwrite existing files. Default: True. Returns ------- dict[(str, str), Path] The converted files. """ output = Path(output or ".") stations = read_stations(metafile) definitions = read_definitions(metafile) logger.info("Parsing databases.") folder = Path(folder) outs = {} for database in folder.glob("*.mdb"): outs.update(convert_mdb(database, stations, definitions, output, overwrite)) return outs
[docs] def concat( files: Sequence[str | os.PathLike[str]], output_folder: str | os.PathLike[str], overwrite: bool = True, ) -> Path: """ Concatenate converted weather station files. Parameters ---------- files : Sequence of str or os.PathLike The files to concatenate. output_folder : str or os.PathLike The output folder. overwrite : bool Whether to overwrite existing files. Default: True. Returns ------- Path The output path. """ *vv, _, melcc, freq, _ = Path(files[0]).stem.split("_") vv = "_".join(vv) msg = f"Concatenating variables from {len(files)} files ({vv})." logger.info(msg) # Magic one-liner to parse all date_start and date_end entries from the file names. dates_start, dates_end = list( zip( *[map(int, Path(file).stem.split("_")[-1].split("-")) for file in files], strict=False, ) ) outpath = Path(output_folder) / f"{vv}_{melcc}_{freq}_{min(dates_start):06d}-{max(dates_end):06d}.nc" if outpath.is_file() and not overwrite: msg = f"Already done in {outpath}. Skipping." logger.info(msg) return outpath dss = dict() for file in files: ds = xr.open_dataset(file, chunks={"time": 1000}) for crd in ds.coords.values(): crd.load() if list(sorted(ds.data_vars.keys())) != [vv, f"{vv}_flag"]: raise ValueError(f"Unexpected variables in {file}. Got {ds.data_vars.keys()}, expected {vv} and {vv}_flag.") priority = ds[vv].attrs["priority"] if priority in dss: raise ValueError(f"Variable {vv} of {file} has the same priority ({priority}) than another file of the same file list.") dss[priority] = ds ds_all = xr.merge([ds.coords.to_dataset() for ds in dss.values()], combine_attrs="drop_conflicts") for var in [vv, f"{vv}_flag"]: ds_all[var] = xr.concat( [ds[var] for ds in dss.values()], dim=xr.DataArray(list(dss.keys()), dims=("priority",), name="priority"), coords="all", combine_attrs="drop_conflicts", ) ds_all = ds_all.sortby("priority") ds_all[f"{vv}_flag"] = ds_all[f"{vv}_flag"].where(ds_all[vv].notnull()) ds_merged = ds_all.ffill("priority").isel(priority=-1, drop=True) ds_merged[f"{vv}_flag"] = ds_merged[f"{vv}_flag"].fillna(0).astype(np.int8) ds_merged.attrs["history"] = update_history( f"Different instruments merged according to their priority - miranda {__version__}", new_name=vv, **{f"priority={i}": ds for i, ds in dss.items()}, ) instruments = [dss[p][vv].melcc_code for p in sorted(dss)] ds_merged[vv].attrs["melcc_code"] = "Merged sources in ascending priority : " + " ,".join(map(str, instruments)) ds_merged.attrs.update( source="info-climat-merged", title="Station observations of the MELCC - all instruments merged.", title_fr="Observations aux stations du MELCC - instruments fusionnés", ) ds_merged.to_netcdf(outpath) return outpath
[docs] def convert_snow_table(file: str | os.PathLike[str] | Path, output: str | os.PathLike[str] | Path) -> None: """ Convert snow data given through an Excel file. This private data is not included in the MDB files. Parameters ---------- file : str or os.PathLike or Path The Excel file with sheets: "Stations", "Périodes standards", and "Données". output : str or os.PathLike or Path Folder where to put the netCDF files (one for each of snd, sd and snw). """ logger.info("Parsing stations.") stations = pd.read_excel(file, sheet_name="Stations") stations = stations.rename( columns={ "No": "station", "Nom": "station_name", "LAT(°)": "lat", "LONG(°)": "lon", "ALT(m)": "elevation", "OUVERTURE": "station_opening", "FERMETURE": "station_closing", } ) statds = stations.set_index("station").to_xarray() stations = statds.set_coords(statds.data_vars.keys()).station stations["station_name"] = stations["station_name"].astype(str) stations.lat.attrs.update(units="degree_north", standard_name="latitude") stations.lon.attrs.update(units="degree_east", standard_name="longitude") stations.elevation.attrs.update(units="m", standard_name="height") stations.station_opening.attrs.update(description="Date of station creation.") stations.station_closing.attrs.update(description="Date of station closure.") # Periods logger.info("Parsing observation periods.") periods = pd.read_excel(file, sheet_name="Périodes standards", names=["start", "end", "middle"]) periods = periods[["start", "end"]].to_xarray() periods = periods.to_array().rename(variable="bnds", index="num_period").drop_vars("bnds").rename("period_bnds") periods.attrs.update( description=( "Bounds of the sampling periods of the MELCC. " "Observations are taken manually once per period. " "The year of these bounds should be ignored." ) ) # Data logger.info("Parsing data.") data = pd.read_excel( file, sheet_name="Données", names=[ "station", "time", "snd", "snd_flag", "sd", "sd_flag", "snw", "snw_flag", ], ) ds = data.set_index(["station", "time"]).to_xarray() ds["station"] = stations.sel(station=ds.station) bins = periods.dt.dayofyear bins = np.concatenate((bins.isel(bnds=0), bins.isel(bnds=1, num_period=[-1]) + 1)) ds["period"] = ds.time.copy(data=np.digitize(ds.time.dt.dayofyear, bins)) ds.period.attrs.update(description="Observational period number.") with xr.set_options(keep_attrs=True): flag_attrs = dict( standard_name="status_flag", flag_values=[0, 1, 3, 5, 7], flag_meanings="nodata good estimated forced trace", flag_meanings_fr="sansdonnée correcte estimée forcée trace", ) ds.snd.attrs.update( standard_name="surface_snow_thickness", units="cm", long_name="Snow depth", long_name_fr="Épaisseur de la neige au sol", melcc_code="NS000F", description_fr="Épaisseur de la neige mesurée (carottier)", ) ds["snd"] = convert_units_to(ds.snd, "m") ds["snd_flag"] = ds.snd_flag.fillna(0).astype(int) ds.snd_flag.attrs.update( long_name="Quality of snow depth measurements.", long_name_fr="Qualité de la mesure d'épaisseur de la neige", **flag_attrs, ) ds.snw.attrs.update( standard_name="surface_snow_amount", units="cm", long_name="Snow amount", long_name_fr="Quantité de neige au sol", melcc_code="NSQ000F", description_fr="Équivalent en eau de la neige mesurée (carottier)", description="Converted from snow water-equivalent using a water density of 1000 kg/m³", ) ds["snw"] = pint_multiply(ds.snw, str2pint("1000 kg m-3"), out_units="kg m^-2") ds["snw_flag"] = ds.snd_flag.fillna(0).astype(int) ds.snw_flag.attrs.update( long_name="Quality of snow amount measurements.", long_name_fr="Qualité de la mesure de quantité de neige", **flag_attrs, ) # Density given as a percentage of water density ds.sd.attrs.update( standard_name="surface_snow_density", units="%", long_name="Snow density", long_name_fr="Densité de la neige au sol", melcc_code="NSD000F", description_fr="Densité de la neige mesurée (carottier)", ) ds["sd"] = pint_multiply(ds.sd, str2pint("1000 kg m-3"), out_units="kg m^-3") ds["sd_flag"] = ds.sd_flag.fillna(0).astype(int) ds.sd_flag.attrs.update( long_name="Quality of snow density measurements.", long_name_fr="Qualité de la mesure de densité de la neige", **flag_attrs, ) ds.attrs.update(frequency="2sem") meta = load_json_data_mappings("melcc") ds = metadata_conversion(ds, "melcc-snow", meta) date = "-".join(ds.indexes["time"][[0, -1]].strftime("%Y%m")) # Save logger.info("Saving to files.") for vv in ["sd", "snd", "snw"]: ds[[vv, f"{vv}_flag"]].to_netcdf(output / f"{vv}_{ds[vv].melcc_code}_MELCC_2sem_{date}.nc")
if __name__ == "__main__": argparser = ArgumentParser( description="Convert MS Access data to netCDF. Requires mdbtools to be installed. By default, a single netCDF is created for each data table." ) argparser.add_argument("-v", "--verbose", help="Increase verbosity of the script.", action="store_true") argparser.add_argument( "-c", "--concat", help="Merge all variables with the same frequency in individual datasets, according to their priority.", action="store_true", ) argparser.add_argument("-o", "--output", help="Output folder where to put the netCDFs.", default=".") argparser.add_argument( "--raw-output", help="Output folder where to put the non-merged netCDFs.", ) argparser.add_argument( "-s", "--skip-existing", help="Do not overwrite existing files.", action="store_true", ) argparser.add_argument( "metafile", help="Path to the MDB file containing the station and variable information.", ) argparser.add_argument("folder", help="Path to a folder including many *.mdb files to convert.") args = argparser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) outputs = convert_melcc_obs( args.metafile, args.folder, output=args.raw_output or args.output, overwrite=not args.skip_existing, ) if args.concat: new_vars = defaultdict(list) for (new_var, _), path in outputs.items(): new_vars[new_var].append(path) for fichiers in new_vars.values(): try: concat(fichiers, args.output, overwrite=not args.skip_existing) except Exception as err: # noqa: PERF203,BLE001 logger.error(err)