# Source code for stouputils.archive.repair_zip_file


# Imports
import bisect
import os
import struct
import zlib
from zipfile import ZIP_DEFLATED, ZipFile

from ..decorators.handle_error import handle_error


# Function that repairs a corrupted zip file (ignoring some of the errors)
@handle_error
def repair_zip_file(file_path: str, destination: str) -> bool:
    """ Try to repair a corrupted zip file by ignoring some of the errors

    This function manually parses the ZIP file structure to extract files
    even when the ZIP file is corrupted. It reads the central directory
    entries and attempts to decompress each file individually.

    Args:
        file_path   (str): Path of the zip file to repair
        destination (str): Destination of the new file

    Returns:
        bool: Always returns True unless any strong error

    Examples:

    .. code-block:: python

        > repair_zip_file("/path/to/source.zip", "/path/to/destination.zip")
    """
    # Check
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File '{file_path}' not found")
    dirname: str = os.path.dirname(destination)
    if dirname and not os.path.exists(dirname):
        raise FileNotFoundError(f"Directory '{dirname}' not found")

    # Read the entire ZIP file into memory
    with open(file_path, "rb") as f:
        data = f.read()

    # ZIP record signatures (PKWARE APPNOTE): local file header,
    # central directory file header, end of central directory.
    LOCAL_SIG = b"PK\x03\x04"
    CENTRAL_SIG = b"PK\x01\x02"
    EOCD_SIG = b"PK\x05\x06"
    DATA_LEN = len(data)

    def _collect_positions(signature: bytes) -> list[int]:
        """ Return every byte offset where the given signature occurs. """
        positions: list[int] = []
        idx = data.find(signature)
        while idx != -1:
            positions.append(idx)
            idx = data.find(signature, idx + 1)
        return positions

    # Sorted offsets of every known ZIP signature, used to bound the
    # compressed data of an entry when its recorded size is unreliable.
    signature_positions: list[int] = sorted(set(
        _collect_positions(LOCAL_SIG)
        + _collect_positions(CENTRAL_SIG)
        + _collect_positions(EOCD_SIG)
    ))

    def _decode_name(raw_name: bytes, flags: int) -> str:
        """ Decode a file name; bit 11 of the flags marks UTF-8, else cp437. """
        if flags & 0x0800:
            return raw_name.decode("utf-8", errors="replace")
        return raw_name.decode("cp437", errors="replace")

    def _sanitize_name(name: str, fallback_index: int) -> str:
        """ Normalize a recovered entry name (slashes, empty names, odd dirs). """
        sanitized = name.replace("\\", "/").lstrip("/")
        # A metadata file should not be a directory; this helps common pack corruption cases.
        if sanitized.lower() in {"pack.mcmeta", "pack.mcmeta/"}:
            return "pack.mcmeta"
        # BUGFIX: previously this tested the component AFTER the trailing slash
        # (always the empty string), so file-like names such as "a/b.txt/" were
        # never repaired. Inspect the last real path component instead.
        if sanitized.endswith("/") and "." in sanitized.rstrip("/").rsplit("/", 1)[-1]:
            sanitized = sanitized.rstrip("/")
        if not sanitized:
            sanitized = f"recovered_{fallback_index}"
        return sanitized

    def _next_zip_signature(start: int) -> int:
        """ Offset of the first ZIP signature at/after start (DATA_LEN if none). """
        pos = bisect.bisect_left(signature_positions, start)
        if pos >= len(signature_positions):
            return DATA_LEN
        return signature_positions[pos]

    def _find_local_header_near(offset_hint: int) -> int:
        """ Locate the local header closest to the (possibly shifted) offset hint.

        Returns -1 when no local header signature is found near the hint.
        """
        # Exact hit, then a common +4 byte shift, then a bounded nearby scan.
        if 0 <= offset_hint <= DATA_LEN - 4 and data[offset_hint:offset_hint + 4] == LOCAL_SIG:
            return offset_hint
        if 0 <= offset_hint + 4 <= DATA_LEN - 4 and data[offset_hint + 4:offset_hint + 8] == LOCAL_SIG:
            return offset_hint + 4
        start = max(0, offset_hint - 32)
        end = min(DATA_LEN, offset_hint + 8192)
        best = -1
        best_dist = 10**12
        search_at = start
        while True:
            pos = data.find(LOCAL_SIG, search_at, end)
            if pos == -1:
                break
            dist = abs(pos - offset_hint)
            if dist < best_dist:
                best_dist = dist
                best = pos
            search_at = pos + 1
        return best

    def _read_local_header(offset: int) -> tuple[int, int, int, str, int] | None:
        """ Parse a 30-byte local file header.

        Returns (method, csize, flags, name, data_start) or None when the
        header is out of bounds or malformed.
        """
        if offset < 0 or offset + 30 > DATA_LEN:
            return None
        if data[offset:offset + 4] != LOCAL_SIG:
            return None
        try:
            (
                _sig, _ver, flags, method, _mtime, _mdate,
                _crc, csize, _usize, name_len, extra_len,
            ) = struct.unpack("<4s5H3L2H", data[offset:offset + 30])
        except struct.error:
            return None
        name_start = offset + 30
        name_end = name_start + name_len
        extra_end = name_end + extra_len
        if extra_end > DATA_LEN:
            return None
        raw_name = data[name_start:name_end]
        name = _decode_name(raw_name, flags)
        return method, int(csize), flags, name, extra_end

    def _extract_content(method: int, data_start: int, size_hint: int | None) -> tuple[bytes, int] | None:
        """ Extract one entry's content starting at data_start.

        Tries the recorded compressed size first, then falls back to the next
        ZIP signature and finally the end of file as the data boundary.
        Returns (content, next_offset) or None on failure.
        """
        if data_start < 0 or data_start > DATA_LEN:
            return None

        def _decode_range(end: int) -> tuple[bytes, int] | None:
            """ Decode data_start..end as stored (0) or deflate (8) data. """
            comp_data = data[data_start:end]
            try:
                if method == 0:
                    return comp_data, end
                if method == 8:
                    # Raw deflate stream (negative wbits = no zlib header).
                    decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                    content = decompressor.decompress(comp_data) + decompressor.flush()
                    # unused_data tells how many trailing bytes were not part
                    # of the deflate stream, giving the true end of the entry.
                    used = len(comp_data) - len(decompressor.unused_data)
                    if used > 0:
                        return content, data_start + used
                    return content, end
            except Exception:
                return None
            # Unsupported compression method.
            return None

        if size_hint is not None and size_hint >= 0:
            end_hint = data_start + size_hint
            if end_hint <= DATA_LEN:
                result = _decode_range(end_hint)
                if result is not None:
                    return result
        next_sig = _next_zip_signature(data_start)
        if next_sig > data_start:
            result = _decode_range(next_sig)
            if result is not None:
                return result
        if next_sig != DATA_LEN:
            result = _decode_range(DATA_LEN)
            if result is not None:
                return result
        return None

    # Scan every central directory record (46-byte fixed part + variable tail).
    central_entries: list[dict[str, int | str]] = []
    idx = 0
    while True:
        idx = data.find(CENTRAL_SIG, idx)
        if idx == -1:
            break
        if idx + 46 > DATA_LEN:
            break
        try:
            (
                _sig, _ver_made, _ver_needed, flags, method, _mtime, _mdate,
                _crc, csize, _usize, name_len, extra_len, comment_len,
                _disk_start, _int_attr, _ext_attr, local_offset,
            ) = struct.unpack("<4s6H3L5H2L", data[idx:idx + 46])
        except struct.error:
            idx += 4
            continue
        name_start = idx + 46
        name_end = name_start + name_len
        block_end = name_end + extra_len + comment_len
        if block_end > DATA_LEN:
            idx += 4
            continue
        entry_name = _decode_name(data[name_start:name_end], int(flags))
        central_entries.append({
            "name": entry_name,
            "method": int(method),
            "csize": int(csize),
            "local_offset": int(local_offset),
        })
        idx = block_end

    # Recover entries primarily from central directory metadata.
    seen_names: set[str] = set()
    entry_index = 0
    with ZipFile(destination, "w", compression=ZIP_DEFLATED) as new_zip_file:
        for entry in central_entries:
            try:
                local_offset = _find_local_header_near(int(entry["local_offset"]))
                local_info = _read_local_header(local_offset)
                if local_info is None:
                    continue
                local_method, local_csize, _local_flags, local_name_raw, data_start = local_info
                entry_name = _sanitize_name(str(entry["name"]), entry_index)
                local_name = _sanitize_name(local_name_raw, entry_index)
                entry_index += 1
                # Prefer the local header's name when the central one looks
                # like a directory but the local one does not.
                if entry_name.endswith("/") and not local_name.endswith("/"):
                    entry_name = local_name
                method = int(entry["method"])
                if method not in (0, 8):
                    method = local_method
                # Try central csize, then local csize, then heuristics.
                content_info = _extract_content(method, data_start, int(entry["csize"]))
                if content_info is None and local_csize >= 0:
                    content_info = _extract_content(method, data_start, local_csize)
                if content_info is None:
                    content_info = _extract_content(method, data_start, None)
                if content_info is None:
                    continue
                content, _next_offset = content_info
                # Avoid duplicate names inside the repaired archive.
                if entry_name in seen_names:
                    base_name, dot, ext = entry_name.rpartition(".")
                    if dot:
                        entry_name = f"{base_name}_recovered.{ext}"
                    else:
                        entry_name = f"{entry_name}_recovered"
                seen_names.add(entry_name)
                new_zip_file.writestr(entry_name, content)
            except Exception:
                # Best-effort recovery: skip any entry that cannot be salvaged.
                continue

        # Fallback: recover pack.mcmeta from local entries when central metadata is too damaged.
        if "pack.mcmeta" not in seen_names:
            idx = 0
            while True:
                idx = data.find(LOCAL_SIG, idx)
                if idx == -1:
                    break
                local_info = _read_local_header(idx)
                if local_info is None:
                    idx += 4
                    continue
                method, local_csize, _flags, local_name_raw, data_start = local_info
                entry_name = _sanitize_name(local_name_raw, entry_index)
                entry_index += 1
                if entry_name.lower() != "pack.mcmeta":
                    idx += 4
                    continue
                content_info = _extract_content(method, data_start, local_csize)
                if content_info is None:
                    content_info = _extract_content(method, data_start, None)
                if content_info is None:
                    idx += 4
                    continue
                content, _next_offset = content_info
                new_zip_file.writestr("pack.mcmeta", content)
                seen_names.add("pack.mcmeta")
                break

    return True