From 13b887260f1054b3a5d3e6e55e7828ac0833e632 Mon Sep 17 00:00:00 2001
From: "m.dabbagh"
Date: Sun, 18 Jan 2026 15:44:49 +0330
Subject: [PATCH] add zip extractor adapter

---
 src/adapters/incoming/api_routes.py           |   2 +-
 .../outgoing/extractors/zip_extractor.py      | 315 ++++++++++++++++++
 src/bootstrap.py                              |   2 +
 3 files changed, 318 insertions(+), 1 deletion(-)
 create mode 100644 src/adapters/outgoing/extractors/zip_extractor.py

diff --git a/src/adapters/incoming/api_routes.py b/src/adapters/incoming/api_routes.py
index 8c8708a..f61333c 100644
--- a/src/adapters/incoming/api_routes.py
+++ b/src/adapters/incoming/api_routes.py
@@ -423,7 +423,7 @@ async def health_check() -> HealthCheckResponse:
     return HealthCheckResponse(
         status="healthy",
         version="1.0.0",
-        supported_file_types=["pdf", "docx", "txt"],
+        supported_file_types=["pdf", "docx", "txt", "zip"],
         available_strategies=["fixed_size", "paragraph"],
     )
 
diff --git a/src/adapters/outgoing/extractors/zip_extractor.py b/src/adapters/outgoing/extractors/zip_extractor.py
new file mode 100644
index 0000000..205c798
--- /dev/null
+++ b/src/adapters/outgoing/extractors/zip_extractor.py
@@ -0,0 +1,315 @@
+"""
+ZIP Extractor - Concrete implementation for ZIP archive extraction.
+
+This adapter implements the IExtractor port for ZIP files containing
+Markdown documents. It merges all .md files into a single document.
+"""
+import logging
+import zipfile
+from pathlib import Path
+from typing import List, Optional
+
+from ....core.domain.exceptions import (
+    EmptyContentError,
+    ExtractionError,
+)
+from ....core.domain.models import Document, DocumentMetadata, SourceType
+from ....core.ports.outgoing.extractor import IExtractor
+
+
+logger = logging.getLogger(__name__)
+
+# Byte-order marks that reliably identify UTF-16 encoded content.
+_UTF16_BOMS = (b'\xff\xfe', b'\xfe\xff')
+
+
+class ZipExtractor(IExtractor):
+    """
+    Concrete ZIP extractor for archives containing Markdown files.
+
+    This adapter:
+    1. Opens ZIP archives and filters for .md files
+    2. Sorts files alphabetically for deterministic order
+    3. Merges all Markdown files into a single document
+    4. Inserts file source headers between merged content
+    5. Skips members that cannot be read or decoded
+    """
+
+    def __init__(self) -> None:
+        """Initialize ZIP extractor."""
+        self._supported_extensions = ['zip']
+        # Tried in order. 'utf-16' is only attempted when the bytes look like
+        # UTF-16 (see _extract_file_content). 'latin-1' accepts every byte
+        # sequence, so it must stay last as the final fallback.
+        self._encodings = ['utf-8-sig', 'utf-16', 'cp1252', 'latin-1']
+        logger.debug("ZipExtractor initialized")
+
+    def extract(self, file_path: Path) -> Document:
+        """
+        Extract and merge Markdown files from ZIP archive.
+
+        Args:
+            file_path: Path to the ZIP file
+
+        Returns:
+            Document entity with merged content and metadata
+
+        Raises:
+            ExtractionError: If extraction fails
+            EmptyContentError: If no Markdown files could be extracted
+        """
+        try:
+            logger.info(f"Extracting Markdown files from ZIP: {file_path}")
+
+            # Validate file
+            self._validate_file(file_path)
+
+            # Extract and merge markdown files
+            merged_text = self._extract_and_merge_markdown(file_path)
+
+            # Validate content
+            if not merged_text.strip():
+                raise EmptyContentError(file_path=str(file_path))
+
+            # Create metadata
+            metadata = self._create_metadata(file_path)
+
+            # Build document with raw_markdown
+            document = Document(raw_markdown=merged_text, metadata=metadata)
+
+            logger.info(
+                f"Successfully extracted {len(merged_text)} characters from {file_path.name}"
+            )
+            return document
+
+        except (EmptyContentError, ExtractionError):
+            # Domain errors already describe the failure; pass them through.
+            raise
+        except Exception as e:
+            logger.error(f"ZIP extraction failed for {file_path}: {str(e)}")
+            # Chain the cause so the original traceback is preserved.
+            raise ExtractionError(
+                message=f"Failed to extract Markdown from {file_path.name}",
+                details=str(e),
+                file_path=str(file_path),
+            ) from e
+
+    def supports_file_type(self, file_extension: str) -> bool:
+        """
+        Check if this extractor supports ZIP files.
+
+        Args:
+            file_extension: File extension (e.g., 'zip')
+
+        Returns:
+            True if ZIP files are supported
+        """
+        return file_extension.lower() in self._supported_extensions
+
+    def get_supported_types(self) -> List[str]:
+        """
+        Get list of supported file extensions.
+
+        Returns:
+            List containing 'zip'
+        """
+        return self._supported_extensions.copy()
+
+    def _validate_file(self, file_path: Path) -> None:
+        """
+        Validate file exists and is a valid ZIP archive.
+
+        Args:
+            file_path: Path to validate
+
+        Raises:
+            ExtractionError: If file is missing, not a file, or not a ZIP
+            EmptyContentError: If file is zero bytes long
+        """
+        if not file_path.exists():
+            raise ExtractionError(
+                message=f"File not found: {file_path}",
+                file_path=str(file_path),
+            )
+
+        if not file_path.is_file():
+            raise ExtractionError(
+                message=f"Path is not a file: {file_path}",
+                file_path=str(file_path),
+            )
+
+        if file_path.stat().st_size == 0:
+            raise EmptyContentError(file_path=str(file_path))
+
+        # Validate it's a valid ZIP file
+        if not zipfile.is_zipfile(file_path):
+            raise ExtractionError(
+                message=f"File is not a valid ZIP archive: {file_path}",
+                file_path=str(file_path),
+            )
+
+    def _extract_and_merge_markdown(self, file_path: Path) -> str:
+        """
+        Extract all Markdown files from ZIP and merge into single string.
+
+        Args:
+            file_path: Path to ZIP file
+
+        Returns:
+            Merged Markdown content with file source headers
+
+        Raises:
+            ExtractionError: If ZIP extraction fails
+            EmptyContentError: If no .md file is found or none can be read
+        """
+        try:
+            with zipfile.ZipFile(file_path, 'r') as zip_file:
+                # Get all markdown files, sorted alphabetically
+                md_files = self._get_markdown_files(zip_file)
+
+                if not md_files:
+                    logger.warning(f"No .md files found in ZIP archive: {file_path}")
+                    raise EmptyContentError(file_path=str(file_path))
+
+                logger.info(f"Found {len(md_files)} Markdown files in ZIP")
+
+                # Merge all files
+                merged_parts = []
+                successful_extractions = 0
+
+                for md_file in md_files:
+                    content = self._extract_file_content(zip_file, md_file)
+                    if content is None:
+                        # Unreadable member: already logged, keep going.
+                        continue
+
+                    # Add file source header
+                    merged_parts.append(f"\n\n# File Source: {md_file}\n\n")
+                    merged_parts.append(content)
+                    successful_extractions += 1
+                    logger.debug(f"Successfully extracted: {md_file}")
+
+                if successful_extractions == 0:
+                    logger.warning(f"Failed to extract any valid Markdown files from ZIP: {file_path}")
+                    raise EmptyContentError(file_path=str(file_path))
+
+                logger.info(
+                    f"Successfully merged {successful_extractions}/{len(md_files)} files"
+                )
+
+                # Join all parts with proper spacing
+                return "".join(merged_parts).strip()
+
+        except EmptyContentError:
+            raise
+        except zipfile.BadZipFile as e:
+            raise ExtractionError(
+                message=f"Corrupted ZIP file: {file_path}",
+                details=str(e),
+                file_path=str(file_path),
+            ) from e
+        except Exception as e:
+            raise ExtractionError(
+                message=f"ZIP extraction failed: {str(e)}",
+                file_path=str(file_path),
+            ) from e
+
+    def _get_markdown_files(self, zip_file: zipfile.ZipFile) -> List[str]:
+        """
+        Get sorted list of Markdown files from ZIP, filtering hidden files.
+
+        Args:
+            zip_file: Open ZipFile object
+
+        Returns:
+            Sorted list of Markdown file paths
+        """
+        md_files = []
+
+        for file_info in zip_file.infolist():
+            filename = file_info.filename
+
+            # Skip directories
+            if file_info.is_dir():
+                continue
+
+            # Skip hidden files and __MACOSX
+            path_parts = Path(filename).parts
+            if any(part.startswith(('.', '__')) for part in path_parts):
+                logger.debug(f"Skipping hidden/system file: {filename}")
+                continue
+
+            # Check for .md extension
+            if filename.lower().endswith('.md'):
+                md_files.append(filename)
+
+        # Sort alphabetically for deterministic order
+        md_files.sort()
+
+        return md_files
+
+    def _extract_file_content(
+        self,
+        zip_file: zipfile.ZipFile,
+        filename: str,
+    ) -> Optional[str]:
+        """
+        Extract content from a single file in the ZIP with encoding detection.
+
+        Args:
+            zip_file: Open ZipFile object
+            filename: Name of file to extract
+
+        Returns:
+            File content as string, or None if extraction fails
+        """
+        try:
+            # Read raw bytes
+            # NOTE(review): this loads the whole uncompressed member into
+            # memory; consider capping on ZipInfo.file_size for uploads.
+            raw_content = zip_file.read(filename)
+        except Exception as e:
+            # Log error but continue processing other files
+            logger.warning(f"Error extracting {filename}: {str(e)}, skipping")
+            return None
+
+        # Almost any even-length byte string "decodes" as UTF-16, so only try
+        # it when there is a BOM or a NUL byte; otherwise single-byte text
+        # (cp1252/latin-1) would silently come back as garbage.
+        looks_utf16 = raw_content.startswith(_UTF16_BOMS) or b'\x00' in raw_content
+
+        # Try multiple encodings
+        for encoding in self._encodings:
+            if encoding == 'utf-16' and not looks_utf16:
+                continue
+            try:
+                text = raw_content.decode(encoding)
+            except UnicodeDecodeError:
+                continue
+            logger.debug(f"Decoded {filename} with {encoding}")
+            return text
+
+        # If all encodings fail, log warning and skip
+        logger.warning(
+            f"Failed to decode {filename} with any supported encoding, skipping"
+        )
+        return None
+
+    def _create_metadata(self, file_path: Path) -> DocumentMetadata:
+        """
+        Create source-neutral document metadata from ZIP file.
+
+        Args:
+            file_path: Path to the ZIP file
+
+        Returns:
+            DocumentMetadata entity
+        """
+        stat = file_path.stat()
+
+        return DocumentMetadata(
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
+        )
diff --git a/src/bootstrap.py b/src/bootstrap.py
index 435cf98..e3dcf91 100644
--- a/src/bootstrap.py
+++ b/src/bootstrap.py
@@ -17,6 +17,7 @@ from .adapters.outgoing.extractors.docx_extractor import DocxExtractor
 from .adapters.outgoing.extractors.factory import ExtractorFactory
 from .adapters.outgoing.extractors.pdf_extractor import PDFExtractor
 from .adapters.outgoing.extractors.txt_extractor import TxtExtractor
+from .adapters.outgoing.extractors.zip_extractor import ZipExtractor
 from .adapters.outgoing.persistence.in_memory_repository import (
     InMemoryDocumentRepository,
 )
@@ -99,6 +100,7 @@ class ApplicationContainer:
         factory.register_extractor(PDFExtractor())
         factory.register_extractor(DocxExtractor())
         factory.register_extractor(TxtExtractor())
+        factory.register_extractor(ZipExtractor())
 
         logger.info(
             f"Registered extractors for: {factory.get_supported_types()}"