refactor api routes

This commit is contained in:
m.dabbagh 2026-01-19 22:03:36 +03:30
parent 6086ddf818
commit 0c09c79a2e

View File

@ -3,43 +3,48 @@ API Routes - Functional FastAPI routes for text processing.
This is the incoming adapter that translates HTTP requests into
domain operations. Routes pull the service directly from bootstrap.
Refactored for "Skinny Routes" pattern with:
- Global exception handling
- Dependency injection for common parameters
- Context managers for resource management
- Minimal route logic
"""
import contextlib
import logging
import shutil
import tempfile
from pathlib import Path
from typing import Optional
from uuid import UUID
from typing import Iterator, List, Optional
from fastapi import APIRouter, FastAPI, File, Form, HTTPException, UploadFile, status
from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status
from fastapi.responses import JSONResponse
from ...core.domain.exceptions import (
ChunkingError,
DocumentNotFoundError,
DomainException,
DocumentNotFoundError,
ExtractionError,
ProcessingError,
UnsupportedFileTypeError,
)
from ...core.domain.models import ChunkingMethod, ChunkingStrategy
from ...core.domain.models import Chunk, ChunkingMethod, ChunkingStrategy, Document
from ...core.ports.incoming.text_processor import ITextProcessor
from .api_schemas import (
ChunkResponse,
DeleteDocumentResponse,
DocumentListResponse,
DocumentResponse,
ExtractAndChunkRequest,
ChunkListResponse,
ChunkResponse,
DocumentResponse,
HealthCheckResponse,
ProcessDocumentRequest,
ProcessDocumentResponse,
)
logger = logging.getLogger(__name__)
# Create FastAPI application
# =============================================================================
# Application Setup
# =============================================================================
app = FastAPI(
title="Text Processor API",
description="Text extraction and chunking system using Hexagonal Architecture",
@ -48,169 +53,108 @@ app = FastAPI(
redoc_url="/redoc",
)
# Create API router
router = APIRouter(prefix="/api/v1", tags=["Text Processing"])
def _get_service() -> ITextProcessor:
"""
Get the text processor service from bootstrap singleton.
# =============================================================================
# Global Exception Handler
# =============================================================================
This function pulls the service directly without using FastAPI's Depends.
Returns:
ITextProcessor: Core service instance
@app.exception_handler(DomainException)
async def domain_exception_handler(request, exc: DomainException) -> JSONResponse:
"""
Global exception handler for all domain exceptions.
Maps domain exceptions to appropriate HTTP status codes.
"""
status_code_map = {
UnsupportedFileTypeError: status.HTTP_400_BAD_REQUEST,
ExtractionError: status.HTTP_422_UNPROCESSABLE_ENTITY,
ChunkingError: status.HTTP_422_UNPROCESSABLE_ENTITY,
ProcessingError: status.HTTP_500_INTERNAL_SERVER_ERROR,
DocumentNotFoundError: status.HTTP_404_NOT_FOUND,
}
status_code = status_code_map.get(type(exc), status.HTTP_500_INTERNAL_SERVER_ERROR)
logger.error(f"Domain exception: {type(exc).__name__}: {str(exc)}")
return JSONResponse(
status_code=status_code,
content={"detail": str(exc)},
)
# =============================================================================
# Helper Functions & Dependencies
# =============================================================================
def get_service() -> ITextProcessor:
"""Dependency: Get the text processor service from bootstrap."""
from ...bootstrap import get_processor_service
return get_processor_service()
def _to_document_response(document) -> DocumentResponse:
"""
Convert domain document to API response.
Args:
document: Domain Document entity
Returns:
DocumentResponse: API response model
"""
from .api_schemas import DocumentMetadataResponse
# Extract file type from display_name or source_id
display_name = document.metadata.display_name
file_type = Path(display_name).suffix.lstrip('.') if '.' in display_name else 'unknown'
return DocumentResponse(
id=str(document.id),
content=document.content,
metadata=DocumentMetadataResponse(
file_name=document.metadata.display_name,
file_type=file_type,
file_size_bytes=document.metadata.size_bytes,
created_at=document.metadata.created_at.isoformat(),
author=document.metadata.author,
page_count=None, # Not available in new metadata model
),
is_processed=document.is_processed,
content_preview=document.get_content_preview(200),
)
def _to_chunk_response(chunk) -> ChunkResponse:
"""
Convert domain chunk to API response.
Args:
chunk: Domain Chunk entity
Returns:
ChunkResponse: API response model
"""
return ChunkResponse(
id=str(chunk.id),
document_id=str(chunk.document_id),
content=chunk.content,
sequence_number=chunk.sequence_number,
start_char=chunk.start_char,
end_char=chunk.end_char,
length=chunk.get_length(),
)
def _map_domain_exception(exception: DomainException) -> HTTPException:
"""
Map domain exceptions to HTTP exceptions.
Args:
exception: Domain exception
Returns:
HTTPException: Corresponding HTTP exception
"""
if isinstance(exception, UnsupportedFileTypeError):
return HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(exception),
)
elif isinstance(exception, ExtractionError):
return HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(exception),
)
elif isinstance(exception, ChunkingError):
return HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(exception),
)
elif isinstance(exception, ProcessingError):
return HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(exception),
)
elif isinstance(exception, DocumentNotFoundError):
return HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exception),
)
else:
return HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(exception),
)
@router.post(
"/chunk",
response_model=ChunkListResponse,
status_code=status.HTTP_200_OK,
summary="Process Markdown from file upload or text input",
description="Unified endpoint: upload .md file or paste markdown text, then parse and chunk",
)
async def perform_chunking(
file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"),
text: Optional[str] = Form(None, description="Markdown text to process", json_schema_extra={"x-textarea": True}),
def get_chunking_strategy(
strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
respect_boundaries: bool = Form(True, description="Respect text boundaries"),
title: str = Form("markdown_input", description="Optional title for the document"),
) -> ChunkListResponse:
) -> ChunkingStrategy:
"""Dependency: Create chunking strategy from form parameters."""
return ChunkingStrategy(
strategy_name=strategy_name,
chunk_size=chunk_size,
overlap_size=overlap_size,
respect_boundaries=respect_boundaries,
)
@contextlib.contextmanager
def managed_temp_file(file: UploadFile) -> Iterator[Path]:
"""
Unified Markdown processing endpoint supporting both file upload and text input.
Context manager for temporary file handling.
This endpoint handles Markdown from either source:
1. **File Upload**: Upload a .md file
2. **Text Input**: Paste markdown text directly
Processing workflow:
1. Validates source (file or text, not both)
2. Extracts markdown content
3. Parses markdown structure into sections
4. Persists document to repository
5. Chunks content according to strategy
6. Returns chunks with metadata
Creates temporary directory, copies uploaded file, yields path,
and ensures cleanup on exit.
Args:
file: Optional .md file upload
text: Optional markdown text input
strategy_name: Chunking method (fixed_size or paragraph)
chunk_size: Target chunk size
overlap_size: Overlap between chunks
respect_boundaries: Whether to respect boundaries
title: Optional title for the document
file: Uploaded file from FastAPI
Returns:
Response with chunks
Raises:
HTTPException: If validation fails or processing fails
Yields:
Path to temporary file with original filename
"""
temp_file_path = None
temp_dir = tempfile.mkdtemp()
filename = file.filename if file.filename else "uploaded_file.tmp"
temp_file_path = Path(temp_dir) / filename
try:
# Validation: Ensure exactly one source is provided
logger.debug(f"Creating temporary file: {temp_file_path}")
with open(temp_file_path, 'wb') as f:
shutil.copyfileobj(file.file, f)
yield temp_file_path
finally:
# Cleanup temporary directory
try:
shutil.rmtree(temp_dir)
logger.debug(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.warning(f"Failed to delete temporary directory: {str(e)}")
def validate_markdown_source(file: Optional[UploadFile], text: Optional[str]) -> None:
"""
Validate that exactly one markdown source is provided.
Args:
file: Optional uploaded file
text: Optional text input
Raises:
HTTPException: If validation fails
"""
if not file and not text:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@ -223,86 +167,108 @@ async def perform_chunking(
detail="Provide either 'file' or 'text', not both",
)
# Get service from bootstrap
service: ITextProcessor = _get_service()
# Create chunking strategy
strategy = ChunkingStrategy(
strategy_name=strategy_name,
chunk_size=chunk_size,
overlap_size=overlap_size,
respect_boundaries=respect_boundaries,
)
# File Logic: Delegate to extract_and_chunk via MarkdownExtractor
if file is not None:
# Validate file extension
if not file.filename or not file.filename.lower().endswith('.md'):
if file and file.filename and not file.filename.lower().endswith('.md'):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Unsupported file type. Only .md files are accepted",
)
# Create temporary directory and file with original filename
temp_dir = tempfile.mkdtemp()
temp_file_path = Path(temp_dir) / file.filename
# Save uploaded file to temporary location
def to_document_response(document: Document) -> DocumentResponse:
"""Convert domain document to API response."""
from .api_schemas import DocumentMetadataResponse
display_name = document.metadata.display_name
file_type = Path(display_name).suffix.lstrip('.') if '.' in display_name else 'unknown'
return DocumentResponse(
id=str(document.id),
content=document.content,
metadata=DocumentMetadataResponse(
file_name=document.metadata.display_name,
file_type=file_type,
file_size_bytes=document.metadata.size_bytes,
created_at=document.metadata.created_at.isoformat(),
author=document.metadata.author,
page_count=None,
),
is_processed=document.is_processed,
content_preview=document.get_content_preview(200),
)
def to_chunk_responses(chunks: List[Chunk]) -> List[ChunkResponse]:
"""Convert list of domain chunks to API responses."""
return [
ChunkResponse(
id=str(chunk.id),
document_id=str(chunk.document_id),
content=chunk.content,
sequence_number=chunk.sequence_number,
start_char=chunk.start_char,
end_char=chunk.end_char,
length=chunk.get_length(),
)
for chunk in chunks
]
# =============================================================================
# Skinny Routes
# =============================================================================
@router.post(
"/chunk",
response_model=ChunkListResponse,
status_code=status.HTTP_200_OK,
summary="Process Markdown from file upload or text input",
description="Unified endpoint: upload .md file or paste markdown text, then parse and chunk",
)
async def perform_chunking(
file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"),
text: Optional[str] = Form(None, description="Markdown text to process", json_schema_extra={"x-textarea": True}),
title: str = Form("markdown_input", description="Optional title for the document"),
strategy: ChunkingStrategy = Depends(get_chunking_strategy),
service: ITextProcessor = Depends(get_service),
) -> ChunkListResponse:
"""
Unified Markdown processing endpoint supporting both file upload and text input.
Processing workflow:
1. Validates source (file or text, not both)
2. Extracts markdown content
3. Parses markdown structure into sections
4. Persists document to repository
5. Chunks content according to strategy
6. Returns chunks with metadata
"""
# Validate source
validate_markdown_source(file, text)
# Process file upload
if file:
logger.info(f"Processing uploaded markdown file: {file.filename}")
with open(temp_file_path, 'wb') as temp_file:
shutil.copyfileobj(file.file, temp_file)
with managed_temp_file(file) as temp_path:
chunks = service.extract_and_chunk(temp_path, strategy)
# Delegate to extract_and_chunk (uses MarkdownExtractor)
chunks = service.extract_and_chunk(temp_file_path, strategy)
# Text Logic: Process text directly
# Process text input
else:
logger.info("Processing markdown text input")
# Validate content is not empty
if not text or not text.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Markdown content cannot be empty",
)
# Process text through service
chunks = service.process_text_to_chunks(
text=text,
chunking_strategy=strategy,
title=title,
)
# Convert to response
chunk_responses = [_to_chunk_response(c) for c in chunks]
logger.info(f"Processing markdown text input: {len(text)} characters")
chunks = service.process_text_to_chunks(text, strategy, title)
logger.info(f"Successfully processed markdown: {len(chunks)} chunks created")
return ChunkListResponse(
chunks=chunk_responses,
total_chunks=len(chunk_responses),
chunks=to_chunk_responses(chunks),
total_chunks=len(chunks),
)
except HTTPException:
raise
except DomainException as e:
raise _map_domain_exception(e)
except Exception as e:
logger.error(f"Unexpected error processing markdown: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error: {str(e)}",
)
finally:
# Clean up temporary file and directory if file was uploaded
if temp_file_path and temp_file_path.exists():
try:
temp_dir = temp_file_path.parent
shutil.rmtree(temp_dir)
logger.debug(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.warning(f"Failed to delete temporary directory: {str(e)}")
@router.post(
"/extract",
@ -312,68 +278,25 @@ async def perform_chunking(
description="Upload a file and extract text content with metadata",
)
async def extract_document(
file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, zip)"),
file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, md, zip)"),
service: ITextProcessor = Depends(get_service),
) -> DocumentResponse:
"""
Extract text content from uploaded file.
This endpoint handles file extraction only:
1. Accepts file upload (PDF, DOCX, TXT, ZIP)
1. Accepts file upload (PDF, DOCX, TXT, MD, ZIP)
2. Extracts raw text content using appropriate extractor
3. Returns Document entity with metadata (no parsing)
Args:
file: Uploaded file
Returns:
Response with extracted document
Raises:
HTTPException: If extraction fails
"""
temp_file_path = None
try:
# Pull service from bootstrap
service: ITextProcessor = _get_service()
# Create temporary directory and file with original filename
temp_dir = tempfile.mkdtemp()
original_filename = file.filename if file.filename else "uploaded_file.tmp"
temp_file_path = Path(temp_dir) / original_filename
# Copy uploaded file to temporary location
logger.info(f"Extracting uploaded file: {file.filename}")
with open(temp_file_path, 'wb') as temp_file:
shutil.copyfileobj(file.file, temp_file)
# Execute extraction only (no parsing)
document = service.extract_document(temp_file_path)
with managed_temp_file(file) as temp_path:
document = service.extract_document(temp_path)
# Convert to response
document_response = _to_document_response(document)
logger.info(f"Successfully extracted {len(document.raw_markdown)} characters from {file.filename}")
logger.info(f"Successfully extracted {file.filename}: {len(document.raw_markdown)} characters")
return document_response
except DomainException as e:
raise _map_domain_exception(e)
except Exception as e:
logger.error(f"Unexpected error extracting file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error: {str(e)}",
)
finally:
# Clean up temporary file and directory
if temp_file_path and temp_file_path.exists():
try:
temp_dir = temp_file_path.parent
shutil.rmtree(temp_dir)
logger.debug(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.warning(f"Failed to delete temporary directory: {str(e)}")
return to_document_response(document)
@router.post(
@ -384,90 +307,32 @@ async def extract_document(
description="Upload a file, extract text, parse markdown, and return chunks",
)
async def process_file(
file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, zip)"),
strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
respect_boundaries: bool = Form(True, description="Respect text boundaries"),
file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, md, zip)"),
strategy: ChunkingStrategy = Depends(get_chunking_strategy),
service: ITextProcessor = Depends(get_service),
) -> ChunkListResponse:
"""
Complete file processing pipeline: Upload Extract Parse Chunk.
This endpoint handles the full document processing workflow:
1. Accepts file upload (PDF, DOCX, TXT, ZIP)
1. Accepts file upload (PDF, DOCX, TXT, MD, ZIP)
2. Extracts text content using appropriate extractor
3. Parses markdown structure into sections
4. Chunks content according to strategy
5. Returns chunks with metadata
Args:
file: Uploaded file
strategy_name: Name of chunking strategy
chunk_size: Target chunk size
overlap_size: Overlap between chunks
respect_boundaries: Whether to respect boundaries
Returns:
Response with chunks
Raises:
HTTPException: If extraction or chunking fails
"""
temp_file_path = None
try:
# Pull service from bootstrap
service: ITextProcessor = _get_service()
# Create temporary directory and file with original filename
temp_dir = tempfile.mkdtemp()
original_filename = file.filename if file.filename else "uploaded_file.tmp"
temp_file_path = Path(temp_dir) / original_filename
# Copy uploaded file to temporary location
logger.info(f"Processing uploaded file: {file.filename}")
with open(temp_file_path, 'wb') as temp_file:
shutil.copyfileobj(file.file, temp_file)
# Create chunking strategy
strategy = ChunkingStrategy(
strategy_name=strategy_name,
chunk_size=chunk_size,
overlap_size=overlap_size,
respect_boundaries=respect_boundaries,
)
# Execute complete pipeline: extract → parse → chunk
chunks = service.extract_and_chunk(temp_file_path, strategy)
# Convert to response
chunk_responses = [_to_chunk_response(c) for c in chunks]
with managed_temp_file(file) as temp_path:
chunks = service.extract_and_chunk(temp_path, strategy)
logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created")
return ChunkListResponse(
chunks=chunk_responses,
total_chunks=len(chunk_responses),
chunks=to_chunk_responses(chunks),
total_chunks=len(chunks),
)
except DomainException as e:
raise _map_domain_exception(e)
except Exception as e:
logger.error(f"Unexpected error processing file: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error: {str(e)}",
)
finally:
# Clean up temporary file and directory
if temp_file_path and temp_file_path.exists():
try:
temp_dir = temp_file_path.parent
shutil.rmtree(temp_dir)
logger.debug(f"Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.warning(f"Failed to delete temporary directory: {str(e)}")
@router.get(
"/health",
@ -477,12 +342,7 @@ async def process_file(
description="Check API health and configuration",
)
async def health_check() -> HealthCheckResponse:
"""
Health check endpoint.
Returns:
Health status and configuration
"""
"""Health check endpoint."""
return HealthCheckResponse(
status="healthy",
version="1.0.0",
@ -491,6 +351,10 @@ async def health_check() -> HealthCheckResponse:
)
# =============================================================================
# Application Setup
# =============================================================================
# Include router in app
app.include_router(router)