add zip extractor adapter

This commit is contained in:
m.dabbagh 2026-01-18 15:44:49 +03:30
parent f06370e0b9
commit 13b887260f
3 changed files with 300 additions and 1 deletions

View File

@ -423,7 +423,7 @@ async def health_check() -> HealthCheckResponse:
return HealthCheckResponse( return HealthCheckResponse(
status="healthy", status="healthy",
version="1.0.0", version="1.0.0",
supported_file_types=["pdf", "docx", "txt"], supported_file_types=["pdf", "docx", "txt", "zip"],
available_strategies=["fixed_size", "paragraph"], available_strategies=["fixed_size", "paragraph"],
) )

View File

@ -0,0 +1,297 @@
"""
ZIP Extractor - Concrete implementation for ZIP archive extraction.
This adapter implements the IExtractor port for ZIP files containing
Markdown documents. It merges all .md files into a single document.
"""
import logging
import zipfile
from pathlib import Path
from typing import List
from ....core.domain.exceptions import (
EmptyContentError,
ExtractionError,
)
from ....core.domain.models import Document, DocumentMetadata, SourceType
from ....core.ports.outgoing.extractor import IExtractor
logger = logging.getLogger(__name__)
class ZipExtractor(IExtractor):
    """
    Concrete ZIP extractor for archives containing Markdown files.

    This adapter:
    1. Opens ZIP archives and filters for .md files
    2. Sorts files alphabetically for deterministic order
    3. Merges all non-empty Markdown files into a single document
    4. Inserts file source headers between merged content
    5. Skips members that cannot be read or decoded instead of failing
       the whole archive

    Decoding strategy: UTF-8 (a leading BOM is stripped) is tried first.
    UTF-16 is attempted only when the member starts with a UTF-16 byte
    order mark, because BOM-less UTF-16 decoding accepts most even-length
    byte strings and would silently turn single-byte text into garbage.
    cp1252 is tried before latin-1 because latin-1 accepts every byte
    sequence and would otherwise make cp1252 unreachable.
    """

    # Byte order marks that identify UTF-16 encoded content (LE, BE).
    _UTF16_BOMS = (b"\xff\xfe", b"\xfe\xff")

    def __init__(self) -> None:
        """Initialize ZIP extractor."""
        self._supported_extensions = ['zip']
        # Ordered fallbacks; latin-1 must stay last because it never fails.
        self._encodings = ['utf-8-sig', 'cp1252', 'latin-1']
        logger.debug("ZipExtractor initialized")

    def extract(self, file_path: Path) -> Document:
        """
        Extract and merge Markdown files from ZIP archive.

        Args:
            file_path: Path to the ZIP file

        Returns:
            Document entity with merged content and metadata

        Raises:
            ExtractionError: If extraction fails
            EmptyContentError: If no Markdown content could be extracted
        """
        try:
            logger.info("Extracting Markdown files from ZIP: %s", file_path)

            self._validate_file(file_path)
            merged_text = self._extract_and_merge_markdown(file_path)

            if not merged_text or not merged_text.strip():
                raise EmptyContentError(file_path=str(file_path))

            metadata = self._create_metadata(file_path)
            document = Document(raw_markdown=merged_text, metadata=metadata)

            logger.info(
                "Successfully extracted %d characters from %s",
                len(merged_text),
                file_path.name,
            )
            return document
        except (EmptyContentError, ExtractionError):
            # Domain errors already carry the right context; pass through.
            raise
        except Exception as e:
            logger.error("ZIP extraction failed for %s: %s", file_path, e)
            raise ExtractionError(
                message=f"Failed to extract Markdown from {file_path.name}",
                details=str(e),
                file_path=str(file_path),
            ) from e

    def supports_file_type(self, file_extension: str) -> bool:
        """
        Check if this extractor supports the given file extension.

        Args:
            file_extension: File extension without a dot (e.g., 'zip');
                the comparison is case-insensitive

        Returns:
            True if the extension is 'zip'
        """
        return file_extension.lower() in self._supported_extensions

    def get_supported_types(self) -> List[str]:
        """
        Get list of supported file extensions.

        Returns:
            A copy of the supported extension list (['zip']), so callers
            cannot mutate the extractor's internal state
        """
        return self._supported_extensions.copy()

    def _validate_file(self, file_path: Path) -> None:
        """
        Validate file exists and is a valid ZIP archive.

        Args:
            file_path: Path to validate

        Raises:
            ExtractionError: If the path is missing, is not a regular file,
                or is not a ZIP archive
            EmptyContentError: If the file is zero bytes long
        """
        if not file_path.exists():
            raise ExtractionError(
                message=f"File not found: {file_path}",
                file_path=str(file_path),
            )

        if not file_path.is_file():
            raise ExtractionError(
                message=f"Path is not a file: {file_path}",
                file_path=str(file_path),
            )

        if file_path.stat().st_size == 0:
            raise EmptyContentError(file_path=str(file_path))

        if not zipfile.is_zipfile(file_path):
            raise ExtractionError(
                message=f"File is not a valid ZIP archive: {file_path}",
                file_path=str(file_path),
            )

    def _extract_and_merge_markdown(self, file_path: Path) -> str:
        """
        Extract all Markdown files from ZIP and merge into single string.

        Members that cannot be read or decoded, and members whose content
        is empty or whitespace-only, are skipped. Skipping blank members
        keeps an archive of empty files from producing a document that
        consists of nothing but source headers.

        Args:
            file_path: Path to ZIP file

        Returns:
            Merged Markdown content with file source headers

        Raises:
            EmptyContentError: If the archive has no .md members or none
                of them yields any content
            ExtractionError: If ZIP extraction fails
        """
        try:
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                md_files = self._get_markdown_files(zip_file)

                if not md_files:
                    logger.warning(
                        "No .md files found in ZIP archive: %s", file_path
                    )
                    raise EmptyContentError(file_path=str(file_path))

                logger.info("Found %d Markdown files in ZIP", len(md_files))

                merged_parts = []
                successful_extractions = 0

                for md_file in md_files:
                    content = self._extract_file_content(zip_file, md_file)
                    if content is None:
                        # Already logged by _extract_file_content.
                        continue
                    if not content.strip():
                        logger.debug(
                            "Skipping empty Markdown file: %s", md_file
                        )
                        continue

                    merged_parts.append(f"\n\n# File Source: {md_file}\n\n")
                    merged_parts.append(content)
                    successful_extractions += 1
                    logger.debug("Successfully extracted: %s", md_file)

                if successful_extractions == 0:
                    logger.warning(
                        "Failed to extract any valid Markdown files from ZIP: %s",
                        file_path,
                    )
                    raise EmptyContentError(file_path=str(file_path))

                logger.info(
                    "Successfully merged %d/%d files",
                    successful_extractions,
                    len(md_files),
                )

                # strip() drops the blank lines preceding the first header.
                return "".join(merged_parts).strip()
        except (EmptyContentError, ExtractionError):
            raise
        except zipfile.BadZipFile as e:
            raise ExtractionError(
                message=f"Corrupted ZIP file: {file_path}",
                details=str(e),
                file_path=str(file_path),
            ) from e
        except Exception as e:
            raise ExtractionError(
                message=f"ZIP extraction failed: {str(e)}",
                file_path=str(file_path),
            ) from e

    def _get_markdown_files(self, zip_file: zipfile.ZipFile) -> List[str]:
        """
        Get sorted list of Markdown files from ZIP, filtering hidden files.

        A member is skipped when any component of its path starts with
        '.' (hidden files/directories) or '__' (e.g. the '__MACOSX'
        resource-fork directory).

        Args:
            zip_file: Open ZipFile object

        Returns:
            Sorted list of Markdown file paths
        """
        md_files = []
        for file_info in zip_file.infolist():
            filename = file_info.filename

            if file_info.is_dir():
                continue

            path_parts = Path(filename).parts
            if any(part.startswith(('.', '__')) for part in path_parts):
                logger.debug("Skipping hidden/system file: %s", filename)
                continue

            if filename.lower().endswith('.md'):
                md_files.append(filename)

        # Sort for a deterministic merge order regardless of archive layout.
        md_files.sort()
        return md_files

    def _extract_file_content(
        self,
        zip_file: zipfile.ZipFile,
        filename: str,
    ) -> str | None:
        """
        Extract content from a single file in the ZIP with encoding detection.

        NOTE(review): the member is read fully into memory and no
        uncompressed-size limit is enforced; if archives come from
        untrusted sources a size cap should be considered.

        Args:
            zip_file: Open ZipFile object
            filename: Name of file to extract

        Returns:
            File content as string, or None if the member cannot be read
            or decoded
        """
        try:
            raw_content = zip_file.read(filename)
        except Exception as e:
            # Best effort: one unreadable member must not abort the archive.
            logger.warning("Error extracting %s: %s, skipping", filename, e)
            return None

        # Only trust UTF-16 when a BOM says so; without one the codec would
        # "successfully" decode most even-length single-byte text as garbage.
        if raw_content.startswith(self._UTF16_BOMS):
            candidates = ['utf-16', *self._encodings]
        else:
            candidates = self._encodings

        for encoding in candidates:
            try:
                text = raw_content.decode(encoding)
            except UnicodeDecodeError:
                continue
            logger.debug("Decoded %s with %s", filename, encoding)
            return text

        logger.warning(
            "Failed to decode %s with any supported encoding, skipping",
            filename,
        )
        return None

    def _create_metadata(self, file_path: Path) -> DocumentMetadata:
        """
        Create source-neutral document metadata from ZIP file.

        Args:
            file_path: Path to the ZIP file

        Returns:
            DocumentMetadata entity whose source_id is the absolute path,
            display_name is the file name and size_bytes is the on-disk
            size of the archive
        """
        stat = file_path.stat()
        return DocumentMetadata(
            source_id=str(file_path.absolute()),
            source_type=SourceType.FILE,
            display_name=file_path.name,
            size_bytes=stat.st_size,
        )

View File

@ -17,6 +17,7 @@ from .adapters.outgoing.extractors.docx_extractor import DocxExtractor
from .adapters.outgoing.extractors.factory import ExtractorFactory from .adapters.outgoing.extractors.factory import ExtractorFactory
from .adapters.outgoing.extractors.pdf_extractor import PDFExtractor from .adapters.outgoing.extractors.pdf_extractor import PDFExtractor
from .adapters.outgoing.extractors.txt_extractor import TxtExtractor from .adapters.outgoing.extractors.txt_extractor import TxtExtractor
from .adapters.outgoing.extractors.zip_extractor import ZipExtractor
from .adapters.outgoing.persistence.in_memory_repository import ( from .adapters.outgoing.persistence.in_memory_repository import (
InMemoryDocumentRepository, InMemoryDocumentRepository,
) )
@ -99,6 +100,7 @@ class ApplicationContainer:
factory.register_extractor(PDFExtractor()) factory.register_extractor(PDFExtractor())
factory.register_extractor(DocxExtractor()) factory.register_extractor(DocxExtractor())
factory.register_extractor(TxtExtractor()) factory.register_extractor(TxtExtractor())
factory.register_extractor(ZipExtractor())
logger.info( logger.info(
f"Registered extractors for: {factory.get_supported_types()}" f"Registered extractors for: {factory.get_supported_types()}"