From 0c09c79a2e9cedd2aa95670a86bff6876f6c2a61 Mon Sep 17 00:00:00 2001 From: "m.dabbagh" Date: Mon, 19 Jan 2026 22:03:36 +0330 Subject: [PATCH] refactor api routes --- src/adapters/incoming/api_routes.py | 518 ++++++++++------------------ 1 file changed, 191 insertions(+), 327 deletions(-) diff --git a/src/adapters/incoming/api_routes.py b/src/adapters/incoming/api_routes.py index 0dbd8d6..7d815c3 100644 --- a/src/adapters/incoming/api_routes.py +++ b/src/adapters/incoming/api_routes.py @@ -3,43 +3,48 @@ API Routes - Functional FastAPI routes for text processing. This is the incoming adapter that translates HTTP requests into domain operations. Routes pull the service directly from bootstrap. + +Refactored for "Skinny Routes" pattern with: +- Global exception handling +- Dependency injection for common parameters +- Context managers for resource management +- Minimal route logic """ +import contextlib import logging import shutil import tempfile from pathlib import Path -from typing import Optional -from uuid import UUID +from typing import Iterator, List, Optional -from fastapi import APIRouter, FastAPI, File, Form, HTTPException, UploadFile, status +from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status +from fastapi.responses import JSONResponse from ...core.domain.exceptions import ( ChunkingError, - DocumentNotFoundError, DomainException, + DocumentNotFoundError, ExtractionError, ProcessingError, UnsupportedFileTypeError, ) -from ...core.domain.models import ChunkingMethod, ChunkingStrategy +from ...core.domain.models import Chunk, ChunkingMethod, ChunkingStrategy, Document from ...core.ports.incoming.text_processor import ITextProcessor from .api_schemas import ( - ChunkResponse, - DeleteDocumentResponse, - DocumentListResponse, - DocumentResponse, - ExtractAndChunkRequest, ChunkListResponse, + ChunkResponse, + DocumentResponse, HealthCheckResponse, - ProcessDocumentRequest, - ProcessDocumentResponse, ) logger = logging.getLogger(__name__) -# Create FastAPI application +# ============================================================================= +# Application Setup +# ============================================================================= + app = FastAPI( title="Text Processor API", description="Text extraction and chunking system using Hexagonal Architecture", @@ -48,37 +53,131 @@ app = FastAPI( redoc_url="/redoc", ) -# Create API router router = APIRouter(prefix="/api/v1", tags=["Text Processing"]) -def _get_service() -> ITextProcessor: - """ - Get the text processor service from bootstrap singleton. +# ============================================================================= +# Global Exception Handler +# ============================================================================= - This function pulls the service directly without using FastAPI's Depends. - - Returns: - ITextProcessor: Core service instance +@app.exception_handler(DomainException) +async def domain_exception_handler(request, exc: DomainException) -> JSONResponse: """ + Global exception handler for all domain exceptions. + + Maps domain exceptions to appropriate HTTP status codes. + """ + status_code_map = { + UnsupportedFileTypeError: status.HTTP_400_BAD_REQUEST, + ExtractionError: status.HTTP_422_UNPROCESSABLE_ENTITY, + ChunkingError: status.HTTP_422_UNPROCESSABLE_ENTITY, + ProcessingError: status.HTTP_500_INTERNAL_SERVER_ERROR, + DocumentNotFoundError: status.HTTP_404_NOT_FOUND, + } + + status_code = status_code_map.get(type(exc), status.HTTP_500_INTERNAL_SERVER_ERROR) + + logger.error(f"Domain exception: {type(exc).__name__}: {str(exc)}") + + return JSONResponse( + status_code=status_code, + content={"detail": str(exc)}, + ) + + +# ============================================================================= +# Helper Functions & Dependencies +# ============================================================================= + +def get_service() -> ITextProcessor: + """Dependency: Get the text processor service from bootstrap.""" from ...bootstrap import get_processor_service - return get_processor_service() -def _to_document_response(document) -> DocumentResponse: +def get_chunking_strategy( + strategy_name: ChunkingMethod = Form(..., description="Chunking method"), + chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000), + overlap_size: int = Form(0, description="Overlap between chunks", ge=0), + respect_boundaries: bool = Form(True, description="Respect text boundaries"), +) -> ChunkingStrategy: + """Dependency: Create chunking strategy from form parameters.""" + return ChunkingStrategy( + strategy_name=strategy_name, + chunk_size=chunk_size, + overlap_size=overlap_size, + respect_boundaries=respect_boundaries, + ) + + +@contextlib.contextmanager +def managed_temp_file(file: UploadFile) -> Iterator[Path]: """ - Convert domain document to API response. + Context manager for temporary file handling. + + Creates temporary directory, copies uploaded file, yields path, + and ensures cleanup on exit. Args: - document: Domain Document entity + file: Uploaded file from FastAPI - Returns: - DocumentResponse: API response model + Yields: + Path to temporary file with original filename """ + temp_dir = tempfile.mkdtemp() + filename = file.filename if file.filename else "uploaded_file.tmp" + temp_file_path = Path(temp_dir) / filename + + try: + logger.debug(f"Creating temporary file: {temp_file_path}") + with open(temp_file_path, 'wb') as f: + shutil.copyfileobj(file.file, f) + + yield temp_file_path + + finally: + # Cleanup temporary directory + try: + shutil.rmtree(temp_dir) + logger.debug(f"Cleaned up temporary directory: {temp_dir}") + except Exception as e: + logger.warning(f"Failed to delete temporary directory: {str(e)}") + + +def validate_markdown_source(file: Optional[UploadFile], text: Optional[str]) -> None: + """ + Validate that exactly one markdown source is provided. + + Args: + file: Optional uploaded file + text: Optional text input + + Raises: + HTTPException: If validation fails + """ + if not file and not text: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Either 'file' or 'text' must be provided", + ) + + if file and text: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Provide either 'file' or 'text', not both", + ) + + if file and file.filename and not file.filename.lower().endswith('.md'): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Unsupported file type. Only .md files are accepted", + ) + + +def to_document_response(document: Document) -> DocumentResponse: + """Convert domain document to API response.""" from .api_schemas import DocumentMetadataResponse - # Extract file type from display_name or source_id display_name = document.metadata.display_name file_type = Path(display_name).suffix.lstrip('.') if '.' in display_name else 'unknown' @@ -91,76 +190,33 @@ def _to_document_response(document) -> DocumentResponse: file_size_bytes=document.metadata.size_bytes, created_at=document.metadata.created_at.isoformat(), author=document.metadata.author, - page_count=None, # Not available in new metadata model + page_count=None, ), is_processed=document.is_processed, content_preview=document.get_content_preview(200), ) -def _to_chunk_response(chunk) -> ChunkResponse: - """ - Convert domain chunk to API response. - - Args: - chunk: Domain Chunk entity - - Returns: - ChunkResponse: API response model - """ - return ChunkResponse( - id=str(chunk.id), - document_id=str(chunk.document_id), - content=chunk.content, - sequence_number=chunk.sequence_number, - start_char=chunk.start_char, - end_char=chunk.end_char, - length=chunk.get_length(), - ) - - -def _map_domain_exception(exception: DomainException) -> HTTPException: - """ - Map domain exceptions to HTTP exceptions. - - Args: - exception: Domain exception - - Returns: - HTTPException: Corresponding HTTP exception - """ - if isinstance(exception, UnsupportedFileTypeError): - return HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=str(exception), - ) - elif isinstance(exception, ExtractionError): - return HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=str(exception), - ) - elif isinstance(exception, ChunkingError): - return HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail=str(exception), - ) - elif isinstance(exception, ProcessingError): - return HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(exception), - ) - elif isinstance(exception, DocumentNotFoundError): - return HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=str(exception), - ) - else: - return HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(exception), +def to_chunk_responses(chunks: List[Chunk]) -> List[ChunkResponse]: + """Convert list of domain chunks to API responses.""" + return [ + ChunkResponse( + id=str(chunk.id), + document_id=str(chunk.document_id), + content=chunk.content, + sequence_number=chunk.sequence_number, + start_char=chunk.start_char, + end_char=chunk.end_char, + length=chunk.get_length(), ) + for chunk in chunks + ] +# ============================================================================= +# Skinny Routes +# ============================================================================= + @router.post( "/chunk", response_model=ChunkListResponse, @@ -171,19 +227,13 @@ def _map_domain_exception(exception: DomainException) -> HTTPException: async def perform_chunking( file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"), text: Optional[str] = Form(None, description="Markdown text to process", json_schema_extra={"x-textarea": True}), - strategy_name: ChunkingMethod = Form(..., description="Chunking method"), - chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000), - overlap_size: int = Form(0, description="Overlap between chunks", ge=0), - respect_boundaries: bool = Form(True, description="Respect text boundaries"), title: str = Form("markdown_input", description="Optional title for the document"), + strategy: ChunkingStrategy = Depends(get_chunking_strategy), + service: ITextProcessor = Depends(get_service), ) -> ChunkListResponse: """ Unified Markdown processing endpoint supporting both file upload and text input. - This endpoint handles Markdown from either source: - 1. **File Upload**: Upload a .md file - 2. **Text Input**: Paste markdown text directly - Processing workflow: 1. Validates source (file or text, not both) 2. Extracts markdown content @@ -191,117 +241,33 @@ async def perform_chunking( 4. Persists document to repository 5. Chunks content according to strategy 6. Returns chunks with metadata - - Args: - file: Optional .md file upload - text: Optional markdown text input - strategy_name: Chunking method (fixed_size or paragraph) - chunk_size: Target chunk size - overlap_size: Overlap between chunks - respect_boundaries: Whether to respect boundaries - title: Optional title for the document - - Returns: - Response with chunks - - Raises: - HTTPException: If validation fails or processing fails """ - temp_file_path = None + # Validate source + validate_markdown_source(file, text) - try: - # Validation: Ensure exactly one source is provided - if not file and not text: + # Process file upload + if file: + logger.info(f"Processing uploaded markdown file: {file.filename}") + with managed_temp_file(file) as temp_path: + chunks = service.extract_and_chunk(temp_path, strategy) + + # Process text input + else: + if not text or not text.strip(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Either 'file' or 'text' must be provided", + detail="Markdown content cannot be empty", ) - if file and text: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Provide either 'file' or 'text', not both", - ) + logger.info(f"Processing markdown text input: {len(text)} characters") + chunks = service.process_text_to_chunks(text, strategy, title) - # Get service from bootstrap - service: ITextProcessor = _get_service() + logger.info(f"Successfully processed markdown: {len(chunks)} chunks created") - # Create chunking strategy - strategy = ChunkingStrategy( - strategy_name=strategy_name, - chunk_size=chunk_size, - overlap_size=overlap_size, - respect_boundaries=respect_boundaries, - ) - - # File Logic: Delegate to extract_and_chunk via MarkdownExtractor - if file is not None: - # Validate file extension - if not file.filename or not file.filename.lower().endswith('.md'): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Unsupported file type. Only .md files are accepted", - ) - - # Create temporary directory and file with original filename - temp_dir = tempfile.mkdtemp() - temp_file_path = Path(temp_dir) / file.filename - - # Save uploaded file to temporary location - logger.info(f"Processing uploaded markdown file: {file.filename}") - with open(temp_file_path, 'wb') as temp_file: - shutil.copyfileobj(file.file, temp_file) - - # Delegate to extract_and_chunk (uses MarkdownExtractor) - chunks = service.extract_and_chunk(temp_file_path, strategy) - - # Text Logic: Process text directly - else: - logger.info("Processing markdown text input") - - # Validate content is not empty - if not text or not text.strip(): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Markdown content cannot be empty", - ) - - # Process text through service - chunks = service.process_text_to_chunks( - text=text, - chunking_strategy=strategy, - title=title, - ) - - # Convert to response - chunk_responses = [_to_chunk_response(c) for c in chunks] - - logger.info(f"Successfully processed markdown: {len(chunks)} chunks created") - - return ChunkListResponse( - chunks=chunk_responses, - total_chunks=len(chunk_responses), - ) - - except HTTPException: - raise - except DomainException as e: - raise _map_domain_exception(e) - except Exception as e: - logger.error(f"Unexpected error processing markdown: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Internal server error: {str(e)}", - ) - finally: - # Clean up temporary file and directory if file was uploaded - if temp_file_path and temp_file_path.exists(): - try: - temp_dir = temp_file_path.parent - shutil.rmtree(temp_dir) - logger.debug(f"Cleaned up temporary directory: {temp_dir}") - except Exception as e: - logger.warning(f"Failed to delete temporary directory: {str(e)}") + return ChunkListResponse( + chunks=to_chunk_responses(chunks), + total_chunks=len(chunks), + ) @router.post( @@ -312,68 +278,25 @@ async def perform_chunking( description="Upload a file and extract text content with metadata", ) async def extract_document( - file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, zip)"), + file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, md, zip)"), + service: ITextProcessor = Depends(get_service), ) -> DocumentResponse: """ Extract text content from uploaded file. This endpoint handles file extraction only: - 1. Accepts file upload (PDF, DOCX, TXT, ZIP) + 1. Accepts file upload (PDF, DOCX, TXT, MD, ZIP) 2. Extracts raw text content using appropriate extractor 3. Returns Document entity with metadata (no parsing) - - Args: - file: Uploaded file - - Returns: - Response with extracted document - - Raises: - HTTPException: If extraction fails """ - temp_file_path = None + logger.info(f"Extracting uploaded file: {file.filename}") - try: - # Pull service from bootstrap - service: ITextProcessor = _get_service() + with managed_temp_file(file) as temp_path: + document = service.extract_document(temp_path) - # Create temporary directory and file with original filename - temp_dir = tempfile.mkdtemp() - original_filename = file.filename if file.filename else "uploaded_file.tmp" - temp_file_path = Path(temp_dir) / original_filename + logger.info(f"Successfully extracted {len(document.raw_markdown)} characters from {file.filename}") - # Copy uploaded file to temporary location - logger.info(f"Extracting uploaded file: {file.filename}") - with open(temp_file_path, 'wb') as temp_file: - shutil.copyfileobj(file.file, temp_file) - - # Execute extraction only (no parsing) - document = service.extract_document(temp_file_path) - - # Convert to response - document_response = _to_document_response(document) - - logger.info(f"Successfully extracted {file.filename}: {len(document.raw_markdown)} characters") - - return document_response - - except DomainException as e: - raise _map_domain_exception(e) - except Exception as e: - logger.error(f"Unexpected error extracting file: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Internal server error: {str(e)}", - ) - finally: - # Clean up temporary file and directory - if temp_file_path and temp_file_path.exists(): - try: - temp_dir = temp_file_path.parent - shutil.rmtree(temp_dir) - logger.debug(f"Cleaned up temporary directory: {temp_dir}") - except Exception as e: - logger.warning(f"Failed to delete temporary directory: {str(e)}") + return to_document_response(document) @router.post( @@ -384,89 +307,31 @@ async def extract_document( description="Upload a file, extract text, parse markdown, and return chunks", ) async def process_file( - file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, zip)"), - strategy_name: ChunkingMethod = Form(..., description="Chunking method"), - chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000), - overlap_size: int = Form(0, description="Overlap between chunks", ge=0), - respect_boundaries: bool = Form(True, description="Respect text boundaries"), + file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, md, zip)"), + strategy: ChunkingStrategy = Depends(get_chunking_strategy), + service: ITextProcessor = Depends(get_service), ) -> ChunkListResponse: """ Complete file processing pipeline: Upload → Extract → Parse → Chunk. This endpoint handles the full document processing workflow: - 1. Accepts file upload (PDF, DOCX, TXT, ZIP) + 1. Accepts file upload (PDF, DOCX, TXT, MD, ZIP) 2. Extracts text content using appropriate extractor 3. Parses markdown structure into sections 4. Chunks content according to strategy 5. Returns chunks with metadata - - Args: - file: Uploaded file - strategy_name: Name of chunking strategy - chunk_size: Target chunk size - overlap_size: Overlap between chunks - respect_boundaries: Whether to respect boundaries - - Returns: - Response with chunks - - Raises: - HTTPException: If extraction or chunking fails """ - temp_file_path = None + logger.info(f"Processing uploaded file: {file.filename}") - try: - # Pull service from bootstrap - service: ITextProcessor = _get_service() + with managed_temp_file(file) as temp_path: + chunks = service.extract_and_chunk(temp_path, strategy) - # Create temporary directory and file with original filename - temp_dir = tempfile.mkdtemp() - original_filename = file.filename if file.filename else "uploaded_file.tmp" - temp_file_path = Path(temp_dir) / original_filename + logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created") - # Copy uploaded file to temporary location - logger.info(f"Processing uploaded file: {file.filename}") - with open(temp_file_path, 'wb') as temp_file: - shutil.copyfileobj(file.file, temp_file) - - # Create chunking strategy - strategy = ChunkingStrategy( - strategy_name=strategy_name, - chunk_size=chunk_size, - overlap_size=overlap_size, - respect_boundaries=respect_boundaries, - ) - - # Execute complete pipeline: extract → parse → chunk - chunks = service.extract_and_chunk(temp_file_path, strategy) - - # Convert to response - chunk_responses = [_to_chunk_response(c) for c in chunks] - - logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created") - - return ChunkListResponse( - chunks=chunk_responses, - total_chunks=len(chunk_responses), - ) - - except DomainException as e: - raise _map_domain_exception(e) - except Exception as e: - logger.error(f"Unexpected error processing file: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Internal server error: {str(e)}", - ) - finally: - # Clean up temporary file and directory - if temp_file_path and temp_file_path.exists(): - try: - temp_dir = temp_file_path.parent - shutil.rmtree(temp_dir) - logger.debug(f"Cleaned up temporary directory: {temp_dir}") - except Exception as e: - logger.warning(f"Failed to delete temporary directory: {str(e)}") + return ChunkListResponse( + chunks=to_chunk_responses(chunks), + total_chunks=len(chunks), + ) @router.get( @@ -477,12 +342,7 @@ async def process_file( description="Check API health and configuration", ) async def health_check() -> HealthCheckResponse: - """ - Health check endpoint. - - Returns: - Health status and configuration - """ + """Health check endpoint.""" return HealthCheckResponse( status="healthy", version="1.0.0", @@ -491,6 +351,10 @@ async def health_check() -> HealthCheckResponse: ) +# ============================================================================= +# Application Setup +# ============================================================================= + # Include router in app app.include_router(router)