Source code for miranda.remote.archiver

"""Archive Module."""
from __future__ import annotations

import logging.config
from collections import defaultdict
from pathlib import Path

from miranda.archive import (
    group_by_deciphered_date,
    group_by_size,
    group_by_subdirectories,
)
from miranda.io import find_filepaths
from miranda.scripting import LOGGING_CONFIG
from miranda.storage import report_file_size
from miranda.utils import single_item_list, working_directory

from .connect import Connection
from .ops import create_archive, create_remote_directory, transfer_file

logging.config.dictConfig(LOGGING_CONFIG)

__all__ = ["archive_database"]


def archive_database(
    source: Path | str | list[str | Path],
    common_path: Path | str,
    destination: Path | str,
    file_suffixes: str = ".nc",
    server: str | None = None,
    username: str | None = None,
    project_name: str | None = None,
    overwrite: bool = False,
    compression: bool = False,
    recursive: bool = False,
    use_grouping: bool = True,
    use_subdirectories: bool = True,
    dry_run: bool = False,
) -> None:
    """
    Given a source and destination, create tarfile archives sized according to a
    file size limit and transfer them to another server for backup purposes.
    """
    project = "{project_name}_{group_name}_{common_date}{part}.{suffix}"

    if not project_name:
        project_name = Path(destination).name

    # `compression` selects between gzipped and plain tar archives.
    suffix = "tar.gz" if compression else "tar"

    file_list, source_path = find_filepaths(
        source=source, recursive=recursive, file_suffixes=file_suffixes
    )

    if use_subdirectories:
        file_groups = group_by_subdirectories(file_list, within=common_path)
    else:
        file_groups = defaultdict(list)
        for f in file_list:
            file_groups["."].append(f)

    connection = Connection(username=username, host=server, protocol="sftp")

    if dry_run:
        logging.info(
            "Running archival functions in `dry_run` mode. No files will be transferred."
        )

    try:
        successful_transfers = []
        with connection as ctx:
            for group_name, members in file_groups.items():
                remote_path = Path(destination, group_name)

                if not dry_run and not remote_path.exists():
                    create_remote_directory(remote_path, transport=ctx)

                if use_grouping:
                    dated_groups = group_by_deciphered_date(members)
                else:
                    dated_groups = {group_name: members}

                for common_date, files in dated_groups.items():
                    if not files:
                        raise FileNotFoundError("No files found in grouping.")

                    if not use_grouping or single_item_list(files):
                        # Transfer ungrouped or singleton files directly.
                        for archive_file in files:
                            transfer = Path(remote_path, archive_file.name)

                            if transfer.is_file():
                                if not overwrite:
                                    logging.info(f"{transfer} exists. Skipping file.")
                                    continue
                                logging.info(f"{transfer} exists. Overwriting.")

                            if not dry_run:
                                if transfer_file(archive_file, transfer, transport=ctx):
                                    successful_transfers.append(archive_file)
                            else:
                                successful_transfers.append(archive_file)
                    else:
                        # Bundle the remaining files into size-limited tar archives.
                        sized_groups = group_by_size(files)

                        for i, sized_group in enumerate(sized_groups):
                            # Number the parts only when there is more than one archive.
                            part = (
                                f"_{str(i + 1).zfill(3)}" if len(sized_groups) > 1 else ""
                            )

                            archive_name = project.format(
                                project_name=project_name,
                                group_name=group_name,
                                common_date=common_date,
                                part=part,
                                suffix=suffix,
                            )
                            transfer = Path(remote_path, archive_name)

                            if transfer.is_file():
                                if not overwrite:
                                    logging.info(
                                        f'File "{transfer}" exists. Skipping file.'
                                    )
                                    continue
                                logging.info(f'File "{transfer}" exists. Overwriting.')

                            with working_directory(source_path):
                                if not dry_run:
                                    if create_archive(
                                        sized_group,
                                        transfer,
                                        transport=ctx,
                                        compression=compression,
                                        recursive=recursive,
                                    ):
                                        successful_transfers.extend(sized_group)
                                else:
                                    successful_transfers.extend(sized_group)

        logging.info(
            f"Transferred {len(successful_transfers)} of {len(file_list)} files "
            f"totalling {report_file_size(successful_transfers)}."
        )
    except Exception as e:
        msg = f"{e}: Failed to transfer files."
        logging.error(msg)
        raise RuntimeError(msg) from e
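A minimal usage sketch follows (not part of the module source). The host, username, and all paths are hypothetical placeholders; with `dry_run=True` the call logs planned actions but transfers no files.

# Hypothetical usage sketch: archive a tree of NetCDF files to a remote backup
# host over SFTP. Host, username, and all paths below are placeholders.
from pathlib import Path

from miranda.remote.archiver import archive_database

archive_database(
    source=Path("/data/simulations"),
    common_path=Path("/data"),
    destination=Path("/backup/simulations"),
    file_suffixes=".nc",
    server="backup.example.org",
    username="jdoe",
    project_name="simulations",
    compression=True,  # produce .tar.gz archives
    recursive=True,
    dry_run=True,  # log planned actions without transferring files
)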