Compare commits

...

3 Commits

Author      SHA1        Message              Date
m.dabbagh   91f8035043  add s3 storage       2026-01-20 12:46:47 +03:30
m.dabbagh   0c09c79a2e  refactor api routes  2026-01-19 22:03:36 +03:30
m.dabbagh   6086ddf818  add /chunk route     2026-01-19 21:54:23 +03:30
13 changed files with 833 additions and 321 deletions

.idea/.gitignore generated vendored Normal file (+3)
View File

@@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@@ -8,10 +8,22 @@ marko==2.1.2
 # Web Framework
 fastapi==0.115.6
 uvicorn[standard]==0.34.0
+# Utilities
 python-multipart==0.0.20
+# Document Processing - Extractors
+PyPDF2==3.0.1  # PDF extraction
+python-docx==1.1.2  # DOCX extraction
+# Cloud Storage
+boto3==1.35.94  # AWS S3 integration
+botocore==1.35.94  # AWS SDK core (installed with boto3)
+# Environment Variables
+python-dotenv==1.0.1  # Load .env files
+# HTTP Client (for testing)
+requests==2.32.3  # API testing scripts
 # Development Dependencies (optional)
 pytest==8.3.4
 pytest-asyncio==0.24.0
@@ -19,3 +31,7 @@ httpx==0.28.1
 black==24.10.0
 ruff==0.8.5
 mypy==1.14.0
+# Type Stubs for Development
+types-requests==2.32.0.20241016
+boto3-stubs[s3]==1.35.94

View File

@@ -3,42 +3,52 @@ API Routes - Functional FastAPI routes for text processing.
 This is the incoming adapter that translates HTTP requests into
 domain operations. Routes pull the service directly from bootstrap.
+Refactored for "Skinny Routes" pattern with:
+- Global exception handling
+- Dependency injection for common parameters
+- Context managers for resource management
+- Minimal route logic
 """
+import contextlib
 import logging
 import shutil
 import tempfile
 from pathlib import Path
-from uuid import UUID
-from fastapi import APIRouter, FastAPI, File, Form, HTTPException, UploadFile, status
+from typing import Iterator, List, Optional
+from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status
+from fastapi.responses import JSONResponse
+from ...core.config import get_settings
 from ...core.domain.exceptions import (
     ChunkingError,
+    DocumentNotFoundError,
     DomainException,
-    DocumentNotFoundError,
     ExtractionError,
     ProcessingError,
     UnsupportedFileTypeError,
 )
-from ...core.domain.models import ChunkingMethod, ChunkingStrategy
+from ...core.domain.models import Chunk, ChunkingMethod, ChunkingStrategy, Document
 from ...core.ports.incoming.text_processor import ITextProcessor
 from .api_schemas import (
+    ChunkListResponse,
     ChunkResponse,
-    DeleteDocumentResponse,
-    DocumentListResponse,
     DocumentResponse,
-    ExtractAndChunkRequest,
-    ExtractAndChunkResponse,
     HealthCheckResponse,
-    ProcessDocumentRequest,
-    ProcessDocumentResponse,
 )
 logger = logging.getLogger(__name__)
-# Create FastAPI application
+# =============================================================================
+# Application Setup
+# =============================================================================
+# Load settings
+settings = get_settings()
 app = FastAPI(
     title="Text Processor API",
     description="Text extraction and chunking system using Hexagonal Architecture",
@@ -47,37 +57,131 @@ app = FastAPI(
     redoc_url="/redoc",
 )
-# Create API router
 router = APIRouter(prefix="/api/v1", tags=["Text Processing"])
-def _get_service() -> ITextProcessor:
-    """
-    Get the text processor service from bootstrap singleton.
-    This function pulls the service directly without using FastAPI's Depends.
-    Returns:
-        ITextProcessor: Core service instance
-    """
+# =============================================================================
+# Global Exception Handler
+# =============================================================================
+@app.exception_handler(DomainException)
+async def domain_exception_handler(request, exc: DomainException) -> JSONResponse:
+    """
+    Global exception handler for all domain exceptions.
+    Maps domain exceptions to appropriate HTTP status codes.
+    """
+    status_code_map = {
+        UnsupportedFileTypeError: status.HTTP_400_BAD_REQUEST,
+        ExtractionError: status.HTTP_422_UNPROCESSABLE_ENTITY,
+        ChunkingError: status.HTTP_422_UNPROCESSABLE_ENTITY,
+        ProcessingError: status.HTTP_500_INTERNAL_SERVER_ERROR,
+        DocumentNotFoundError: status.HTTP_404_NOT_FOUND,
+    }
+    status_code = status_code_map.get(type(exc), status.HTTP_500_INTERNAL_SERVER_ERROR)
+    logger.error(f"Domain exception: {type(exc).__name__}: {str(exc)}")
+    return JSONResponse(
+        status_code=status_code,
+        content={"detail": str(exc)},
+    )
+# =============================================================================
+# Helper Functions & Dependencies
+# =============================================================================
+def get_service() -> ITextProcessor:
+    """Dependency: Get the text processor service from bootstrap."""
     from ...bootstrap import get_processor_service
     return get_processor_service()
-def _to_document_response(document) -> DocumentResponse:
-    """
-    Convert domain document to API response.
-    Args:
-        document: Domain Document entity
-    Returns:
-        DocumentResponse: API response model
-    """
+def get_chunking_strategy(
+    strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
+    chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
+    overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
+    respect_boundaries: bool = Form(True, description="Respect text boundaries"),
+) -> ChunkingStrategy:
+    """Dependency: Create chunking strategy from form parameters."""
+    return ChunkingStrategy(
+        strategy_name=strategy_name,
+        chunk_size=chunk_size,
+        overlap_size=overlap_size,
+        respect_boundaries=respect_boundaries,
+    )
+@contextlib.contextmanager
+def managed_temp_file(file: UploadFile) -> Iterator[Path]:
+    """
+    Context manager for temporary file handling.
+    Creates temporary directory, copies uploaded file, yields path,
+    and ensures cleanup on exit.
+    Args:
+        file: Uploaded file from FastAPI
+    Yields:
+        Path to temporary file with original filename
+    """
+    temp_dir = tempfile.mkdtemp()
+    filename = file.filename if file.filename else "uploaded_file.tmp"
+    temp_file_path = Path(temp_dir) / filename
+    try:
+        logger.debug(f"Creating temporary file: {temp_file_path}")
+        with open(temp_file_path, 'wb') as f:
+            shutil.copyfileobj(file.file, f)
+        yield temp_file_path
+    finally:
+        # Cleanup temporary directory
+        try:
+            shutil.rmtree(temp_dir)
+            logger.debug(f"Cleaned up temporary directory: {temp_dir}")
+        except Exception as e:
+            logger.warning(f"Failed to delete temporary directory: {str(e)}")
+def validate_markdown_source(file: Optional[UploadFile], text: Optional[str]) -> None:
+    """
+    Validate that exactly one markdown source is provided.
+    Args:
+        file: Optional uploaded file
+        text: Optional text input
+    Raises:
+        HTTPException: If validation fails
+    """
+    if not file and not text:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="Either 'file' or 'text' must be provided",
+        )
+    if file and text:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="Provide either 'file' or 'text', not both",
+        )
+    if file and file.filename and not file.filename.lower().endswith('.md'):
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="Unsupported file type. Only .md files are accepted",
+        )
+def to_document_response(document: Document) -> DocumentResponse:
+    """Convert domain document to API response."""
     from .api_schemas import DocumentMetadataResponse
-    # Extract file type from display_name or source_id
     display_name = document.metadata.display_name
     file_type = Path(display_name).suffix.lstrip('.') if '.' in display_name else 'unknown'
@@ -90,74 +194,85 @@ def _to_document_response(document) -> DocumentResponse:
             file_size_bytes=document.metadata.size_bytes,
             created_at=document.metadata.created_at.isoformat(),
             author=document.metadata.author,
-            page_count=None,  # Not available in new metadata model
+            page_count=None,
         ),
         is_processed=document.is_processed,
         content_preview=document.get_content_preview(200),
+        download_url=document.download_url,
     )
-def _to_chunk_response(chunk) -> ChunkResponse:
-    """
-    Convert domain chunk to API response.
-    Args:
-        chunk: Domain Chunk entity
-    Returns:
-        ChunkResponse: API response model
-    """
-    return ChunkResponse(
-        id=str(chunk.id),
-        document_id=str(chunk.document_id),
-        content=chunk.content,
-        sequence_number=chunk.sequence_number,
-        start_char=chunk.start_char,
-        end_char=chunk.end_char,
-        length=chunk.get_length(),
-    )
-def _map_domain_exception(exception: DomainException) -> HTTPException:
-    """
-    Map domain exceptions to HTTP exceptions.
-    Args:
-        exception: Domain exception
-    Returns:
-        HTTPException: Corresponding HTTP exception
-    """
-    if isinstance(exception, UnsupportedFileTypeError):
-        return HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
-            detail=str(exception),
-        )
-    elif isinstance(exception, ExtractionError):
-        return HTTPException(
-            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
-            detail=str(exception),
-        )
-    elif isinstance(exception, ChunkingError):
-        return HTTPException(
-            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
-            detail=str(exception),
-        )
-    elif isinstance(exception, ProcessingError):
-        return HTTPException(
-            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            detail=str(exception),
-        )
-    elif isinstance(exception, DocumentNotFoundError):
-        return HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
-            detail=str(exception),
-        )
-    else:
-        return HTTPException(
-            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            detail=str(exception),
-        )
+def to_chunk_responses(chunks: List[Chunk]) -> List[ChunkResponse]:
+    """Convert list of domain chunks to API responses."""
+    return [
+        ChunkResponse(
+            id=str(chunk.id),
+            document_id=str(chunk.document_id),
+            content=chunk.content,
+            sequence_number=chunk.sequence_number,
+            start_char=chunk.start_char,
+            end_char=chunk.end_char,
+            length=chunk.get_length(),
+        )
+        for chunk in chunks
+    ]
+# =============================================================================
+# Skinny Routes
+# =============================================================================
+@router.post(
+    "/chunk",
+    response_model=ChunkListResponse,
+    status_code=status.HTTP_200_OK,
+    summary="Process Markdown from file upload or text input",
+    description="Unified endpoint: upload .md file or paste markdown text, then parse and chunk",
+)
+async def perform_chunking(
+    file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"),
+    text: Optional[str] = Form(None, description="Markdown text to process", json_schema_extra={"x-textarea": True}),
+    title: str = Form("markdown_input", description="Optional title for the document"),
+    strategy: ChunkingStrategy = Depends(get_chunking_strategy),
+    service: ITextProcessor = Depends(get_service),
+) -> ChunkListResponse:
+    """
+    Unified Markdown processing endpoint supporting both file upload and text input.
+    Processing workflow:
+    1. Validates source (file or text, not both)
+    2. Extracts markdown content
+    3. Parses markdown structure into sections
+    4. Persists document to repository
+    5. Chunks content according to strategy
+    6. Returns chunks with metadata
+    """
+    # Validate source
+    validate_markdown_source(file, text)
+    # Process file upload
+    if file:
+        logger.info(f"Processing uploaded markdown file: {file.filename}")
+        with managed_temp_file(file) as temp_path:
+            chunks = service.extract_and_chunk(temp_path, strategy)
+    # Process text input
+    else:
+        if not text or not text.strip():
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Markdown content cannot be empty",
+            )
+        logger.info(f"Processing markdown text input: {len(text)} characters")
+        chunks = service.process_text_to_chunks(text, strategy, title)
+    logger.info(f"Successfully processed markdown: {len(chunks)} chunks created")
+    return ChunkListResponse(
+        chunks=to_chunk_responses(chunks),
+        total_chunks=len(chunks),
+    )
 @router.post(
@@ -168,161 +283,60 @@ def _map_domain_exception(exception: DomainException) -> HTTPException:
     description="Upload a file and extract text content with metadata",
 )
 async def extract_document(
-    file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, zip)"),
+    file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, md, zip)"),
+    service: ITextProcessor = Depends(get_service),
 ) -> DocumentResponse:
     """
     Extract text content from uploaded file.
     This endpoint handles file extraction only:
-    1. Accepts file upload (PDF, DOCX, TXT, ZIP)
+    1. Accepts file upload (PDF, DOCX, TXT, MD, ZIP)
     2. Extracts raw text content using appropriate extractor
     3. Returns Document entity with metadata (no parsing)
-    Args:
-        file: Uploaded file
-    Returns:
-        Response with extracted document
-    Raises:
-        HTTPException: If extraction fails
     """
-    temp_file_path = None
-    try:
-        # Pull service from bootstrap
-        service: ITextProcessor = _get_service()
-        # Create temporary directory and file with original filename
-        temp_dir = tempfile.mkdtemp()
-        original_filename = file.filename if file.filename else "uploaded_file.tmp"
-        temp_file_path = Path(temp_dir) / original_filename
-        # Copy uploaded file to temporary location
-        logger.info(f"Extracting uploaded file: {file.filename}")
-        with open(temp_file_path, 'wb') as temp_file:
-            shutil.copyfileobj(file.file, temp_file)
-        # Execute extraction only (no parsing)
-        document = service.extract_document(temp_file_path)
-        # Convert to response
-        document_response = _to_document_response(document)
-        logger.info(f"Successfully extracted {file.filename}: {len(document.raw_markdown)} characters")
-        return document_response
-    except DomainException as e:
-        raise _map_domain_exception(e)
-    except Exception as e:
-        logger.error(f"Unexpected error extracting file: {str(e)}")
-        raise HTTPException(
-            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            detail=f"Internal server error: {str(e)}",
-        )
-    finally:
-        # Clean up temporary file and directory
-        if temp_file_path and temp_file_path.exists():
-            try:
-                temp_dir = temp_file_path.parent
-                shutil.rmtree(temp_dir)
-                logger.debug(f"Cleaned up temporary directory: {temp_dir}")
-            except Exception as e:
-                logger.warning(f"Failed to delete temporary directory: {str(e)}")
+    logger.info(f"Extracting uploaded file: {file.filename}")
+    with managed_temp_file(file) as temp_path:
+        document = service.extract_document(temp_path)
+    logger.info(f"Successfully extracted {len(document.raw_markdown)} characters from {file.filename}")
+    return to_document_response(document)
 @router.post(
     "/process-file",
-    response_model=ExtractAndChunkResponse,
+    response_model=ChunkListResponse,
     status_code=status.HTTP_200_OK,
     summary="Process uploaded file (extraction to chunking)",
     description="Upload a file, extract text, parse markdown, and return chunks",
 )
 async def process_file(
-    file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, zip)"),
-    strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
-    chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
-    overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
-    respect_boundaries: bool = Form(True, description="Respect text boundaries"),
-) -> ExtractAndChunkResponse:
+    file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, md, zip)"),
+    strategy: ChunkingStrategy = Depends(get_chunking_strategy),
+    service: ITextProcessor = Depends(get_service),
+) -> ChunkListResponse:
     """
     Complete file processing pipeline: Upload → Extract → Parse → Chunk.
     This endpoint handles the full document processing workflow:
-    1. Accepts file upload (PDF, DOCX, TXT, ZIP)
+    1. Accepts file upload (PDF, DOCX, TXT, MD, ZIP)
     2. Extracts text content using appropriate extractor
     3. Parses markdown structure into sections
     4. Chunks content according to strategy
     5. Returns chunks with metadata
-    Args:
-        file: Uploaded file
-        strategy_name: Name of chunking strategy
-        chunk_size: Target chunk size
-        overlap_size: Overlap between chunks
-        respect_boundaries: Whether to respect boundaries
-    Returns:
-        Response with chunks
-    Raises:
-        HTTPException: If extraction or chunking fails
     """
-    temp_file_path = None
-    try:
-        # Pull service from bootstrap
-        service: ITextProcessor = _get_service()
-        # Create temporary directory and file with original filename
-        temp_dir = tempfile.mkdtemp()
-        original_filename = file.filename if file.filename else "uploaded_file.tmp"
-        temp_file_path = Path(temp_dir) / original_filename
-        # Copy uploaded file to temporary location
-        logger.info(f"Processing uploaded file: {file.filename}")
-        with open(temp_file_path, 'wb') as temp_file:
-            shutil.copyfileobj(file.file, temp_file)
-        # Create chunking strategy
-        strategy = ChunkingStrategy(
-            strategy_name=strategy_name,
-            chunk_size=chunk_size,
-            overlap_size=overlap_size,
-            respect_boundaries=respect_boundaries,
-        )
-        # Execute complete pipeline: extract → parse → chunk
-        chunks = service.extract_and_chunk(temp_file_path, strategy)
-        # Convert to response
-        chunk_responses = [_to_chunk_response(c) for c in chunks]
-        logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created")
-        return ExtractAndChunkResponse(
-            chunks=chunk_responses,
-            total_chunks=len(chunk_responses),
-        )
-    except DomainException as e:
-        raise _map_domain_exception(e)
-    except Exception as e:
-        logger.error(f"Unexpected error processing file: {str(e)}")
-        raise HTTPException(
-            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-            detail=f"Internal server error: {str(e)}",
-        )
-    finally:
-        # Clean up temporary file and directory
-        if temp_file_path and temp_file_path.exists():
-            try:
-                temp_dir = temp_file_path.parent
-                shutil.rmtree(temp_dir)
-                logger.debug(f"Cleaned up temporary directory: {temp_dir}")
-            except Exception as e:
-                logger.warning(f"Failed to delete temporary directory: {str(e)}")
+    logger.info(f"Processing uploaded file: {file.filename}")
+    with managed_temp_file(file) as temp_path:
+        chunks = service.extract_and_chunk(temp_path, strategy)
+    logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created")
+    return ChunkListResponse(
+        chunks=to_chunk_responses(chunks),
+        total_chunks=len(chunks),
+    )
 @router.get(
@@ -333,20 +347,19 @@ async def process_file(
     description="Check API health and configuration",
 )
 async def health_check() -> HealthCheckResponse:
-    """
-    Health check endpoint.
-    Returns:
-        Health status and configuration
-    """
+    """Health check endpoint."""
     return HealthCheckResponse(
         status="healthy",
         version="1.0.0",
-        supported_file_types=["pdf", "docx", "txt", "zip"],
+        supported_file_types=["pdf", "docx", "txt", "md", "markdown", "zip"],
         available_strategies=["fixed_size", "paragraph"],
     )
+# =============================================================================
+# Application Setup
+# =============================================================================
 # Include router in app
 app.include_router(router)
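For reference, a minimal sketch of exercising the refactored routes with the requests package already pinned above for "API testing scripts". The base URL, port, and sample payloads are assumptions; the form field names mirror the route and dependency parameters in this diff.

import requests

BASE_URL = "http://localhost:8000/api/v1"  # assumed local uvicorn instance

# Text-input variant of the new unified /chunk endpoint
resp = requests.post(
    f"{BASE_URL}/chunk",
    data={
        "text": "# Title\n\nSome markdown body.",
        "title": "example_doc",
        "strategy_name": "paragraph",   # one of the advertised strategies
        "chunk_size": 1000,
        "overlap_size": 0,
        "respect_boundaries": True,
    },
)
resp.raise_for_status()
payload = resp.json()
print(payload["total_chunks"], payload["chunks"][0]["content"])

# File-upload variant: validate_markdown_source() only accepts .md files here
with open("notes.md", "rb") as fh:  # hypothetical local file
    resp = requests.post(
        f"{BASE_URL}/chunk",
        files={"file": ("notes.md", fh, "text/markdown")},
        data={"strategy_name": "fixed_size", "chunk_size": 500},
    )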

View File

@@ -88,6 +88,10 @@ class DocumentResponse(BaseModel):
         ...,
         description="Preview of content (first 200 chars)",
     )
+    download_url: Optional[str] = Field(
+        None,
+        description="Presigned URL for downloading the markdown file (expires in 1 hour)",
+    )
 class ChunkResponse(BaseModel):
@@ -109,12 +113,12 @@ class ProcessDocumentResponse(BaseModel):
     message: str = Field(default="Document processed successfully")
-class ExtractAndChunkResponse(BaseModel):
+class ChunkListResponse(BaseModel):
     """Response model for extract and chunk operation."""
     chunks: List[ChunkResponse]
     total_chunks: int
-    message: str = Field(default="Document extracted and chunked successfully")
+    message: str = Field(default="Document chunked successfully")
 class DocumentListResponse(BaseModel):
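Putting the schema changes together, a successful /chunk or /process-file call should now serialize roughly as follows; the values are illustrative only, the field names come from ChunkResponse and the renamed ChunkListResponse above.

# Illustrative ChunkListResponse payload (made-up values)
example_response = {
    "chunks": [
        {
            "id": "1b9e4c2f-0b0a-4f2e-9c1d-7a5e2d3c4b5a",       # chunk UUID
            "document_id": "91f80350-aaaa-bbbb-cccc-ddddeeeeffff",
            "content": "# Title\n\nSome markdown body.",
            "sequence_number": 0,
            "start_char": 0,
            "end_char": 28,
            "length": 28,
        }
    ],
    "total_chunks": 1,
    "message": "Document chunked successfully",
}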

View File

@@ -0,0 +1,186 @@
"""
Markdown Extractor - Concrete implementation for Markdown file extraction.
This adapter implements the IExtractor port for .md files.
"""
import logging
from pathlib import Path
from typing import List
from ....core.domain.exceptions import (
EmptyContentError,
ExtractionError,
)
from ....core.domain.models import Document, DocumentMetadata, SourceType
from ....core.ports.outgoing.extractor import IExtractor
logger = logging.getLogger(__name__)
class MarkdownExtractor(IExtractor):
"""
Concrete Markdown extractor for .md files.
This adapter:
1. Reads .md files directly
2. Handles multiple encodings
3. Returns Document with raw markdown content
"""
def __init__(self) -> None:
"""Initialize Markdown extractor."""
self._supported_extensions = ['md', 'markdown']
self._encodings = ['utf-8', 'utf-16', 'latin-1', 'cp1252']
logger.debug("MarkdownExtractor initialized")
def extract(self, file_path: Path) -> Document:
"""
Extract text content from Markdown file.
Args:
file_path: Path to the .md file
Returns:
Document entity with raw markdown and metadata
Raises:
ExtractionError: If extraction fails
EmptyContentError: If file is empty
"""
try:
logger.info(f"Extracting markdown file: {file_path}")
# Validate file
self._validate_file(file_path)
# Read markdown content
markdown_text = self._read_file(file_path)
# Validate content
if not markdown_text or not markdown_text.strip():
raise EmptyContentError(file_path=str(file_path))
# Create metadata
metadata = self._create_metadata(file_path)
# Build document with raw_markdown
document = Document(raw_markdown=markdown_text, metadata=metadata)
logger.info(
f"Successfully extracted {len(markdown_text)} characters from {file_path.name}"
)
return document
except EmptyContentError:
raise
except ExtractionError:
raise
except Exception as e:
logger.error(f"Markdown extraction failed for {file_path}: {str(e)}")
raise ExtractionError(
message=f"Failed to extract markdown from {file_path.name}",
details=str(e),
file_path=str(file_path),
)
def supports_file_type(self, file_extension: str) -> bool:
"""
Check if this extractor supports Markdown files.
Args:
file_extension: File extension (e.g., 'md', 'markdown')
Returns:
True if Markdown files are supported
"""
return file_extension.lower() in self._supported_extensions
def get_supported_types(self) -> List[str]:
"""
Get list of supported file extensions.
Returns:
List containing 'md' and 'markdown'
"""
return self._supported_extensions.copy()
def _validate_file(self, file_path: Path) -> None:
"""
Validate file exists and is readable.
Args:
file_path: Path to validate
Raises:
ExtractionError: If file is invalid
"""
if not file_path.exists():
raise ExtractionError(
message=f"File not found: {file_path}",
file_path=str(file_path),
)
if not file_path.is_file():
raise ExtractionError(
message=f"Path is not a file: {file_path}",
file_path=str(file_path),
)
if file_path.stat().st_size == 0:
raise EmptyContentError(file_path=str(file_path))
def _read_file(self, file_path: Path) -> str:
"""
Read file content with encoding detection.
Args:
file_path: Path to markdown file
Returns:
File content as string
Raises:
ExtractionError: If reading fails
"""
# Try multiple encodings
for encoding in self._encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
text = f.read()
logger.debug(f"Successfully read {file_path.name} with {encoding}")
return text
except UnicodeDecodeError:
continue
except Exception as e:
logger.error(f"Error reading {file_path.name}: {str(e)}")
raise ExtractionError(
message=f"Failed to read file: {file_path.name}",
details=str(e),
file_path=str(file_path),
)
# If all encodings fail
raise ExtractionError(
message=f"Failed to decode {file_path.name} with any supported encoding",
file_path=str(file_path),
)
def _create_metadata(self, file_path: Path) -> DocumentMetadata:
"""
Create document metadata from markdown file.
Args:
file_path: Path to the markdown file
Returns:
DocumentMetadata entity
"""
stat = file_path.stat()
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
size_bytes=stat.st_size,
)
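A quick, hedged sketch of using the new extractor on its own; the import path assumes the package root is src, and the markdown file is hypothetical.

from pathlib import Path

from src.adapters.outgoing.extractors.markdown_extractor import MarkdownExtractor  # assumed import path

extractor = MarkdownExtractor()
assert extractor.supports_file_type("md") and extractor.supports_file_type("markdown")

document = extractor.extract(Path("docs/example.md"))  # hypothetical .md file
print(document.metadata.display_name, document.metadata.size_bytes, len(document.raw_markdown))

In normal operation the extractor is not called directly; the bootstrap registers it with the ExtractorFactory (see the bootstrap diff below) and the service picks it by file extension.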

View File

@@ -0,0 +1 @@
"""Outgoing storage adapters."""

View File

@@ -0,0 +1,216 @@
"""
S3 Storage Adapter - Concrete implementation using AWS S3.
This adapter implements the IFileStorage port using boto3 for AWS S3 operations.
"""
import logging
from typing import Optional
import boto3
from botocore.exceptions import ClientError
from ....core.config import Settings
from ....core.domain.exceptions import ProcessingError
from ....core.ports.outgoing.file_storage import IFileStorage
logger = logging.getLogger(__name__)
class S3StorageAdapter(IFileStorage):
"""
Concrete S3 storage adapter implementation.
This adapter:
1. Uploads content to AWS S3
2. Generates presigned URLs for secure downloads
3. Handles S3 operations with proper error handling
"""
def __init__(self, settings: Settings) -> None:
"""
Initialize S3 storage adapter.
Args:
settings: Application settings with S3 configuration
Raises:
ValueError: If S3 bucket is not configured
"""
if not settings.S3_BUCKET:
raise ValueError("S3_BUCKET must be configured in settings")
self.bucket_name = settings.S3_BUCKET
self.region = settings.S3_REGION
self.expiration_seconds = settings.S3_PRESIGNED_URL_EXPIRATION
self.upload_path_prefix = settings.S3_UPLOAD_PATH_PREFIX
# Build boto3 client config
client_kwargs = {"region_name": self.region}
# Add custom endpoint (for MinIO)
if settings.S3_ENDPOINT_URL:
client_kwargs["endpoint_url"] = settings.S3_ENDPOINT_URL
logger.debug(f"Using custom S3 endpoint: {settings.S3_ENDPOINT_URL}")
# Add credentials if provided (otherwise boto3 uses default credential chain)
if settings.S3_ACCESS_KEY and settings.S3_SECRET_KEY:
client_kwargs["aws_access_key_id"] = settings.S3_ACCESS_KEY
client_kwargs["aws_secret_access_key"] = settings.S3_SECRET_KEY
logger.debug("Using explicit S3 credentials from settings")
else:
logger.debug("Using default AWS credential chain (IAM role, AWS CLI, etc.)")
# Initialize S3 client
self.s3_client = boto3.client("s3", **client_kwargs)
logger.info(
f"S3StorageAdapter initialized: bucket={self.bucket_name}, "
f"region={self.region}, expiration={self.expiration_seconds}s"
)
def upload_content(self, content: str, destination_path: str) -> str:
"""
Upload text content to S3 and return presigned download URL.
Args:
content: Text content to upload (markdown)
destination_path: S3 object key (e.g., "extractions/doc123.md")
Returns:
Presigned download URL valid for 1 hour
Raises:
ProcessingError: If upload fails
"""
try:
logger.info(f"Uploading content to S3: {destination_path}")
# Upload content to S3
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=destination_path,
Body=content.encode("utf-8"),
ContentType="text/markdown",
ContentDisposition=f'attachment; filename="{self._get_filename(destination_path)}"',
)
logger.info(f"Successfully uploaded to S3: s3://{self.bucket_name}/{destination_path}")
# Generate presigned URL
presigned_url = self._generate_presigned_url(destination_path)
logger.info(f"Generated presigned URL (expires in {self.expiration_seconds}s)")
return presigned_url
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "Unknown")
logger.error(f"S3 upload failed: {error_code} - {str(e)}")
raise ProcessingError(
message=f"Failed to upload to S3: {error_code}",
details=str(e),
)
except Exception as e:
logger.error(f"Unexpected error during S3 upload: {str(e)}")
raise ProcessingError(
message="Failed to upload to S3",
details=str(e),
)
def delete_file(self, file_path: str) -> bool:
"""
Delete a file from S3.
Args:
file_path: S3 object key to delete
Returns:
True if deleted successfully, False otherwise
"""
try:
logger.info(f"Deleting file from S3: {file_path}")
self.s3_client.delete_object(
Bucket=self.bucket_name,
Key=file_path,
)
logger.info(f"Successfully deleted from S3: {file_path}")
return True
except ClientError as e:
logger.error(f"S3 delete failed: {str(e)}")
return False
except Exception as e:
logger.error(f"Unexpected error during S3 delete: {str(e)}")
return False
def file_exists(self, file_path: str) -> bool:
"""
Check if a file exists in S3.
Args:
file_path: S3 object key to check
Returns:
True if file exists, False otherwise
"""
try:
self.s3_client.head_object(
Bucket=self.bucket_name,
Key=file_path,
)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return False
logger.error(f"Error checking S3 file existence: {str(e)}")
return False
except Exception as e:
logger.error(f"Unexpected error checking S3 file: {str(e)}")
return False
def _generate_presigned_url(self, object_key: str) -> str:
"""
Generate presigned URL for S3 object.
Args:
object_key: S3 object key
Returns:
Presigned download URL
Raises:
ProcessingError: If URL generation fails
"""
try:
presigned_url = self.s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": self.bucket_name,
"Key": object_key,
},
ExpiresIn=self.expiration_seconds,
)
return presigned_url
except ClientError as e:
logger.error(f"Failed to generate presigned URL: {str(e)}")
raise ProcessingError(
message="Failed to generate download URL",
details=str(e),
)
def _get_filename(self, path: str) -> str:
"""
Extract filename from path.
Args:
path: Full path (e.g., "extractions/doc123.md")
Returns:
Filename only (e.g., "doc123.md")
"""
return path.split("/")[-1] if "/" in path else path
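A hedged usage sketch for the adapter in isolation; it only runs against a reachable S3 or MinIO endpoint with valid credentials, and the import paths assume src is the package root.

from src.adapters.outgoing.storage.s3_storage_adapter import S3StorageAdapter  # assumed import path
from src.core.config import get_settings

settings = get_settings()                       # reads .env / environment
storage = S3StorageAdapter(settings=settings)

key = settings.get_s3_path("demo-doc-id")       # "extractions/demo-doc-id.md"
url = storage.upload_content(content="# Extracted markdown\n", destination_path=key)
print(url)                                      # presigned URL, expires after S3_PRESIGNED_URL_EXPIRATION
assert storage.file_exists(key)
storage.delete_file(key)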

View File

@@ -7,6 +7,7 @@ The Core never imports Adapters - only the Bootstrap does.
 The ApplicationContainer manages ONLY:
 - Core Services
 - Outgoing Adapters (Extractors, Chunkers, Repository)
+- Configuration (Settings)
 """
 import logging
@@ -15,12 +16,15 @@ from .adapters.outgoing.chunkers.fixed_size_chunker import FixedSizeChunker
 from .adapters.outgoing.chunkers.paragraph_chunker import ParagraphChunker
 from .adapters.outgoing.extractors.docx_extractor import DocxExtractor
 from .adapters.outgoing.extractors.factory import ExtractorFactory
+from .adapters.outgoing.extractors.markdown_extractor import MarkdownExtractor
 from .adapters.outgoing.extractors.pdf_extractor import PDFExtractor
 from .adapters.outgoing.extractors.txt_extractor import TxtExtractor
 from .adapters.outgoing.extractors.zip_extractor import ZipExtractor
 from .adapters.outgoing.persistence.in_memory_repository import (
     InMemoryDocumentRepository,
 )
+from .adapters.outgoing.storage.s3_storage_adapter import S3StorageAdapter
+from .core.config import Settings, get_settings
 from .core.ports.incoming.text_processor import ITextProcessor
 from .core.services.document_processor_service import DocumentProcessorService
 from .shared.logging_config import setup_logging
@@ -38,32 +42,47 @@ class ApplicationContainer:
     Dependency Injection Container for Core and Outgoing Adapters.
     This container manages the lifecycle and dependencies of:
+    - Configuration (Settings)
     - Core Domain Services
-    - Outgoing Adapters (Extractors, Chunkers, Repository)
+    - Outgoing Adapters (Extractors, Chunkers, Repository, Storage)
     """
-    def __init__(self, log_level: str = "INFO") -> None:
+    def __init__(self, settings: Settings | None = None) -> None:
         """
         Initialize the application container.
         Args:
-            log_level: Logging level for the application
+            settings: Application settings (uses singleton if not provided)
         """
-        # Setup logging first
-        setup_logging(level=log_level)
+        # Load settings (singleton)
+        self._settings = settings or get_settings()
+        # Setup logging
+        setup_logging(level=self._settings.LOG_LEVEL)
         logger.info("Initializing ApplicationContainer")
+        logger.debug(f"Configuration: bucket={self._settings.S3_BUCKET}, region={self._settings.S3_REGION}")
         # Create Outgoing Adapters
         self._repository = self._create_repository()
         self._extractor_factory = self._create_extractor_factory()
         self._chunking_context = self._create_chunking_context()
+        self._file_storage = self._create_file_storage()
         # Create Core Service (depends only on Ports)
         self._text_processor_service = self._create_text_processor_service()
         logger.info("ApplicationContainer initialized successfully")
+    @property
+    def settings(self) -> Settings:
+        """
+        Get application settings.
+        Returns:
+            Settings: Application configuration
+        """
+        return self._settings
     @property
     def text_processor_service(self) -> ITextProcessor:
         """
@@ -100,6 +119,7 @@ class ApplicationContainer:
         factory.register_extractor(PDFExtractor())
         factory.register_extractor(DocxExtractor())
         factory.register_extractor(TxtExtractor())
+        factory.register_extractor(MarkdownExtractor())
         factory.register_extractor(ZipExtractor())
         logger.info(
@@ -130,6 +150,16 @@ class ApplicationContainer:
         return context
+    def _create_file_storage(self) -> S3StorageAdapter:
+        """
+        Create and configure the file storage adapter.
+        Returns:
+            Configured S3 storage adapter
+        """
+        logger.debug("Creating S3StorageAdapter")
+        return S3StorageAdapter(settings=self._settings)
     def _create_text_processor_service(self) -> DocumentProcessorService:
         """
         Create the core text processor service.
@@ -144,6 +174,8 @@ class ApplicationContainer:
             extractor_factory=self._extractor_factory,
             chunking_context=self._chunking_context,
             repository=self._repository,
+            file_storage=self._file_storage,
+            settings=self._settings,
         )
@@ -165,12 +197,12 @@ def get_processor_service() -> ITextProcessor:
     if _container is None:
         logger.info("Lazy initializing ApplicationContainer (first access)")
-        _container = ApplicationContainer(log_level="INFO")
+        _container = ApplicationContainer()
     return _container.text_processor_service
-def create_application(log_level: str = "INFO") -> ApplicationContainer:
+def create_application(settings: Settings | None = None) -> ApplicationContainer:
     """
     Factory function to create a fully wired application container.
@@ -178,14 +210,14 @@ def create_application(log_level: str = "INFO") -> ApplicationContainer:
     For API routes, use get_processor_service() instead.
     Args:
-        log_level: Logging level for the application
+        settings: Application settings (uses singleton if not provided)
     Returns:
         Configured application container
     Example:
-        >>> container = create_application(log_level="DEBUG")
+        >>> container = create_application()
         >>> service = container.text_processor_service
     """
     logger.info("Creating application container via factory")
-    return ApplicationContainer(log_level=log_level)
+    return ApplicationContainer(settings=settings)
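With the container now settings-driven, wiring it up by hand could look like the sketch below; the module path and the placeholder values are assumptions.

from src.bootstrap import create_application, get_processor_service  # assumed import path
from src.core.config import Settings

# Explicit settings, e.g. in tests or scripts
container = create_application(
    settings=Settings(S3_BUCKET="my-bucket", S3_REGION="us-east-1", LOG_LEVEL="DEBUG"),
)
service = container.text_processor_service

# Lazy singleton path used by the API routes
service = get_processor_service()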

src/core/config.py Normal file (+41)
View File

@@ -0,0 +1,41 @@
"""S3 Configuration."""
from typing import Optional
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""S3/MinIO settings loaded from environment variables or .env file."""
S3_BUCKET: str = "bi-chatbot"
S3_REGION: str = "us-east-1"
S3_ACCESS_KEY: Optional[str] = "bi-chatbot"
S3_SECRET_KEY: Optional[str] = "9ixloSaqtYTkfmrJzE"
S3_ENDPOINT_URL: Optional[str] = "https://cdn.d.aiengines.ir"
S3_PRESIGNED_URL_EXPIRATION: int = 3600
S3_UPLOAD_PATH_PREFIX: str = "extractions"
LOG_LEVEL: str = "INFO"
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
extra="ignore",
)
def get_s3_path(self, document_id: str) -> str:
"""Generate S3 path for a document."""
return f"{self.S3_UPLOAD_PATH_PREFIX}/{document_id}.md"
_settings_instance: Optional[Settings] = None
def get_settings() -> Settings:
"""Get or create singleton settings instance."""
global _settings_instance
if _settings_instance is None:
_settings_instance = Settings()
return _settings_instance
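Because Settings extends BaseSettings with case_sensitive=True, each field can be overridden through the environment or a .env file; the variable names must match exactly. A small sketch with placeholder values (import path assumed from the file header above):

import os

os.environ["S3_BUCKET"] = "my-bucket"                    # placeholder values, not real config
os.environ["S3_ENDPOINT_URL"] = "http://localhost:9000"  # e.g. a local MinIO
os.environ["LOG_LEVEL"] = "DEBUG"

from src.core.config import Settings  # assumed import path for src/core/config.py

settings = Settings()                  # environment values win over the defaults above
print(settings.get_s3_path("abc123"))  # -> "extractions/abc123.md"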

View File

@@ -17,6 +17,7 @@ class SourceType(str, Enum):
     """Enumeration of supported source types."""
     FILE = "file"
     WEB = "web"
+    TEXT = "text"
 class ChunkingMethod(str, Enum):
@@ -230,6 +231,8 @@ class Document(BaseModel):
         sections: Parsed structured sections from Markdown
         metadata: Associated metadata
         is_processed: Flag indicating if document has been processed
+        download_url: Optional presigned URL for downloading the markdown file
     """
     id: UUID = Field(default_factory=uuid4, description="Unique document ID")
     raw_markdown: str = Field(..., description="Raw Markdown content")
@@ -239,6 +242,7 @@
     )
     metadata: DocumentMetadata = Field(..., description="Document metadata")
     is_processed: bool = Field(default=False, description="Processing status")
+    download_url: Optional[str] = Field(None, description="Presigned download URL")
     model_config = {
         "frozen": False,  # Allow mutation for processing status

View File

@@ -20,29 +20,6 @@ class ITextProcessor(ABC):
     the entry point into the core domain logic.
     """
-    @abstractmethod
-    def process_document(
-        self,
-        file_path: Path,
-        chunking_strategy: ChunkingStrategy,
-    ) -> Document:
-        """
-        Process a document by extracting text and storing it.
-        Args:
-            file_path: Path to the document file
-            chunking_strategy: Strategy configuration for chunking
-        Returns:
-            Processed Document entity
-        Raises:
-            ExtractionError: If text extraction fails
-            ProcessingError: If document processing fails
-            UnsupportedFileTypeError: If file type is not supported
-        """
-        pass
     @abstractmethod
     def extract_and_chunk(
         self,

View File

@@ -0,0 +1,59 @@
"""
Outgoing Port - File Storage Interface.
This port defines the contract for storing files and generating download URLs.
Implementations can use S3, Azure Blob, GCS, or local filesystem.
"""
from abc import ABC, abstractmethod
class IFileStorage(ABC):
"""
Port interface for file storage operations.
This abstraction allows the core domain to store files without
depending on specific storage implementations (S3, Azure, etc.).
"""
@abstractmethod
def upload_content(self, content: str, destination_path: str) -> str:
"""
Upload text content to storage and return a download URL.
Args:
content: Text content to upload (e.g., markdown)
destination_path: Destination path in storage (e.g., "extractions/doc123.md")
Returns:
Secure download URL (presigned URL for cloud storage)
Raises:
StorageError: If upload fails
"""
pass
@abstractmethod
def delete_file(self, file_path: str) -> bool:
"""
Delete a file from storage.
Args:
file_path: Path to file in storage
Returns:
True if deleted successfully, False otherwise
"""
pass
@abstractmethod
def file_exists(self, file_path: str) -> bool:
"""
Check if a file exists in storage.
Args:
file_path: Path to file in storage
Returns:
True if file exists, False otherwise
"""
pass
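To illustrate why this port exists, here is a hypothetical local-filesystem implementation (not part of this PR); the core service would accept it unchanged because it depends only on IFileStorage.

from pathlib import Path

from src.core.ports.outgoing.file_storage import IFileStorage  # assumed import path


class LocalFileStorage(IFileStorage):
    """Stores content under a local root directory instead of S3."""

    def __init__(self, root: Path) -> None:
        self._root = root

    def upload_content(self, content: str, destination_path: str) -> str:
        target = self._root / destination_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return target.as_uri()  # the "download URL" is just a file:// URI here

    def delete_file(self, file_path: str) -> bool:
        target = self._root / file_path
        if target.exists():
            target.unlink()
            return True
        return False

    def file_exists(self, file_path: str) -> bool:
        return (self._root / file_path).exists()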

View File

@@ -9,6 +9,7 @@ from pathlib import Path
 from typing import List
 from uuid import UUID
+from ..config import Settings
 from ..domain import logic_utils
 from ..domain.exceptions import (
     DocumentNotFoundError,
@@ -20,6 +21,7 @@ from ..domain.models import Chunk, ChunkingStrategy, Document, SourceFile
 from ..ports.incoming.text_processor import ITextProcessor
 from ..ports.outgoing.chunking_context import IChunkingContext
 from ..ports.outgoing.extractor_factory import IExtractorFactory
+from ..ports.outgoing.file_storage import IFileStorage
 from ..ports.outgoing.repository import IDocumentRepository
@@ -39,6 +41,8 @@ class DocumentProcessorService(ITextProcessor):
         extractor_factory: IExtractorFactory,
         chunking_context: IChunkingContext,
         repository: IDocumentRepository,
+        file_storage: IFileStorage,
+        settings: Settings,
     ) -> None:
         """
         Initialize the document processor service.
@@ -47,77 +51,16 @@ class DocumentProcessorService(ITextProcessor):
             extractor_factory: Factory for creating appropriate extractors
             chunking_context: Context for managing chunking strategies
             repository: Repository for document persistence
+            file_storage: File storage for uploading extracted content
+            settings: Application settings for configuration
         """
         self._extractor_factory = extractor_factory
         self._chunking_context = chunking_context
         self._repository = repository
+        self._file_storage = file_storage
+        self._settings = settings
         logger.info("DocumentProcessorService initialized")
-    def process_document(
-        self,
-        file_path: Path,
-        chunking_strategy: ChunkingStrategy,
-    ) -> Document:
-        """
-        Process a document using the stateless pipeline.
-        Pipeline Order:
-        1. Extract Document with raw_markdown and metadata (via Adapter)
-        2. Parse Markdown into DocumentSection objects
-        3. Update Document with sections
-        4. Validate and persist Document
-        5. Mark as processed
-        Args:
-            file_path: Path to the document file
-            chunking_strategy: Strategy configuration (for metadata)
-        Returns:
-            Fully processed Document entity
-        Raises:
-            ExtractionError: If text extraction fails
-            ProcessingError: If document processing fails
-            UnsupportedFileTypeError: If file type is not supported
-        """
-        try:
-            logger.info(f"Processing document: {file_path}")
-            # Step 1: Extract Document with raw_markdown and metadata
-            document = self._extract_document(file_path)
-            # Step 2: Parse Markdown into structured sections
-            sections = parse_markdown(document.raw_markdown)
-            logger.debug(f"Parsed {len(sections)} sections from document")
-            # Step 3: Update Document with sections
-            document = document.model_copy(update={"sections": sections})
-            # Step 4: Validate document content
-            document.validate_content()
-            # Step 5: Persist to repository
-            saved_document = self._repository.save(document)
-            # Step 6: Mark as processed
-            saved_document.mark_as_processed()
-            self._repository.save(saved_document)
-            logger.info(
-                f"Document processed successfully: {saved_document.id} "
-                f"({len(sections)} sections)"
-            )
-            return saved_document
-        except ExtractionError:
-            raise
-        except Exception as e:
-            logger.error(f"Failed to process document: {str(e)}")
-            raise ProcessingError(
-                message="Document processing failed",
-                details=str(e),
-            )
     def extract_and_chunk(
         self,
         file_path: Path,
@@ -167,28 +110,45 @@ class DocumentProcessorService(ITextProcessor):
     def extract_document(self, file_path: Path) -> Document:
         """
-        Extract text content from document without parsing or chunking.
-        This method only performs extraction:
+        Extract text content from document and upload to S3.
+        This method:
         1. Extracts raw text content from file
-        2. Creates Document entity with metadata
-        3. Returns Document with raw_markdown (no sections)
+        2. Uploads markdown to S3
+        3. Generates presigned download URL
+        4. Returns Document with raw_markdown and download_url
         Args:
             file_path: Path to the document file
         Returns:
-            Document entity with raw markdown
+            Document entity with raw markdown and download URL
         Raises:
             ExtractionError: If text extraction fails
            UnsupportedFileTypeError: If file type is not supported
+            ProcessingError: If S3 upload fails
        """
        try:
            logger.info(f"Extracting document: {file_path}")
+            # Extract document
            document = self._extract_document(file_path)
            logger.info(f"Successfully extracted {len(document.raw_markdown)} characters")
+            # Upload to S3 and get download URL
+            destination_path = self._settings.get_s3_path(str(document.id))
+            download_url = self._file_storage.upload_content(
+                content=document.raw_markdown,
+                destination_path=destination_path,
+            )
+            # Update document with download URL
+            document = document.model_copy(update={"download_url": download_url})
+            logger.info(f"Uploaded to S3 and generated download URL")
            return document
        except Exception as e:
            logger.error(f"Failed to extract document: {str(e)}")
            raise
@@ -260,7 +220,7 @@ class DocumentProcessorService(ITextProcessor):
        metadata = DocumentMetadata(
            source_id="text_input",
-            source_type=SourceType.WEB,  # Using WEB type for text input
+            source_type=SourceType.TEXT,
            display_name=f"{title}.md",
            size_bytes=len(text.encode('utf-8')),
        )
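A small worked example of the new upload step inside extract_document, with illustrative values; the commented lines restate the calls from the diff above, and the import path is assumed.

from uuid import uuid4

from src.core.config import Settings  # assumed import path

settings = Settings(S3_BUCKET="demo-bucket")
document_id = uuid4()

destination_path = settings.get_s3_path(str(document_id))
print(destination_path)  # e.g. "extractions/3f2b9c4e-....md" - the key that gets uploaded and presigned

# download_url = self._file_storage.upload_content(
#     content=document.raw_markdown,
#     destination_path=destination_path,
# )
# document = document.model_copy(update={"download_url": download_url})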