add /chunk route

m.dabbagh 2026-01-19 21:54:23 +03:30
parent 2c4a59f84b
commit 6086ddf818
6 changed files with 155 additions and 96 deletions

View File

@@ -8,6 +8,7 @@ import logging
 import shutil
 import tempfile
 from pathlib import Path
+from typing import Optional
 from uuid import UUID
 
 from fastapi import APIRouter, FastAPI, File, Form, HTTPException, UploadFile, status
@@ -28,7 +29,7 @@ from .api_schemas import (
     DocumentListResponse,
     DocumentResponse,
     ExtractAndChunkRequest,
-    ExtractAndChunkResponse,
+    ChunkListResponse,
     HealthCheckResponse,
     ProcessDocumentRequest,
     ProcessDocumentResponse,
@@ -160,6 +161,149 @@ def _map_domain_exception(exception: DomainException) -> HTTPException:
     )
 
 
+@router.post(
+    "/chunk",
+    response_model=ChunkListResponse,
+    status_code=status.HTTP_200_OK,
+    summary="Process Markdown from file upload or text input",
+    description="Unified endpoint: upload .md file or paste markdown text, then parse and chunk",
+)
+async def perform_chunking(
+    file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"),
+    text: Optional[str] = Form(None, description="Markdown text to process", json_schema_extra={"x-textarea": True}),
+    strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
+    chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
+    overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
+    respect_boundaries: bool = Form(True, description="Respect text boundaries"),
+    title: str = Form("markdown_input", description="Optional title for the document"),
+) -> ChunkListResponse:
+    """
+    Unified Markdown processing endpoint supporting both file upload and text input.
+
+    This endpoint handles Markdown from either source:
+
+    1. **File Upload**: Upload a .md file
+    2. **Text Input**: Paste markdown text directly
+
+    Processing workflow:
+
+    1. Validates source (file or text, not both)
+    2. Extracts markdown content
+    3. Parses markdown structure into sections
+    4. Persists document to repository
+    5. Chunks content according to strategy
+    6. Returns chunks with metadata
+
+    Args:
+        file: Optional .md file upload
+        text: Optional markdown text input
+        strategy_name: Chunking method (fixed_size or paragraph)
+        chunk_size: Target chunk size
+        overlap_size: Overlap between chunks
+        respect_boundaries: Whether to respect boundaries
+        title: Optional title for the document
+
+    Returns:
+        Response with chunks
+
+    Raises:
+        HTTPException: If validation fails or processing fails
+    """
+    temp_file_path = None
+    try:
+        # Validation: Ensure exactly one source is provided
+        if not file and not text:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Either 'file' or 'text' must be provided",
+            )
+        if file and text:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Provide either 'file' or 'text', not both",
+            )
+
+        # Get service from bootstrap
+        service: ITextProcessor = _get_service()
+
+        # Create chunking strategy
+        strategy = ChunkingStrategy(
+            strategy_name=strategy_name,
+            chunk_size=chunk_size,
+            overlap_size=overlap_size,
+            respect_boundaries=respect_boundaries,
+        )
+
+        # File Logic: Delegate to extract_and_chunk via MarkdownExtractor
+        if file is not None:
+            # Validate file extension
+            if not file.filename or not file.filename.lower().endswith('.md'):
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail="Unsupported file type. Only .md files are accepted",
+                )
+
+            # Create temporary directory and file with original filename
+            temp_dir = tempfile.mkdtemp()
+            temp_file_path = Path(temp_dir) / file.filename
+
+            # Save uploaded file to temporary location
+            logger.info(f"Processing uploaded markdown file: {file.filename}")
+            with open(temp_file_path, 'wb') as temp_file:
+                shutil.copyfileobj(file.file, temp_file)
+
+            # Delegate to extract_and_chunk (uses MarkdownExtractor)
+            chunks = service.extract_and_chunk(temp_file_path, strategy)
+
+        # Text Logic: Process text directly
+        else:
+            logger.info("Processing markdown text input")
+
+            # Validate content is not empty
+            if not text or not text.strip():
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail="Markdown content cannot be empty",
+                )
+
+            # Process text through service
+            chunks = service.process_text_to_chunks(
+                text=text,
+                chunking_strategy=strategy,
+                title=title,
+            )
+
+        # Convert to response
+        chunk_responses = [_to_chunk_response(c) for c in chunks]
+        logger.info(f"Successfully processed markdown: {len(chunks)} chunks created")
+        return ChunkListResponse(
+            chunks=chunk_responses,
+            total_chunks=len(chunk_responses),
+        )
+
+    except HTTPException:
+        raise
+    except DomainException as e:
+        raise _map_domain_exception(e)
+    except Exception as e:
+        logger.error(f"Unexpected error processing markdown: {str(e)}")
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"Internal server error: {str(e)}",
+        )
+    finally:
+        # Clean up temporary file and directory if file was uploaded
+        if temp_file_path and temp_file_path.exists():
+            try:
+                temp_dir = temp_file_path.parent
+                shutil.rmtree(temp_dir)
+                logger.debug(f"Cleaned up temporary directory: {temp_dir}")
+            except Exception as e:
+                logger.warning(f"Failed to delete temporary directory: {str(e)}")
+
+
 @router.post(
     "/extract",
     response_model=DocumentResponse,
@@ -234,7 +378,7 @@ async def extract_document(
 
 @router.post(
     "/process-file",
-    response_model=ExtractAndChunkResponse,
+    response_model=ChunkListResponse,
     status_code=status.HTTP_200_OK,
     summary="Process uploaded file (extraction to chunking)",
     description="Upload a file, extract text, parse markdown, and return chunks",
@@ -245,7 +389,7 @@ async def process_file(
     chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
     overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
     respect_boundaries: bool = Form(True, description="Respect text boundaries"),
-) -> ExtractAndChunkResponse:
+) -> ChunkListResponse:
     """
     Complete file processing pipeline: Upload → Extract → Parse → Chunk.
@@ -301,7 +445,7 @@ async def process_file(
 
         logger.info(f"Successfully processed {file.filename}: {len(chunks)} chunks created")
 
-        return ExtractAndChunkResponse(
+        return ChunkListResponse(
            chunks=chunk_responses,
            total_chunks=len(chunk_responses),
        )
@@ -342,7 +486,7 @@ async def health_check() -> HealthCheckResponse:
     return HealthCheckResponse(
         status="healthy",
         version="1.0.0",
-        supported_file_types=["pdf", "docx", "txt", "zip"],
+        supported_file_types=["pdf", "docx", "txt", "md", "markdown", "zip"],
         available_strategies=["fixed_size", "paragraph"],
     )
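
With the route in place, a client can exercise both input modes. A minimal sketch using the requests library — the base URL is an assumption, and the /chunk path assumes the router is mounted without an extra prefix (the mount point is not shown in this diff):

import requests

BASE = "http://localhost:8000"  # assumed host/port; adjust to your deployment
COMMON = {
    "strategy_name": "paragraph",  # or "fixed_size"
    "chunk_size": 1000,
    "overlap_size": 100,
}

# Mode 1: upload a .md file (multipart form)
with open("notes.md", "rb") as f:
    resp = requests.post(
        f"{BASE}/chunk",
        files={"file": ("notes.md", f, "text/markdown")},
        data=COMMON,
    )
print(resp.json()["total_chunks"])

# Mode 2: paste markdown text directly (no file part)
resp = requests.post(
    f"{BASE}/chunk",
    data={**COMMON, "text": "# Title\n\nSome body text.", "title": "pasted_doc"},
)
print(resp.json()["total_chunks"])

Sending both a file and text, or neither, returns a 400 per the validation above.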

View File

@@ -109,12 +109,12 @@ class ProcessDocumentResponse(BaseModel):
     message: str = Field(default="Document processed successfully")
 
 
-class ExtractAndChunkResponse(BaseModel):
+class ChunkListResponse(BaseModel):
     """Response model for extract and chunk operation."""
 
     chunks: List[ChunkResponse]
     total_chunks: int
-    message: str = Field(default="Document extracted and chunked successfully")
+    message: str = Field(default="Document chunked successfully")
 
 
 class DocumentListResponse(BaseModel):
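
Both /chunk and /process-file now share this wire shape. Roughly, a response body looks like the sketch below — the ChunkResponse item fields are defined elsewhere and not shown in this diff:

# Example serialized ChunkListResponse (a sketch):
# {
#     "chunks": [ ... ],          # list of serialized ChunkResponse objects
#     "total_chunks": 3,
#     "message": "Document chunked successfully"
# }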

View File

@@ -15,6 +15,7 @@ from .adapters.outgoing.chunkers.fixed_size_chunker import FixedSizeChunker
 from .adapters.outgoing.chunkers.paragraph_chunker import ParagraphChunker
 from .adapters.outgoing.extractors.docx_extractor import DocxExtractor
 from .adapters.outgoing.extractors.factory import ExtractorFactory
+from .adapters.outgoing.extractors.markdown_extractor import MarkdownExtractor
 from .adapters.outgoing.extractors.pdf_extractor import PDFExtractor
 from .adapters.outgoing.extractors.txt_extractor import TxtExtractor
 from .adapters.outgoing.extractors.zip_extractor import ZipExtractor
@@ -100,6 +101,7 @@ class ApplicationContainer:
         factory.register_extractor(PDFExtractor())
         factory.register_extractor(DocxExtractor())
         factory.register_extractor(TxtExtractor())
+        factory.register_extractor(MarkdownExtractor())
         factory.register_extractor(ZipExtractor())
 
         logger.info(
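
The MarkdownExtractor itself does not appear in this diff. Since the downstream pipeline already consumes raw markdown, it can be close to a pass-through read; the sketch below is hypothetical — the supports/extract method names are assumptions about the ExtractorFactory contract, not the project's actual interface:

from pathlib import Path


class MarkdownExtractor:
    """Hypothetical sketch of a pass-through markdown extractor."""

    SUPPORTED_EXTENSIONS = {".md", ".markdown"}

    def supports(self, file_path: Path) -> bool:
        # Assumed dispatch hook: the factory picks the extractor whose
        # extensions match the file suffix.
        return file_path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def extract(self, file_path: Path) -> str:
        # Markdown needs no conversion; return the file content verbatim.
        return file_path.read_text(encoding="utf-8")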

View File

@@ -17,6 +17,7 @@ class SourceType(str, Enum):
     """Enumeration of supported source types."""
 
     FILE = "file"
     WEB = "web"
+    TEXT = "text"
 
 
 class ChunkingMethod(str, Enum):

View File

@@ -20,29 +20,6 @@ class ITextProcessor(ABC):
     the entry point into the core domain logic.
     """
 
-    @abstractmethod
-    def process_document(
-        self,
-        file_path: Path,
-        chunking_strategy: ChunkingStrategy,
-    ) -> Document:
-        """
-        Process a document by extracting text and storing it.
-
-        Args:
-            file_path: Path to the document file
-            chunking_strategy: Strategy configuration for chunking
-
-        Returns:
-            Processed Document entity
-
-        Raises:
-            ExtractionError: If text extraction fails
-            ProcessingError: If document processing fails
-            UnsupportedFileTypeError: If file type is not supported
-        """
-        pass
-
     @abstractmethod
     def extract_and_chunk(
         self,

View File

@@ -53,71 +53,6 @@ class DocumentProcessorService(ITextProcessor):
         self._repository = repository
         logger.info("DocumentProcessorService initialized")
 
-    def process_document(
-        self,
-        file_path: Path,
-        chunking_strategy: ChunkingStrategy,
-    ) -> Document:
-        """
-        Process a document using the stateless pipeline.
-
-        Pipeline Order:
-        1. Extract Document with raw_markdown and metadata (via Adapter)
-        2. Parse Markdown into DocumentSection objects
-        3. Update Document with sections
-        4. Validate and persist Document
-        5. Mark as processed
-
-        Args:
-            file_path: Path to the document file
-            chunking_strategy: Strategy configuration (for metadata)
-
-        Returns:
-            Fully processed Document entity
-
-        Raises:
-            ExtractionError: If text extraction fails
-            ProcessingError: If document processing fails
-            UnsupportedFileTypeError: If file type is not supported
-        """
-        try:
-            logger.info(f"Processing document: {file_path}")
-
-            # Step 1: Extract Document with raw_markdown and metadata
-            document = self._extract_document(file_path)
-
-            # Step 2: Parse Markdown into structured sections
-            sections = parse_markdown(document.raw_markdown)
-            logger.debug(f"Parsed {len(sections)} sections from document")
-
-            # Step 3: Update Document with sections
-            document = document.model_copy(update={"sections": sections})
-
-            # Step 4: Validate document content
-            document.validate_content()
-
-            # Step 5: Persist to repository
-            saved_document = self._repository.save(document)
-
-            # Step 6: Mark as processed
-            saved_document.mark_as_processed()
-            self._repository.save(saved_document)
-
-            logger.info(
-                f"Document processed successfully: {saved_document.id} "
-                f"({len(sections)} sections)"
-            )
-            return saved_document
-
-        except ExtractionError:
-            raise
-        except Exception as e:
-            logger.error(f"Failed to process document: {str(e)}")
-            raise ProcessingError(
-                message="Document processing failed",
-                details=str(e),
-            )
-
     def extract_and_chunk(
         self,
         file_path: Path,
@@ -260,7 +195,7 @@ class DocumentProcessorService(ITextProcessor):
 
         metadata = DocumentMetadata(
             source_id="text_input",
-            source_type=SourceType.WEB,  # Using WEB type for text input
+            source_type=SourceType.TEXT,
             display_name=f"{title}.md",
             size_bytes=len(text.encode('utf-8')),
         )