2026-01-28 22:10:24 +03:30

389 lines
13 KiB
Python

"""
API Routes - Functional FastAPI routes for text processing.
This is the incoming adapter that translates HTTP requests into
domain operations. Routes pull the service directly from bootstrap.
Refactored for "Skinny Routes" pattern with:
- Global exception handling
- Dependency injection for common parameters
- Context managers for resource management
- Minimal route logic
"""
import contextlib
import logging
import shutil
import tempfile
from pathlib import Path
from typing import Iterator, List, Optional
from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasicCredentials
from .auth import check_docs_credentials, validate_api_key
from ...core.config import get_settings
from ...core.domain.exceptions import (
ChunkingError,
DomainException,
DocumentNotFoundError,
ExtractionError,
ProcessingError,
UnsupportedFileTypeError,
)
from ...core.domain.models import Chunk, ChunkingMethod, ChunkingStrategy, Document
from ...core.ports.incoming.text_processor import ITextProcessor
from .api_schemas import (
ChunkListResponse,
ChunkResponse,
DocumentResponse,
HealthCheckResponse,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Load settings
# NOTE(review): `settings` is not referenced anywhere else in the visible part
# of this module - confirm whether it is still needed here.
settings = get_settings()

# FastAPI application object. The exception handler and both routers defined
# in this module are registered on it.
app = FastAPI(
    title="Text Processor API",
    description="Text extraction and chunking system using Hexagonal Architecture",
    version="1.0.0",
    # NOTE(review): with the two overrides below commented out, FastAPI keeps
    # serving its default documentation pages - confirm that is intended.
    # docs_url=None,
    # redoc_url=None,
)

# Router for the text-processing endpoints: every route registered on it is
# served under the /api/v1 prefix and runs the validate_api_key dependency.
router = APIRouter(
    prefix="/api/v1",
    tags=["Text Processing"],
    dependencies=[Depends(validate_api_key)]
)

# Router declared without any dependencies; used for the health check.
public_router = APIRouter(
    tags=["System"],
)
# =============================================================================
# Global Exception Handler
# =============================================================================
@app.exception_handler(DomainException)
async def domain_exception_handler(request, exc: DomainException) -> JSONResponse:
    """
    Global exception handler for all domain exceptions.
    Maps domain exceptions to appropriate HTTP status codes.
    The status code is resolved through the exception's class hierarchy, so a
    subclass of a mapped exception inherits the status of its nearest mapped
    ancestor. Exceptions with no mapped ancestor are reported as HTTP 500.
    Args:
        request: Incoming request (unused; required by the handler signature)
        exc: Domain exception raised while handling the request
    Returns:
        JSON response whose body is {"detail": <exception message>}
    """
    status_code_map = {
        UnsupportedFileTypeError: status.HTTP_400_BAD_REQUEST,
        ExtractionError: status.HTTP_422_UNPROCESSABLE_ENTITY,
        ChunkingError: status.HTTP_422_UNPROCESSABLE_ENTITY,
        ProcessingError: status.HTTP_500_INTERNAL_SERVER_ERROR,
        DocumentNotFoundError: status.HTTP_404_NOT_FOUND,
    }
    # An exact type(exc) lookup would send every subclass of a mapped exception
    # to 500. Walking the MRO (most specific class first) picks the closest
    # mapped ancestor instead.
    status_code = next(
        (status_code_map[cls] for cls in type(exc).__mro__ if cls in status_code_map),
        status.HTTP_500_INTERNAL_SERVER_ERROR,
    )
    logger.error("Domain exception: %s: %s", type(exc).__name__, exc)
    return JSONResponse(
        status_code=status_code,
        content={"detail": str(exc)},
    )
# =============================================================================
# Helper Functions & Dependencies
# =============================================================================
def get_service() -> ITextProcessor:
    """
    Dependency: resolve the text processor service via the bootstrap module.
    Returns:
        Whatever bootstrap.get_processor_service() returns
    """
    # The import is performed at call time rather than at module import time.
    # NOTE(review): presumably this avoids a circular import with bootstrap - confirm.
    from ...bootstrap import get_processor_service as resolve_processor

    processor = resolve_processor()
    return processor
def get_chunking_strategy(
    strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
    chunk_size: int = Form(512, description="Target chunk size in characters", ge=1, le=10000),
    overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
    respect_boundaries: bool = Form(True, description="Respect text boundaries"),
) -> ChunkingStrategy:
    """
    Dependency: assemble a ChunkingStrategy from the request's form fields.
    Args:
        strategy_name: Chunking method (required form field)
        chunk_size: Target chunk size, declared as 1..10000, default 512
        overlap_size: Overlap between chunks, declared as >= 0, default 0
        respect_boundaries: Whether to respect text boundaries, default True
    Returns:
        ChunkingStrategy built from the four values, passed through unchanged
    """
    # NOTE(review): nothing here relates overlap_size to chunk_size; presumably
    # ChunkingStrategy validates that combination itself - confirm.
    strategy_fields = {
        "strategy_name": strategy_name,
        "chunk_size": chunk_size,
        "overlap_size": overlap_size,
        "respect_boundaries": respect_boundaries,
    }
    return ChunkingStrategy(**strategy_fields)
@contextlib.contextmanager
def managed_temp_file(file: UploadFile) -> Iterator[Path]:
    """
    Context manager for temporary file handling.
    Creates a temporary directory, copies the uploaded file into it, yields
    the path, and removes the directory on exit (also when the body raises).
    The upload's filename is reduced to its final path component before it is
    used, so the copy is always written directly inside the temporary
    directory. The file extension is preserved.
    Args:
        file: Uploaded file from FastAPI
    Yields:
        Path to the temporary copy, named after the upload's base filename,
        or "uploaded_file.tmp" when no usable name is available
    """
    # The filename comes from the client and must not be trusted: values such
    # as "../../x", "/etc/x" or "a/b/x" would otherwise escape (or point below)
    # the temporary directory. Backslashes are normalised first so that
    # Windows-style client paths are reduced to their last component as well.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = Path(raw_name).name
    if safe_name in ("", ".", ".."):
        safe_name = "uploaded_file.tmp"

    temp_dir = tempfile.mkdtemp()
    temp_file_path = Path(temp_dir) / safe_name
    try:
        logger.debug("Creating temporary file: %s", temp_file_path)
        with open(temp_file_path, 'wb') as f:
            shutil.copyfileobj(file.file, f)
        yield temp_file_path
    finally:
        # Cleanup is best-effort: a failure is logged and never replaces an
        # exception raised by the caller's block.
        try:
            shutil.rmtree(temp_dir)
            logger.debug("Cleaned up temporary directory: %s", temp_dir)
        except Exception as e:
            logger.warning("Failed to delete temporary directory: %s", e)
def validate_markdown_source(file: Optional[UploadFile], text: Optional[str]) -> None:
    """
    Check that the request carries exactly one markdown source.
    The checks run in this order and the first failing one wins:
    neither source given, both sources given, uploaded filename not ending
    in ".md" (case-insensitive; skipped when the upload has no filename).
    Args:
        file: Optional uploaded file
        text: Optional text input
    Raises:
        HTTPException: 400 with a message describing the failed check
    """
    problem = None
    if not (file or text):
        problem = "Either 'file' or 'text' must be provided"
    elif file and text:
        problem = "Provide either 'file' or 'text', not both"
    elif file and file.filename and not file.filename.lower().endswith('.md'):
        problem = "Unsupported file type. Only .md files are accepted"

    # All three failures share the same status code; only the detail differs.
    if problem is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=problem,
        )
def to_document_response(document: Document) -> DocumentResponse:
    """
    Convert domain document to API response.
    Args:
        document: Domain document to expose through the API
    Returns:
        DocumentResponse with the document's id (as a string), content,
        metadata, processed flag, a 200-character content preview request
        and download URL. file_type is the display name's extension without
        the leading dot, or 'unknown' when there is none.
    """
    from .api_schemas import DocumentMetadataResponse

    metadata = document.metadata
    display_name = metadata.display_name
    # Path.suffix is empty for names such as "README", ".gitignore" or
    # "my.dir/file". A plain "'.' in display_name" test lets those through with
    # an empty file_type, so fall back to 'unknown' whenever the stripped
    # suffix is empty.
    file_type = Path(display_name).suffix.lstrip('.') or 'unknown'
    return DocumentResponse(
        id=str(document.id),
        content=document.content,
        metadata=DocumentMetadataResponse(
            file_name=display_name,
            file_type=file_type,
            file_size_bytes=metadata.size_bytes,
            created_at=metadata.created_at.isoformat(),
            author=metadata.author,
            # Page count is not populated by this converter.
            page_count=None,
        ),
        is_processed=document.is_processed,
        content_preview=document.get_content_preview(200),
        download_url=document.download_url,
    )
def to_chunk_responses(chunks: List[Chunk]) -> List[ChunkResponse]:
    """
    Convert domain chunks to API chunk responses, preserving input order.
    Args:
        chunks: Domain chunks to convert
    Returns:
        One ChunkResponse per input chunk; ids are rendered as strings and
        length is taken from chunk.get_length()
    """

    def as_response(item: Chunk) -> ChunkResponse:
        # Field-by-field mapping from the domain chunk to the API schema.
        return ChunkResponse(
            id=str(item.id),
            document_id=str(item.document_id),
            content=item.content,
            sequence_number=item.sequence_number,
            length=item.get_length(),
        )

    return [as_response(item) for item in chunks]
# =============================================================================
# Skinny Routes
# =============================================================================
@router.post(
    "/chunk",
    response_model=ChunkListResponse,
    status_code=status.HTTP_200_OK,
    summary="Process Markdown from file upload or text input",
    description="Unified endpoint: upload .md file or paste markdown text, then parse and chunk",
)
async def perform_chunking(
    file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"),
    text: Optional[str] = Form('', description="Markdown text to process"),
    title: Optional[str] = Form('', description="Optional title for the document"),
    strategy: ChunkingStrategy = Depends(get_chunking_strategy),
    service: ITextProcessor = Depends(get_service),
) -> ChunkListResponse:
    """
    Chunk markdown supplied either as an uploaded file or as pasted text.
    Steps performed by this route:
    1. validate_markdown_source() rejects requests with no source, both
       sources, or an upload whose filename does not end in ".md"
    2. An upload is copied to a temporary file and handed to
       service.extract_and_chunk()
    3. Otherwise the text must contain non-whitespace characters and is
       handed to service.process_text_to_chunks() together with the title
    4. The resulting chunks are returned with their count
    Raises:
        HTTPException: 400 when validation fails or the text is blank
    """
    # NOTE(review): the service calls and the temp-file copy are not awaited,
    # so they run synchronously inside this coroutine - confirm that is acceptable.
    validate_markdown_source(file, text)

    if file:
        # Source is an uploaded file
        logger.info(f"Processing uploaded markdown file: {file.filename}")
        with managed_temp_file(file) as temp_path:
            chunks = service.extract_and_chunk(temp_path, strategy)
    elif text and text.strip():
        # Source is pasted text
        logger.info(f"Processing markdown text input: {len(text)} characters")
        chunks = service.process_text_to_chunks(text, strategy, title)
    else:
        # Text was the chosen source but holds nothing except whitespace
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Markdown content cannot be empty",
        )

    logger.info(f"Successfully processed markdown: {len(chunks)} chunks created")
    chunk_payload = to_chunk_responses(chunks)
    return ChunkListResponse(
        chunks=chunk_payload,
        total_chunks=len(chunks),
    )
@router.post(
    "/extract",
    response_model=DocumentResponse,
    status_code=status.HTTP_200_OK,
    summary="Extract document from uploaded file",
    description="Upload a file and extract text content with metadata",
)
async def extract_document(
    file: UploadFile = File(..., description="Document file to extract (pdf, docx, txt, md, zip)"),
    service: ITextProcessor = Depends(get_service),
) -> DocumentResponse:
    """
    Extract a document from an uploaded file, without chunking it.
    The upload is copied to a temporary file, passed to
    service.extract_document(), and the resulting domain document is
    converted to a DocumentResponse.
    """
    logger.info(f"Extracting uploaded file: {file.filename}")

    # The temporary copy only needs to exist while the service reads it.
    with managed_temp_file(file) as temp_path:
        document = service.extract_document(temp_path)

    # NOTE(review): the character count is read from document.raw_markdown while
    # the response is built from document.content - confirm both attributes exist
    # and that this is the intended figure to log.
    logger.info(f"Successfully extracted {len(document.raw_markdown)} characters from {file.filename}")
    response = to_document_response(document)
    return response
@router.post(
    "/process-file",
    response_model=ChunkListResponse,
    status_code=status.HTTP_200_OK,
    summary="Process uploaded file (extraction to chunking)",
    description="Upload a file, extract text, parse markdown, and return chunks",
)
async def process_file(
    file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, md, zip)"),
    strategy: ChunkingStrategy = Depends(get_chunking_strategy),
    service: ITextProcessor = Depends(get_service),
) -> ChunkListResponse:
    """
    Run an uploaded file through the service's extract-and-chunk operation.
    The upload is copied to a temporary file, passed to
    service.extract_and_chunk() together with the chunking strategy built
    from the form fields, and the resulting chunks are returned with their
    count.
    """
    logger.info(f"Processing uploaded file: {file.filename}")

    # The temporary copy is removed as soon as the service call returns.
    with managed_temp_file(file) as temp_path:
        chunks = service.extract_and_chunk(temp_path, strategy)

    logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created")
    chunk_payload = to_chunk_responses(chunks)
    return ChunkListResponse(
        chunks=chunk_payload,
        total_chunks=len(chunks),
    )
@public_router.get(
    "/health",
    response_model=HealthCheckResponse,
    status_code=status.HTTP_200_OK,
    summary="Health check",
    description="Check API health and configuration",
)
async def health_check() -> HealthCheckResponse:
    """
    Health check endpoint.
    Returns:
        HealthCheckResponse with a fixed "healthy" status, the application
        version, and static lists of supported file types and strategies
    """
    return HealthCheckResponse(
        status="healthy",
        # Reuse the version declared on the FastAPI app so the two values
        # cannot drift apart when the version is bumped.
        version=app.version,
        supported_file_types=["pdf", "docx", "txt", "md", "markdown", "zip"],
        # NOTE(review): hard-coded list; presumably it mirrors the members of
        # ChunkingMethod - confirm and keep in sync.
        available_strategies=["fixed_size", "paragraph"],
    )
# =============================================================================
# Protected Documentation Routes
# =============================================================================
# @app.get("/docs", include_in_schema=False)
# def api_docs(_: HTTPBasicCredentials = Depends(check_docs_credentials)):
# return get_swagger_ui_html(
# openapi_url="/openapi.json",
# title="Protected Text-Processor API Docs"
# )
#
#
# @app.get("/redoc", include_in_schema=False)
# def api_docs(_: HTTPBasicCredentials = Depends(check_docs_credentials)):
# return get_redoc_html(
# openapi_url="/openapi.json",
# title="Protected Text-Processor API Docs"
# )
# =============================================================================
# Application Setup
# =============================================================================
# Include routers in app.
# This runs at import time, after the route functions above have been
# registered on their routers: `router` carries the API-key dependency and the
# /api/v1 prefix, `public_router` carries neither.
app.include_router(router)
app.include_router(public_router)