make chunking method enum and remove some redundant code in core and api
This commit is contained in:
parent
e2e1c86dd4
commit
e783d92eca
@ -20,7 +20,7 @@ from ...core.domain.exceptions import (
|
||||
ProcessingError,
|
||||
UnsupportedFileTypeError,
|
||||
)
|
||||
from ...core.domain.models import ChunkingStrategy
|
||||
from ...core.domain.models import ChunkingMethod, ChunkingStrategy
|
||||
from ...core.ports.incoming.text_processor import ITextProcessor
|
||||
from .api_schemas import (
|
||||
ChunkResponse,
|
||||
@ -187,7 +187,7 @@ def _map_domain_exception(exception: DomainException) -> HTTPException:
|
||||
)
|
||||
async def process_file(
|
||||
file: UploadFile = File(..., description="Document file to process (pdf, docx, txt, zip)"),
|
||||
strategy_name: str = Form(..., description="Chunking strategy name", examples=["fixed_size", "paragraph"]),
|
||||
strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
|
||||
chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
|
||||
overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
|
||||
respect_boundaries: bool = Form(True, description="Respect text boundaries"),
|
||||
@ -270,95 +270,6 @@ async def process_file(
|
||||
logger.warning(f"Failed to delete temporary file {temp_file_path}: {str(e)}")
|
||||
|
||||
|
||||
@router.post(
|
||||
"/process-text",
|
||||
response_model=ExtractAndChunkResponse,
|
||||
status_code=status.HTTP_200_OK,
|
||||
summary="Process markdown text (parse and chunk)",
|
||||
description="Accept markdown text, parse structure, and return chunks",
|
||||
)
|
||||
async def process_text(
|
||||
text: str = Form(..., description="Markdown text to process"),
|
||||
strategy_name: str = Form(..., description="Chunking strategy name", examples=["fixed_size", "paragraph"]),
|
||||
chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
|
||||
overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
|
||||
respect_boundaries: bool = Form(True, description="Respect text boundaries"),
|
||||
title: str = Form("text_input", description="Optional title for the text document"),
|
||||
) -> ExtractAndChunkResponse:
|
||||
"""
|
||||
Process raw markdown text: Parse → Chunk.
|
||||
|
||||
This endpoint handles text processing workflow:
|
||||
1. Accepts markdown text as string
|
||||
2. Parses markdown structure into sections
|
||||
3. Persists document to repository
|
||||
4. Chunks content according to strategy
|
||||
5. Returns chunks with metadata
|
||||
|
||||
Args:
|
||||
text: Markdown text content
|
||||
strategy_name: Name of chunking strategy
|
||||
chunk_size: Target chunk size
|
||||
overlap_size: Overlap between chunks
|
||||
respect_boundaries: Whether to respect boundaries
|
||||
title: Optional title for the document
|
||||
|
||||
Returns:
|
||||
Response with chunks
|
||||
|
||||
Raises:
|
||||
HTTPException: If parsing or chunking fails
|
||||
"""
|
||||
try:
|
||||
# Basic validation at API boundary
|
||||
if not text or not text.strip():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Text content cannot be empty",
|
||||
)
|
||||
|
||||
# Get service from bootstrap
|
||||
service: ITextProcessor = _get_service()
|
||||
|
||||
# Create chunking strategy
|
||||
strategy = ChunkingStrategy(
|
||||
strategy_name=strategy_name,
|
||||
chunk_size=chunk_size,
|
||||
overlap_size=overlap_size,
|
||||
respect_boundaries=respect_boundaries,
|
||||
)
|
||||
|
||||
# Execute complete workflow through service
|
||||
logger.info(f"Processing text input via service: {len(text)} characters")
|
||||
chunks = service.process_text_to_chunks(
|
||||
text=text,
|
||||
chunking_strategy=strategy,
|
||||
title=title,
|
||||
)
|
||||
|
||||
# Convert to response
|
||||
chunk_responses = [_to_chunk_response(c) for c in chunks]
|
||||
|
||||
logger.info(f"Successfully processed text: {len(chunks)} chunks created")
|
||||
|
||||
return ExtractAndChunkResponse(
|
||||
chunks=chunk_responses,
|
||||
total_chunks=len(chunk_responses),
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except DomainException as e:
|
||||
raise _map_domain_exception(e)
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error processing text: {str(e)}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Internal server error: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@router.get(
|
||||
"/health",
|
||||
response_model=HealthCheckResponse,
|
||||
|
||||
@ -9,14 +9,15 @@ from uuid import UUID
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...core.domain.models import ChunkingMethod
|
||||
|
||||
|
||||
class ChunkingStrategyRequest(BaseModel):
|
||||
"""Request model for chunking strategy configuration."""
|
||||
|
||||
strategy_name: str = Field(
|
||||
strategy_name: ChunkingMethod = Field(
|
||||
...,
|
||||
description="Name of chunking strategy (e.g., 'fixed_size', 'paragraph')",
|
||||
examples=["fixed_size", "paragraph"],
|
||||
description="Chunking method (FIXED_SIZE or PARAGRAPH)",
|
||||
)
|
||||
chunk_size: int = Field(
|
||||
...,
|
||||
|
||||
@ -19,6 +19,12 @@ class SourceType(str, Enum):
|
||||
WEB = "web"
|
||||
|
||||
|
||||
class ChunkingMethod(str, Enum):
|
||||
"""Enumeration of supported chunking methods."""
|
||||
FIXED_SIZE = "fixed_size"
|
||||
PARAGRAPH = "paragraph"
|
||||
|
||||
|
||||
class SourceFile(BaseModel):
|
||||
"""
|
||||
Represents the raw input file before processing.
|
||||
@ -429,12 +435,12 @@ class ChunkingStrategy(BaseModel):
|
||||
Configuration for a chunking strategy.
|
||||
|
||||
Attributes:
|
||||
strategy_name: Name of the chunking strategy
|
||||
strategy_name: Chunking method (fixed_size or paragraph)
|
||||
chunk_size: Target size for chunks (in characters)
|
||||
overlap_size: Number of characters to overlap between chunks
|
||||
respect_boundaries: Whether to respect sentence/paragraph boundaries
|
||||
"""
|
||||
strategy_name: str = Field(..., min_length=1, description="Strategy name")
|
||||
strategy_name: ChunkingMethod = Field(..., description="Chunking method")
|
||||
chunk_size: int = Field(..., ge=1, le=10000, description="Target chunk size")
|
||||
overlap_size: int = Field(default=0, ge=0, description="Overlap between chunks")
|
||||
respect_boundaries: bool = Field(
|
||||
|
||||
@ -65,54 +65,6 @@ class ITextProcessor(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_document(self, document_id: UUID) -> Document:
|
||||
"""
|
||||
Retrieve a document by its ID.
|
||||
|
||||
Args:
|
||||
document_id: Unique identifier of the document
|
||||
|
||||
Returns:
|
||||
Document entity
|
||||
|
||||
Raises:
|
||||
DocumentNotFoundError: If document doesn't exist
|
||||
RepositoryError: If retrieval fails
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list_documents(self, limit: int = 100, offset: int = 0) -> List[Document]:
|
||||
"""
|
||||
List documents with pagination.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of documents to return
|
||||
offset: Number of documents to skip
|
||||
|
||||
Returns:
|
||||
List of Document entities
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete_document(self, document_id: UUID) -> bool:
|
||||
"""
|
||||
Delete a document by its ID.
|
||||
|
||||
Args:
|
||||
document_id: Unique identifier of the document
|
||||
|
||||
Returns:
|
||||
True if deletion was successful
|
||||
|
||||
Raises:
|
||||
DocumentNotFoundError: If document doesn't exist
|
||||
RepositoryError: If deletion fails
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def process_text_to_chunks(
|
||||
self,
|
||||
|
||||
@ -165,64 +165,6 @@ class DocumentProcessorService(ITextProcessor):
|
||||
logger.error(f"Failed to extract and chunk: {str(e)}")
|
||||
raise
|
||||
|
||||
def get_document(self, document_id: UUID) -> Document:
|
||||
"""
|
||||
Retrieve a document by its ID.
|
||||
|
||||
Args:
|
||||
document_id: Unique identifier of the document
|
||||
|
||||
Returns:
|
||||
Document entity
|
||||
|
||||
Raises:
|
||||
DocumentNotFoundError: If document doesn't exist
|
||||
RepositoryError: If retrieval fails
|
||||
"""
|
||||
logger.debug(f"Retrieving document: {document_id}")
|
||||
|
||||
document = self._repository.find_by_id(document_id)
|
||||
|
||||
if document is None:
|
||||
raise DocumentNotFoundError(str(document_id))
|
||||
|
||||
return document
|
||||
|
||||
def list_documents(self, limit: int = 100, offset: int = 0) -> List[Document]:
|
||||
"""
|
||||
List documents with pagination.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of documents to return
|
||||
offset: Number of documents to skip
|
||||
|
||||
Returns:
|
||||
List of Document entities
|
||||
"""
|
||||
logger.debug(f"Listing documents: limit={limit}, offset={offset}")
|
||||
return self._repository.find_all(limit=limit, offset=offset)
|
||||
|
||||
def delete_document(self, document_id: UUID) -> bool:
|
||||
"""
|
||||
Delete a document by its ID.
|
||||
|
||||
Args:
|
||||
document_id: Unique identifier of the document
|
||||
|
||||
Returns:
|
||||
True if deletion was successful
|
||||
|
||||
Raises:
|
||||
DocumentNotFoundError: If document doesn't exist
|
||||
RepositoryError: If deletion fails
|
||||
"""
|
||||
logger.info(f"Deleting document: {document_id}")
|
||||
|
||||
if not self._repository.exists(document_id):
|
||||
raise DocumentNotFoundError(str(document_id))
|
||||
|
||||
return self._repository.delete(document_id)
|
||||
|
||||
def _extract_document(self, file_path: Path) -> Document:
|
||||
"""
|
||||
Extract Document using appropriate extractor.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user