Source code for stouputils.archive

"""
This module provides functions for creating and managing archives.

- repair_zip_file: Try to repair a corrupted zip file by ignoring some of the errors
- make_archive: Create a zip archive from a source directory with consistent file timestamps.
- archive_cli: Main entry point for command line usage

.. image:: https://raw.githubusercontent.com/Stoupy51/stouputils/refs/heads/main/assets/archive_module.gif
  :alt: stouputils archive examples
"""

# pyright: reportUnusedVariable=false
# Imports
import fnmatch
import os
from zipfile import ZIP_DEFLATED, ZipFile, ZipInfo

from .decorators import LogLevels, handle_error
from .io import clean_path, super_copy
from .print import debug, error, info


# Function that repair a corrupted zip file (ignoring some of the errors)
[docs] @handle_error() def repair_zip_file(file_path: str, destination: str) -> bool: """ Try to repair a corrupted zip file by ignoring some of the errors This function manually parses the ZIP file structure to extract files even when the ZIP file is corrupted. It reads the central directory entries and attempts to decompress each file individually. Args: file_path (str): Path of the zip file to repair destination (str): Destination of the new file Returns: bool: Always returns True unless any strong error Examples: .. code-block:: python > repair_zip_file("/path/to/source.zip", "/path/to/destination.zip") """ # Check if not os.path.exists(file_path): raise FileNotFoundError(f"File '{file_path}' not found") dirname: str = os.path.dirname(destination) if dirname and not os.path.exists(dirname): raise FileNotFoundError(f"Directory '{dirname}' not found") import struct import zlib # Read the entire ZIP file into memory with open(file_path, 'rb') as f: data = f.read() # Find central directory entries CENTRAL_SIG = b'PK\x01\x02' entries: list[dict[str, int | str]] = [] idx = 0 while True: idx = data.find(CENTRAL_SIG, idx) if idx == -1: break # Ensure enough length for central directory header if idx + 46 > len(data): break header = data[idx:idx+46] try: ( sig, ver_made, ver_needed, flags, comp_method, mtime, mdate, crc, csize, usize, name_len, extra_len, comm_len, disk_start, int_attr, ext_attr, local_off ) = struct.unpack('<4s6H3L3H2H2L', header) name_start = idx + 46 if name_start + name_len > len(data): idx += 4 continue name = data[name_start:name_start+name_len].decode('utf-8', errors='replace') entries.append({ 'name': name, 'comp_method': comp_method, 'csize': csize, 'usize': usize, 'local_offset': local_off, 'crc': crc }) except (struct.error, UnicodeDecodeError): # Skip corrupted entries pass idx += 4 # Create a new ZIP file with recovered entries with ZipFile(destination, "w", compression=ZIP_DEFLATED) as new_zip_file: for entry in entries: try: # Get the local header to find data start lo: int = int(entry['local_offset']) if lo + 30 > len(data): continue lh = data[lo:lo+30] try: _, _, _, _, _, _, _, _, _, name_len, extra_len = struct.unpack('<4sHHHHHLLLHH', lh) except struct.error: continue data_start: int = lo + 30 + name_len + extra_len if data_start + int(entry['csize']) > len(data): continue comp_data = data[data_start:data_start+int(entry['csize'])] # Decompress the data try: if int(entry['comp_method']) == 0: # No compression content = comp_data[:int(entry['usize'])] elif int(entry['comp_method']) == 8: # Deflate compression content = zlib.decompress(comp_data, -zlib.MAX_WBITS) else: # Unsupported compression method, skip continue # Write to new ZIP file new_zip_file.writestr(str(entry['name']), content) except (zlib.error, Exception): # If decompression fails, try to write raw data as a fallback try: new_zip_file.writestr(f"{entry['name']!s}.corrupted", comp_data) except Exception: # Skip completely corrupted entries continue except Exception: # Skip any entries that cause errors continue return True
# Function that makes an archive with consistency (same zip file each time)
[docs] @handle_error() def make_archive( source: str, destinations: list[str] | str | None = None, override_time: None | tuple[int, int, int, int, int, int] = None, create_dir: bool = False, ignore_patterns: str | None = None, ) -> bool: """ Create a zip archive from a source directory with consistent file timestamps. (Meaning deterministic zip file each time) Creates a zip archive from the source directory and copies it to one or more destinations. The archive will have consistent file timestamps across runs if override_time is specified. Uses maximum compression level (9) with ZIP_DEFLATED algorithm. Args: source (str): The source folder to archive destinations (list[str]|str): The destination folder(s) or file(s) to copy the archive to override_time (None | tuple[int, ...]): The constant time to use for the archive (e.g. (2024, 1, 1, 0, 0, 0) for 2024-01-01 00:00:00) create_dir (bool): Whether to create the destination directory if it doesn't exist ignore_patterns (str | None): Glob pattern(s) to ignore files. Can be a single pattern or comma-separated patterns (e.g. "*.pyc" or "*.pyc,__pycache__,*.log") Returns: bool: Always returns True unless any strong error Examples: .. code-block:: python > make_archive("/path/to/source", "/path/to/destination.zip") > make_archive("/path/to/source", ["/path/to/destination.zip", "/path/to/destination2.zip"]) > make_archive("src", "hello_from_year_2085.zip", override_time=(2085,1,1,0,0,0)) > make_archive("src", "output.zip", ignore_patterns="*.pyc") > make_archive("src", "output.zip", ignore_patterns="__pycache__") > make_archive("src", "output.zip", ignore_patterns="*.pyc,__pycache__,*.log") """ # Fix copy_destinations type if needed if destinations is None: destinations = [] if destinations and isinstance(destinations, str): destinations = [destinations] if not destinations: raise ValueError("destinations must be a list of at least one destination") # Create directories if needed if create_dir: for dest_file in destinations: dest_file = clean_path(dest_file) parent_dir = os.path.dirname(dest_file) if parent_dir and not os.path.exists(parent_dir): os.makedirs(parent_dir, exist_ok=True) # Create the archive destination: str = clean_path(destinations[0]) destination = destination if ".zip" in destination else destination + ".zip" # Parse ignore patterns (can be a single pattern or comma-separated patterns) ignore_pattern_list: list[str] = [] if ignore_patterns: ignore_pattern_list = [pattern.strip() for pattern in ignore_patterns.split(',')] def should_ignore(path: str) -> bool: """Check if a file or directory path should be ignored based on patterns.""" if not ignore_pattern_list: return False for pattern in ignore_pattern_list: if fnmatch.fnmatch(os.path.basename(path), pattern) or fnmatch.fnmatch(path, pattern): return True return False with ZipFile(destination, "w", compression=ZIP_DEFLATED, compresslevel=9) as zip: for root, dirs, files in os.walk(source): # Filter out ignored directories in-place to prevent walking into them dirs[:] = [d for d in dirs if not should_ignore(d)] for file in files: file_path: str = clean_path(os.path.join(root, file)) rel_path = os.path.relpath(file_path, source) # Skip files that match any ignore pattern if should_ignore(file) or should_ignore(rel_path): continue info: ZipInfo = ZipInfo(rel_path) info.compress_type = ZIP_DEFLATED if override_time: info.date_time = override_time with open(file_path, "rb") as f: zip.writestr(info, f.read()) # Copy the archive to the destination(s) for dest_file in destinations[1:]: @handle_error(Exception, message=f"Unable to copy '{destination}' to '{dest_file}'", error_log=LogLevels.WARNING) def internal(src: str, dest: str) -> None: super_copy(src, dest, create_dir=create_dir) internal(destination, clean_path(dest_file)) return True
# Main entry point for command line usage
[docs] def archive_cli() -> None: """ Main entry point for command line usage. Examples: .. code-block:: bash # Repair a corrupted zip file python -m stouputils.archive repair /path/to/corrupted.zip /path/to/repaired.zip # Create a zip archive python -m stouputils.archive make /path/to/source /path/to/destination.zip # Create a zip archive with ignore patterns python -m stouputils.archive make /path/to/source /path/to/destination.zip --ignore "*.pyc,__pycache__" """ import argparse import sys parser = argparse.ArgumentParser(description="Archive utilities") subparsers = parser.add_subparsers(dest="command", help="Available commands") # Repair command repair_parser = subparsers.add_parser("repair", help="Repair a corrupted zip file") repair_parser.add_argument("input_file", help="Path to the corrupted zip file") repair_parser.add_argument("output_file", nargs="?", help="Path to the repaired zip file (optional, defaults to input_file with '_repaired' suffix)") # Make archive command archive_parser = subparsers.add_parser("make", help="Create a zip archive") archive_parser.add_argument("source", help="Source directory to archive") archive_parser.add_argument("destination", help="Destination zip file") archive_parser.add_argument("--ignore", help="Glob patterns to ignore (comma-separated)") archive_parser.add_argument("--create-dir", action="store_true", help="Create destination directory if it doesn't exist") args = parser.parse_args() if args.command == "repair": input_file = args.input_file if args.output_file: output_file = args.output_file else: # Generate default output filename base, ext = os.path.splitext(input_file) output_file = f"{base}_repaired{ext}" debug(f"Repairing '{input_file}' to '{output_file}'...") try: repair_zip_file(input_file, output_file) info(f"Successfully repaired zip file: {output_file}") except Exception as e: error(f"Error repairing zip file: {e}", exit=False) sys.exit(1) elif args.command == "make": debug(f"Creating archive from '{args.source}' to '{args.destination}'...") try: make_archive( source=args.source, destinations=args.destination, create_dir=args.create_dir, ignore_patterns=args.ignore ) info(f"Successfully created archive: {args.destination}") except Exception as e: error(f"Error creating archive: {e}", exit=False) sys.exit(1) else: parser.print_help() sys.exit(1)
if __name__ == "__main__": archive_cli()