
"""
Core Domain Models - Rich Pydantic v2 Entities with Internal Validation.
This module contains the domain entities that represent the core business concepts.
All models are immutable by default and include comprehensive validation.
"""
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, field_validator, model_validator
class SourceType(str, Enum):
    """Supported kinds of document source."""

    FILE = "file"  # local filesystem document
    WEB = "web"    # remote web page
    TEXT = "text"  # raw text supplied directly
class ChunkingMethod(str, Enum):
    """Supported text-chunking algorithms."""

    FIXED_SIZE = "fixed_size"  # split at a fixed character count
    PARAGRAPH = "paragraph"    # split on paragraph boundaries
class SourceFile(BaseModel):
    """Filesystem information about a raw input document, prior to processing.

    Flow: SourceFile -> Extraction -> Document

    Attributes:
        path: Absolute path to the source file.
        extension: File extension without the leading dot (e.g. 'md', 'pdf').
        size_bytes: Size of the file in bytes.
    """

    path: Path = Field(..., description="Absolute path to source file")
    extension: str = Field(..., min_length=1, description="File extension")
    size_bytes: int = Field(..., ge=0, description="File size in bytes")

    model_config = {
        "frozen": True,  # SourceFile is immutable
    }

    @field_validator('extension')
    @classmethod
    def normalize_extension(cls, value: str) -> str:
        """Lowercase the extension and drop surrounding space and leading dots."""
        return value.lower().strip().lstrip('.')

    @field_validator('path')
    @classmethod
    def validate_path_exists(cls, value: Path) -> Path:
        """Reject paths that do not exist or are not regular files."""
        if not value.exists():
            raise ValueError(f"Source file does not exist: {value}")
        if not value.is_file():
            raise ValueError(f"Path is not a file: {value}")
        return value

    def get_file_name(self) -> str:
        """Return the final path component (name including extension)."""
        return self.path.name

    def get_file_stem(self) -> str:
        """Return the final path component without its extension."""
        return self.path.stem
class WebPageSource(BaseModel):
    """URL information about a web page used as a document source.

    Flow: WebPageSource -> Extraction -> Document

    Attributes:
        url: URL of the web page (http:// or https:// only).
        display_name: Human-readable name (e.g. 'about_us.html').
        content_length: Optional content length in bytes.
    """

    url: str = Field(..., min_length=1, description="Web page URL")
    display_name: str = Field(..., min_length=1, description="Display name")
    content_length: Optional[int] = Field(None, ge=0, description="Content length")

    model_config = {
        "frozen": True,  # WebPageSource is immutable
    }

    @field_validator('url')
    @classmethod
    def validate_url(cls, value: str) -> str:
        """Trim whitespace and require an http(s) scheme."""
        value = value.strip()
        if not value.startswith(('http://', 'https://')):
            raise ValueError(f"URL must start with http:// or https://: {value}")
        return value

    @field_validator('display_name')
    @classmethod
    def normalize_display_name(cls, value: str) -> str:
        """Trim surrounding whitespace from the display name."""
        return value.strip()

    def get_domain(self) -> str:
        """Return the network location (domain) portion of the URL."""
        from urllib.parse import urlparse
        return urlparse(self.url).netloc
class DocumentSection(BaseModel):
    """A structured section of a Markdown document.

    Sections are created by parsing Markdown headers; text preceding the
    first header is grouped into a synthetic "Introduction" section at
    level 0.

    Attributes:
        title: Section title (from the header, or "Introduction"); may be None.
        level: Header level (1-6 for h1-h6, 0 for the introduction).
        content: Section content with Markdown formatting preserved.
    """

    title: Optional[str] = Field(None, min_length=1, description="Section title")
    level: int = Field(..., ge=0, le=6, description="Header level (0=intro)")
    content: str = Field(..., description="Section content with formatting")

    model_config = {
        "frozen": True,  # Sections are immutable
    }

    @field_validator('title')
    @classmethod
    def normalize_title(cls, value: Optional[str]) -> Optional[str]:
        """Strip surrounding whitespace; pass None/empty through unchanged.

        The field is Optional, so the validator must accept None — the
        previous annotation incorrectly claimed a plain ``str``.
        """
        return value.strip() if value else value

    def is_introduction(self) -> bool:
        """Check whether this is the synthetic level-0 introduction section."""
        return self.level == 0 and self.title == "Introduction"

    def get_word_count(self) -> int:
        """Return the approximate word count (whitespace-delimited tokens)."""
        return len(self.content.split())
class DocumentMetadata(BaseModel):
    """Source-neutral metadata for documents.

    Works for both file-based and web-based sources, enabling a unified
    processing pipeline.

    Attributes:
        source_id: Path or URL identifying the source.
        source_type: Type of source (FILE or WEB).
        size_bytes: Size in bytes (file size or content length).
        created_at: Timezone-aware UTC timestamp of metadata creation.
        author: Optional author information.
        extra_metadata: Additional source-specific metadata.
    """

    source_id: str = Field(..., min_length=1, description="Path or URL")
    source_type: SourceType = Field(..., description="Source type enum")
    size_bytes: int = Field(..., ge=0, description="Size in bytes")
    # datetime.utcnow() is deprecated since Python 3.12 and produces a naive
    # datetime; use an explicit timezone-aware UTC timestamp instead.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    author: Optional[str] = Field(None, description="Author information")
    extra_metadata: Dict[str, str] = Field(
        default_factory=dict,
        description="Additional metadata"
    )

    def _format_size(self) -> str:
        """Format ``size_bytes`` as a human-readable string (e.g. '1.50 MB')."""
        size: float = self.size_bytes
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024.0:
                return f"{size:.2f} {unit}"
            size /= 1024.0
        return f"{size:.2f} TB"

    def is_file_source(self) -> bool:
        """Check if this is a file-based source."""
        return self.source_type == SourceType.FILE

    def is_web_source(self) -> bool:
        """Check if this is a web-based source."""
        return self.source_type == SourceType.WEB
class Document(BaseModel):
    """Core domain entity: a document with extracted, structured content.

    This rich model contains both the raw Markdown and the parsed
    sections, enabling flexible querying and processing strategies.

    Attributes:
        id: Unique identifier for the document.
        raw_markdown: Raw Markdown text extracted from the source.
        title: Document title.
        sections: Parsed structured sections from the Markdown.
        metadata: Associated metadata.
        is_processed: Flag indicating whether the document has been processed.
        download_url: Optional presigned URL for downloading the markdown file.
    """

    id: UUID = Field(default_factory=uuid4, description="Unique document ID")
    raw_markdown: str = Field(..., description="Raw Markdown content")
    title: str = Field(..., description="Document title")
    sections: List[DocumentSection] = Field(
        default_factory=list,
        description="Structured document sections"
    )
    metadata: DocumentMetadata = Field(..., description="Document metadata")
    is_processed: bool = Field(default=False, description="Processing status")
    download_url: Optional[str] = Field(None, description="Presigned download URL")

    model_config = {
        "frozen": False,  # mutable so mark_as_processed() can flip the flag
        "str_strip_whitespace": True,
    }

    @field_validator('raw_markdown')
    @classmethod
    def validate_content_not_empty(cls, value: str) -> str:
        """Reject empty or whitespace-only content."""
        if not value or not value.strip():
            raise ValueError("Document content cannot be empty")
        return value

    @property
    def content(self) -> str:
        """Backward-compatibility alias for ``raw_markdown``."""
        return self.raw_markdown

    def validate_content(self) -> bool:
        """Validate that the document content meets quality standards.

        Returns:
            True if content is valid.

        Raises:
            ValueError: If the content is too short or contains an
                excessive proportion of special characters.
        """
        if len(self.raw_markdown.strip()) < 10:
            raise ValueError("Document content is too short (minimum 10 characters)")
        # Ratio of characters that are neither alphanumeric nor whitespace.
        # Division is safe: the field validator guarantees non-empty content.
        special_char_ratio = sum(
            not c.isalnum() and not c.isspace()
            for c in self.raw_markdown
        ) / len(self.raw_markdown)
        if special_char_ratio > 0.5:
            raise ValueError(
                f"Document content has too many special characters ({special_char_ratio:.2%})"
            )
        return True

    def mark_as_processed(self) -> None:
        """Mark the document as processed."""
        self.is_processed = True

    def get_content_preview(self, length: int = 100) -> str:
        """Return up to ``length`` characters of content, ellipsized if truncated.

        Args:
            length: Maximum length of the preview.

        Returns:
            The full content if it fits, otherwise a truncated prefix
            followed by "...".
        """
        if len(self.raw_markdown) <= length:
            return self.raw_markdown
        return f"{self.raw_markdown[:length]}..."

    def get_section_count(self) -> int:
        """Return the number of parsed sections in the document."""
        return len(self.sections)

    def get_sections_by_level(self, level: int) -> List[DocumentSection]:
        """Return all sections at a specific header level.

        Args:
            level: Header level to filter by (0-6).

        Returns:
            Sections whose level equals ``level``, in document order.
        """
        return [section for section in self.sections if section.level == level]

    def get_section_titles(self) -> List[Optional[str]]:
        """Return all section titles in document order.

        Entries may be None because ``DocumentSection.title`` is optional;
        the previous ``List[str]`` annotation was inaccurate.
        """
        return [section.title for section in self.sections]
class Chunk(BaseModel):
    """A chunk of text extracted from a document.

    Tracks section membership so precision chunking can relate each
    chunk back to its source section.

    Attributes:
        id: Unique identifier for the chunk.
        document_id: ID of the parent document.
        content: Text content of the chunk.
        sequence_number: Order of this chunk within the document.
        section_title: Title of the section this chunk belongs to, if any.
        section_index: Index of the section in ``document.sections``, if any.
        metadata: Optional chunk-specific metadata.
    """

    id: UUID = Field(default_factory=uuid4, description="Unique chunk ID")
    document_id: UUID = Field(..., description="Parent document ID")
    content: str = Field(..., min_length=1, description="Chunk text content")
    sequence_number: int = Field(..., ge=0, description="Chunk order in document")
    section_title: Optional[str] = Field(None, description="Section title")
    section_index: Optional[int] = Field(None, ge=0, description="Section index")
    metadata: Dict[str, str] = Field(default_factory=dict)

    model_config = {
        "frozen": True,  # Chunks are immutable
    }

    def get_length(self) -> int:
        """Return the number of characters in the chunk content."""
        return len(self.content)

    def contains_text(self, text: str, case_sensitive: bool = False) -> bool:
        """Check whether the chunk contains a piece of text.

        Args:
            text: Text to search for.
            case_sensitive: Whether the search is case-sensitive.

        Returns:
            True if the text occurs in the chunk content.
        """
        if case_sensitive:
            return text in self.content
        return text.lower() in self.content.lower()

    def belongs_to_section(self) -> bool:
        """Check whether this chunk is attached to a specific section."""
        return not (self.section_title is None or self.section_index is None)

    def get_section_context(self) -> str:
        """Describe the section this chunk belongs to.

        Returns:
            A "Section <index>: <title>" string, or 'No section' when the
            chunk has no section attachment.
        """
        if not self.belongs_to_section():
            return "No section"
        return f"Section {self.section_index}: {self.section_title}"
class ChunkingStrategy(BaseModel):
    """Configuration for a text-chunking strategy.

    Attributes:
        strategy_name: Chunking method (fixed_size or paragraph).
        chunk_size: Target size for chunks, in characters.
        overlap_size: Number of characters shared between adjacent chunks.
        respect_boundaries: Whether to respect sentence/paragraph boundaries.
    """

    strategy_name: ChunkingMethod = Field(..., description="Chunking method")
    chunk_size: int = Field(..., ge=1, le=10000, description="Target chunk size")
    overlap_size: int = Field(default=0, ge=0, description="Overlap between chunks")
    respect_boundaries: bool = Field(
        default=True,
        description="Respect text boundaries"
    )

    @model_validator(mode='after')
    def validate_overlap_less_than_size(self) -> 'ChunkingStrategy':
        """Reject configurations whose overlap would swallow the whole chunk."""
        if self.overlap_size < self.chunk_size:
            return self
        raise ValueError(
            f"overlap_size ({self.overlap_size}) must be less than "
            f"chunk_size ({self.chunk_size})"
        )

    def calculate_effective_step(self) -> int:
        """Return how many characters the window advances between chunks."""
        return self.chunk_size - self.overlap_size