# Source code for stouputils.backup.consolidate
# Imports
import os
import shutil
import zipfile
from ..config import StouputilsConfig as Cfg
from ..decorators import measure_time
from ..io.path import clean_path
from ..print.message import info, warning
from ..print.progress_bar import colored_for_loop
from .retrieve import get_all_previous_backups
# Function to consolidate multiple backups into one comprehensive backup
def _build_file_registry(backup_paths: list[str]) -> tuple[dict[str, tuple[str, zipfile.ZipInfo]], set[str]]:
	""" First pass: scans backups from newest to oldest and builds the consolidation plan.

	Args:
		backup_paths (list[str]): Paths of the backup ZIP files to scan (oldest first)
	Returns:
		tuple[dict[str, tuple[str, zipfile.ZipInfo]], set[str]]:
			Mapping of filename -> (backup_path, zipinfo) pointing at the newest version of each file,
			and the accumulated set of deleted filenames across all backups
	"""
	deleted_files: set[str] = set()
	file_registry: dict[str, tuple[str, zipfile.ZipInfo]] = {}  # filename -> (backup_path, zipinfo)

	# Process backups in reverse order (newest first) to prioritize latest versions
	for backup_path in reversed(backup_paths):
		try:
			with zipfile.ZipFile(backup_path, "r") as zipf_in:
				# Get namelist once for efficiency
				namelist: list[str] = zipf_in.namelist()

				# Accumulate deletions recorded by this backup; since we walk newest-first,
				# a file deleted in a recent backup is never picked up from an older one.
				if "__deleted_files__.txt" in namelist:
					deleted_files.update(zipf_in.read("__deleted_files__.txt").decode().splitlines())

				# Register files - only add if not already in registry (newer versions take precedence)
				for inf in zipf_in.infolist():
					filename: str = inf.filename
					if (filename
						and filename != "__deleted_files__.txt"
						and filename not in deleted_files
						and filename not in file_registry):
						file_registry[filename] = (backup_path, inf)
		except Exception as e:
			# Best-effort: a corrupt/unreadable backup is skipped, not fatal
			warning(f"Error processing backup {backup_path}: {e}")
			continue
	return file_registry, deleted_files

def _write_consolidated_zip(
	destination_zip: str,
	file_registry: dict[str, tuple[str, zipfile.ZipInfo]],
	deleted_files: set[str],
) -> None:
	""" Second pass: writes the consolidated archive, streaming each registered file from its source backup.

	Args:
		destination_zip (str):                                    Path of the consolidated ZIP file to create
		file_registry   (dict[str, tuple[str, zipfile.ZipInfo]]): filename -> (backup_path, zipinfo) of the version to keep
		deleted_files   (set[str]):                               Filenames to record as deleted in the consolidated backup
	"""
	# Keep source ZIP files open across iterations instead of reopening per file
	open_zips: dict[str, zipfile.ZipFile] = {}
	try:
		with zipfile.ZipFile(destination_zip, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zipf_out:
			for filename, (backup_path, inf) in colored_for_loop(file_registry.items(), desc="Making consolidated backup"):
				try:
					# Open ZIP file if not already open
					if backup_path not in open_zips:
						open_zips[backup_path] = zipfile.ZipFile(backup_path, "r")
					zipf_in: zipfile.ZipFile = open_zips[backup_path]

					# Copy file with a chunk size adapted to the file size (larger chunks for files >50MB)
					chunk_size: int = Cfg.LARGE_CHUNK_SIZE if inf.file_size > 52428800 else Cfg.CHUNK_SIZE
					with zipf_in.open(inf, "r") as source:
						with zipf_out.open(inf, "w", force_zip64=True) as target:
							shutil.copyfileobj(source, target, length=chunk_size)
				except Exception as e:
					# Include the actual filename so failed copies are diagnosable
					warning(f"Error copying file {filename} from {backup_path}: {e}")
					continue

			# Add accumulated deleted files so later restores don't resurrect them
			if deleted_files:
				zipf_out.writestr("__deleted_files__.txt", "\n".join(sorted(deleted_files)), compress_type=zipfile.ZIP_DEFLATED)
	finally:
		# Clean up open ZIP files (close errors are non-fatal during cleanup)
		for zipf in open_zips.values():
			try:
				zipf.close()
			except Exception:
				pass

@measure_time(message="Consolidating backups")
def consolidate_backups(zip_path: str, destination_zip: str) -> None:
	""" Consolidates the files from the given backup and all previous ones into a new ZIP file,
	ensuring that the most recent version of each file is kept and deleted files are not restored.

	Args:
		zip_path        (str): Path to the latest backup ZIP file (If endswith "/latest.zip" or "/", the latest backup will be used)
		destination_zip (str): Path to the destination ZIP file where the consolidated backup will be saved
	Examples:
	.. code-block:: python

		> consolidate_backups("/path/to/backups/latest.zip", "/path/to/consolidated.zip")
		[INFO HH:MM:SS] Consolidating backups
		[INFO HH:MM:SS] Consolidated backup created: '/path/to/consolidated.zip'
	"""
	zip_path = clean_path(os.path.abspath(zip_path))
	destination_zip = clean_path(os.path.abspath(destination_zip))
	zip_folder: str = clean_path(os.path.dirname(zip_path))

	# Get all previous backups up to the specified one
	previous_backups: dict[str, dict[str, str]] = get_all_previous_backups(zip_folder, all_before=zip_path)
	backup_paths: list[str] = list(previous_backups.keys())

	# First pass: collect all deleted files and build the newest-version file registry
	file_registry, deleted_files = _build_file_registry(backup_paths)

	# Second pass: copy files efficiently into the consolidated archive
	_write_consolidated_zip(destination_zip, file_registry, deleted_files)
	info(f"Consolidated backup created: {destination_zip}")