Source code for stouputils.archive.repair_zip_file


# Imports
import os
from zipfile import ZIP_DEFLATED, ZipFile

from ..decorators.handle_error import handle_error


# Function that repairs a corrupted zip file (ignoring some of the errors)
@handle_error
def repair_zip_file(file_path: str, destination: str) -> bool:
    """ Try to repair a corrupted zip file by ignoring some of the errors

    This function manually parses the ZIP file structure to extract files
    even when the ZIP file is corrupted. It scans the raw bytes for central
    directory headers, then locates, decompresses and re-writes each
    referenced entry individually into a brand new archive.

    Only stored (method 0) and deflated (method 8) entries are recovered;
    entries using any other compression method are skipped. When an entry
    cannot be decompressed or written, its raw (still compressed) bytes are
    stored under ``<name>.corrupted`` instead.

    Entry names are decoded as UTF-8 when the archive flags them as such
    (general purpose bit 11). Otherwise UTF-8 is tried first and CP437 (the
    ZIP legacy encoding) is used as a fallback, so legacy names are not
    turned into replacement characters.

    Args:
        file_path    (str): Path of the zip file to repair
        destination  (str): Destination of the new file
    Returns:
        bool: Always returns True unless any strong error
    Raises:
        FileNotFoundError: If ``file_path`` or the parent directory of
            ``destination`` does not exist (how it surfaces to the caller
            depends on the ``handle_error`` decorator)

    Examples:

        .. code-block:: python

            > repair_zip_file("/path/to/source.zip", "/path/to/destination.zip")
    """
    # Check
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File '{file_path}' not found")
    dirname: str = os.path.dirname(destination)
    if dirname and not os.path.exists(dirname):
        raise FileNotFoundError(f"Directory '{dirname}' not found")

    import struct
    import zlib

    # Read the entire ZIP file into memory
    with open(file_path, 'rb') as f:
        data: bytes = f.read()
    data_len: int = len(data)

    # Fixed-size parts of the two ZIP record types we need to parse
    CENTRAL_SIG = b'PK\x01\x02'
    UTF8_FLAG = 0x800  # General purpose bit 11: name is UTF-8 encoded
    central_header = struct.Struct('<4s6H3L3H2H2L')  # 46 bytes
    local_header = struct.Struct('<4sHHHHHLLLHH')    # 30 bytes

    # Find central directory entries: (name, comp_method, csize, usize, local_offset)
    entries: list[tuple[str, int, int, int, int]] = []
    idx: int = data.find(CENTRAL_SIG)
    while idx != -1 and idx + central_header.size <= data_len:
        (
            _sig, _ver_made, _ver_needed, flags, comp_method, _mtime, _mdate,
            _crc, csize, usize,
            name_len, _extra_len, _comm_len, _disk_start, _int_attr, _ext_attr, local_off
        ) = central_header.unpack_from(data, idx)

        # Only keep the entry if its name fits entirely inside the file
        name_start: int = idx + central_header.size
        if name_start + name_len <= data_len:
            raw_name: bytes = data[name_start:name_start + name_len]
            if flags & UTF8_FLAG:
                name: str = raw_name.decode('utf-8', errors='replace')
            else:
                # Unflagged names are CP437 per the ZIP spec, but many tools
                # write UTF-8 without setting the flag: try UTF-8 first
                try:
                    name = raw_name.decode('utf-8')
                except UnicodeDecodeError:
                    name = raw_name.decode('cp437')
            entries.append((name, comp_method, csize, usize, local_off))

        # The signature may also appear by chance inside file data,
        # so resume the search right after the current match
        idx = data.find(CENTRAL_SIG, idx + 4)

    # Create a new ZIP file with recovered entries
    with ZipFile(destination, "w", compression=ZIP_DEFLATED) as new_zip_file:
        for name, comp_method, csize, usize, local_off in entries:

            # Unsupported compression method, skip
            if comp_method not in (0, 8):
                continue

            # Get the local header to find data start (its name/extra lengths
            # may differ from the ones stored in the central directory)
            if local_off + local_header.size > data_len:
                continue
            *_, lh_name_len, lh_extra_len = local_header.unpack_from(data, local_off)
            data_start: int = local_off + local_header.size + lh_name_len + lh_extra_len
            if data_start + csize > data_len:
                continue
            comp_data: bytes = data[data_start:data_start + csize]

            try:
                if comp_method == 0:  # No compression
                    content: bytes = comp_data[:usize]
                else:  # Deflate compression (raw stream, no zlib header)
                    content = zlib.decompress(comp_data, -zlib.MAX_WBITS)

                # Write to new ZIP file
                new_zip_file.writestr(name, content)

            except Exception:
                # If decompression (or writing) fails, try to write raw data as a fallback
                try:
                    new_zip_file.writestr(f"{name}.corrupted", comp_data)
                except Exception:
                    # Skip completely corrupted entries
                    continue

    return True