make the domain general and open to adding a crawling system

m.dabbagh 2026-01-08 04:57:35 +03:30
parent 359026fa98
commit 2c375ce6bd
8 changed files with 183 additions and 119 deletions

View File

@@ -12,7 +12,7 @@ from ....core.domain.exceptions import (
     EmptyContentError,
     ExtractionError,
 )
-from ....core.domain.models import Document, DocumentMetadata
+from ....core.domain.models import Document, DocumentMetadata, SourceType
 from ....core.ports.outgoing.extractor import IExtractor
@@ -209,7 +209,7 @@ class DocxExtractor(IExtractor):
     def _create_metadata(self, file_path: Path) -> DocumentMetadata:
         """
-        Create document metadata from file.
+        Create source-neutral document metadata from file.

         Args:
             file_path: Path to the file
@@ -220,7 +220,8 @@ class DocxExtractor(IExtractor):
         stat = file_path.stat()
         return DocumentMetadata(
-            file_name=file_path.name,
-            file_type=file_path.suffix.lstrip('.').lower(),
-            file_size_bytes=stat.st_size,
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
         )

View File

@@ -12,7 +12,7 @@ from ....core.domain.exceptions import (
     EmptyContentError,
     ExtractionError,
 )
-from ....core.domain.models import Document, DocumentMetadata
+from ....core.domain.models import Document, DocumentMetadata, SourceType
 from ....core.ports.outgoing.extractor import IExtractor
@@ -200,7 +200,7 @@ class PDFExtractor(IExtractor):
     def _create_metadata(self, file_path: Path) -> DocumentMetadata:
         """
-        Create document metadata from file.
+        Create source-neutral document metadata from file.

         Args:
             file_path: Path to the file
@@ -211,7 +211,8 @@ class PDFExtractor(IExtractor):
         stat = file_path.stat()
         return DocumentMetadata(
-            file_name=file_path.name,
-            file_type=file_path.suffix.lstrip('.').lower(),
-            file_size_bytes=stat.st_size,
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
         )

View File

@@ -12,7 +12,7 @@ from ....core.domain.exceptions import (
     EmptyContentError,
     ExtractionError,
 )
-from ....core.domain.models import Document, DocumentMetadata
+from ....core.domain.models import Document, DocumentMetadata, SourceType
 from ....core.ports.outgoing.extractor import IExtractor
@@ -187,7 +187,7 @@ class TxtExtractor(IExtractor):
     def _create_metadata(self, file_path: Path) -> DocumentMetadata:
         """
-        Create document metadata from file.
+        Create source-neutral document metadata from file.

         Args:
             file_path: Path to the file
@@ -198,7 +198,8 @@ class TxtExtractor(IExtractor):
        stat = file_path.stat()
         return DocumentMetadata(
-            file_name=file_path.name,
-            file_type=file_path.suffix.lstrip('.').lower(),
-            file_size_bytes=stat.st_size,
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
        )
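
All three extractor changes converge on the same source-neutral metadata shape. For illustration, a minimal sketch of what the new _create_metadata produces for a local file (imports from the project's core.domain.models are elided):

    from pathlib import Path

    file_path = Path("docs/manual.docx")
    stat = file_path.stat()
    metadata = DocumentMetadata(
        source_id=str(file_path.absolute()),  # absolute path identifies the source
        source_type=SourceType.FILE,
        display_name=file_path.name,          # e.g. 'manual.docx'
        size_bytes=stat.st_size,
    )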

View File

@@ -5,6 +5,7 @@ This module contains the domain entities that represent the core business concepts.
 All models are immutable by default and include comprehensive validation.
 """
 from datetime import datetime
+from enum import Enum
 from pathlib import Path
 from typing import Dict, List, Optional
 from uuid import UUID, uuid4
@@ -12,6 +13,12 @@ from uuid import UUID, uuid4
 from pydantic import BaseModel, Field, field_validator, model_validator


+class SourceType(str, Enum):
+    """Enumeration of supported source types."""
+    FILE = "file"
+    WEB = "web"
+
+
 class SourceFile(BaseModel):
     """
     Represents the raw input file before processing.
@@ -58,6 +65,48 @@ class SourceFile(BaseModel):
         return self.path.stem


+class WebPageSource(BaseModel):
+    """
+    Represents a web page source for document extraction.
+
+    This model encapsulates URL information about the document source.
+    Flow: WebPageSource -> Extraction -> Document
+
+    Attributes:
+        url: URL of the web page
+        display_name: Human-readable name (e.g., 'about_us.html')
+        content_length: Optional content length in bytes
+    """
+    url: str = Field(..., min_length=1, description="Web page URL")
+    display_name: str = Field(..., min_length=1, description="Display name")
+    content_length: Optional[int] = Field(None, ge=0, description="Content length")
+
+    model_config = {
+        "frozen": True,  # WebPageSource is immutable
+    }
+
+    @field_validator('url')
+    @classmethod
+    def validate_url(cls, value: str) -> str:
+        """Validate URL format."""
+        value = value.strip()
+        if not (value.startswith('http://') or value.startswith('https://')):
+            raise ValueError(f"URL must start with http:// or https://: {value}")
+        return value
+
+    @field_validator('display_name')
+    @classmethod
+    def normalize_display_name(cls, value: str) -> str:
+        """Normalize display name."""
+        return value.strip()
+
+    def get_domain(self) -> str:
+        """Extract domain from URL."""
+        from urllib.parse import urlparse
+        parsed = urlparse(self.url)
+        return parsed.netloc
+
+
 class DocumentSection(BaseModel):
     """
     Represents a structured section of a Markdown document.
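
For illustration, the new model in use (values are made up for the example):

    page = WebPageSource(
        url="https://example.com/about",
        display_name="about_us.html",
        content_length=2048,
    )
    assert page.get_domain() == "example.com"
    # The validator rejects non-HTTP schemes:
    # WebPageSource(url="ftp://example.com", display_name="x") raises ValueError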
@@ -95,30 +144,36 @@ class DocumentSection(BaseModel):
 class DocumentMetadata(BaseModel):
     """
-    Metadata associated with a document.
+    Source-neutral metadata for documents.
+
+    This metadata works for both file-based and web-based sources,
+    enabling a unified processing pipeline.

     Attributes:
-        file_name: Original filename of the document
-        file_type: Type/extension of the file (e.g., 'pdf', 'docx')
-        file_size_bytes: Size of the file in bytes
-        created_at: Timestamp when document was created
+        source_id: Path or URL identifying the source
+        source_type: Type of source (FILE or WEB)
+        display_name: Human-readable name (e.g., 'manual.pdf', 'about_us.html')
+        size_bytes: Size in bytes (file size or content length)
+        created_at: Timestamp when metadata was created
         author: Optional author information
-        page_count: Optional number of pages in document
-        custom_fields: Additional metadata fields
+        extra_metadata: Additional source-specific metadata
     """
-    file_name: str = Field(..., min_length=1, description="Original filename")
-    file_type: str = Field(..., min_length=1, description="File extension")
-    file_size_bytes: int = Field(..., ge=0, description="File size in bytes")
+    source_id: str = Field(..., min_length=1, description="Path or URL")
+    source_type: SourceType = Field(..., description="Source type enum")
+    display_name: str = Field(..., min_length=1, description="Display name")
+    size_bytes: int = Field(..., ge=0, description="Size in bytes")
     created_at: datetime = Field(default_factory=datetime.utcnow)
-    author: Optional[str] = Field(None, description="Document author")
-    page_count: Optional[int] = Field(None, ge=1, description="Number of pages")
-    custom_fields: Dict[str, str] = Field(default_factory=dict)
+    author: Optional[str] = Field(None, description="Author information")
+    extra_metadata: Dict[str, str] = Field(
+        default_factory=dict,
+        description="Additional metadata"
+    )

-    @field_validator('file_type')
+    @field_validator('display_name')
     @classmethod
-    def validate_file_type(cls, value: str) -> str:
-        """Ensure file type is lowercase and stripped."""
-        return value.lower().strip()
+    def normalize_display_name(cls, value: str) -> str:
+        """Normalize display name."""
+        return value.strip()

     def get_summary(self) -> str:
         """
@@ -128,28 +183,33 @@ class DocumentMetadata(BaseModel):
         Formatted string containing key metadata information
         """
         summary_parts = [
-            f"File: {self.file_name}",
-            f"Type: {self.file_type}",
-            f"Size: {self._format_file_size()}",
+            f"Source: {self.display_name}",
+            f"Type: {self.source_type.value}",
+            f"Size: {self._format_size()}",
         ]

         if self.author:
             summary_parts.append(f"Author: {self.author}")
-        if self.page_count:
-            summary_parts.append(f"Pages: {self.page_count}")

         return " | ".join(summary_parts)

-    def _format_file_size(self) -> str:
-        """Format file size in human-readable format."""
-        size = self.file_size_bytes
+    def _format_size(self) -> str:
+        """Format size in human-readable format."""
+        size = self.size_bytes
         for unit in ['B', 'KB', 'MB', 'GB']:
             if size < 1024.0:
                 return f"{size:.2f} {unit}"
             size /= 1024.0
         return f"{size:.2f} TB"

+    def is_file_source(self) -> bool:
+        """Check if this is a file-based source."""
+        return self.source_type == SourceType.FILE
+
+    def is_web_source(self) -> bool:
+        """Check if this is a web-based source."""
+        return self.source_type == SourceType.WEB
+

 class Document(BaseModel):
     """
@@ -281,6 +341,8 @@ class Chunk(BaseModel):
     """
     Represents a chunk of text extracted from a document.

+    Enhanced to track section membership for precision chunking.
+
     Attributes:
         id: Unique identifier for the chunk
         document_id: ID of the parent document
@@ -288,6 +350,8 @@ class Chunk(BaseModel):
         sequence_number: Order of this chunk in the document
         start_char: Starting character position in original document
         end_char: Ending character position in original document
+        section_title: Title of the section this chunk belongs to
+        section_index: Index of the section in document.sections
         metadata: Optional metadata specific to this chunk
     """
     id: UUID = Field(default_factory=uuid4, description="Unique chunk ID")
@@ -296,6 +360,8 @@ class Chunk(BaseModel):
     sequence_number: int = Field(..., ge=0, description="Chunk order in document")
     start_char: int = Field(..., ge=0, description="Start position in document")
     end_char: int = Field(..., gt=0, description="End position in document")
+    section_title: Optional[str] = Field(None, description="Section title")
+    section_index: Optional[int] = Field(None, ge=0, description="Section index")
     metadata: Dict[str, str] = Field(default_factory=dict)

     model_config = {
@@ -342,6 +408,21 @@ class Chunk(BaseModel):
         search_text = text if case_sensitive else text.lower()
         return search_text in content

+    def belongs_to_section(self) -> bool:
+        """Check if this chunk belongs to a specific section."""
+        return self.section_title is not None and self.section_index is not None
+
+    def get_section_context(self) -> str:
+        """
+        Get a string describing the section context.
+
+        Returns:
+            Section context description or 'No section'
+        """
+        if self.belongs_to_section():
+            return f"Section {self.section_index}: {self.section_title}"
+        return "No section"
+

 class ChunkingStrategy(BaseModel):
     """

View File

@@ -1,14 +1,13 @@
 """
 Outgoing Port - Text Chunker Interface.

-This defines the contract for chunking text into smaller pieces.
+This defines the contract for chunking documents into smaller pieces.
 Different strategies can be implemented as adapters.
 """
 from abc import ABC, abstractmethod
 from typing import List
-from uuid import UUID

-from ...domain.models import Chunk, ChunkingStrategy
+from ...domain.models import Chunk, ChunkingStrategy, Document


 class IChunker(ABC):
@@ -16,26 +15,26 @@ class IChunker(ABC):
     Interface for text chunking strategies.

     Implementations of this interface provide different strategies
-    for splitting text into manageable chunks.
+    for splitting documents into manageable chunks with section awareness.
     """

     @abstractmethod
     def chunk(
         self,
-        text: str,
-        document_id: UUID,
+        document: Document,
         strategy: ChunkingStrategy,
     ) -> List[Chunk]:
         """
-        Split text into chunks according to a strategy.
+        Split document into chunks according to a strategy.
+
+        Chunkers can utilize document.sections for section-aware chunking.

         Args:
-            text: Text content to chunk
-            document_id: ID of the parent document
+            document: Full Document entity with raw_markdown and sections
             strategy: Chunking strategy configuration

         Returns:
-            List of Chunk entities
+            List of Chunk entities with section metadata

         Raises:
             ChunkingError: If chunking fails
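
A minimal conforming implementation of the new contract might look like the sketch below. This is not code from the commit; DocumentSection's title and content attributes are assumed:

    from typing import List

    class SectionChunker(IChunker):
        """Hypothetical chunker: one chunk per parsed section."""

        def chunk(self, document: Document, strategy: ChunkingStrategy) -> List[Chunk]:
            chunks: List[Chunk] = []
            cursor = 0
            for index, section in enumerate(document.sections):
                text = section.content  # assumed attribute
                if not text:
                    continue  # keeps the end_char > 0 constraint satisfiable
                chunks.append(Chunk(
                    document_id=document.id,
                    content=text,
                    sequence_number=len(chunks),
                    start_char=cursor,
                    end_char=cursor + len(text),
                    section_title=section.title,  # assumed attribute
                    section_index=index,
                ))
                cursor += len(text)
            return chunks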

View File

@@ -5,9 +5,8 @@ This defines the contract for managing chunking strategies.
 """
 from abc import ABC, abstractmethod
 from typing import List
-from uuid import UUID

-from ...domain.models import Chunk, ChunkingStrategy
+from ...domain.models import Chunk, ChunkingStrategy, Document
 from .chunker import IChunker
@@ -22,23 +21,21 @@ class IChunkingContext(ABC):
     @abstractmethod
     def execute_chunking(
         self,
-        text: str,
-        document_id: UUID,
+        document: Document,
         strategy: ChunkingStrategy,
     ) -> List[Chunk]:
         """
         Execute chunking using the specified strategy.

-        This method is stateless and thread-safe. It selects the appropriate
-        chunker based on the strategy configuration and executes chunking.
+        This method is stateless and thread-safe. It accepts the full
+        Document object (with sections) to enable section-aware chunking.

         Args:
-            text: Text to chunk
-            document_id: ID of parent document
+            document: Full Document entity with raw_markdown and sections
             strategy: Chunking strategy configuration (includes strategy_name)

         Returns:
-            List of chunks
+            List of chunks with section metadata

         Raises:
             ChunkingError: If strategy is not registered or chunking fails
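
Call sites shrink accordingly; a sketch, assuming a wired-up context and a registered strategy name:

    chunks = chunking_context.execute_chunking(
        document=document,  # full entity instead of bare text + document_id
        strategy=ChunkingStrategy(strategy_name="section"),  # illustrative config
    )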

View File

@@ -1,8 +1,8 @@
 """
 Outgoing Port - Text Extractor Interface.

-This defines the contract for extracting text from documents.
-Different adapters can implement this for various file types.
+This defines the contract for extracting content from documents.
+Different adapters can implement this for various file types and sources.
 """
 from abc import ABC, abstractmethod
 from pathlib import Path
@@ -16,7 +16,7 @@ class IExtractor(ABC):
     Interface for text extraction from documents.

     Implementations of this interface handle specific file formats
-    (PDF, DOCX, TXT, etc.) and adapt external libraries to the domain.
+    (PDF, DOCX, TXT, etc.) or web sources and return Document entities.
     """

     @abstractmethod
@@ -24,11 +24,14 @@ class IExtractor(ABC):
         """
         Extract text and metadata from a document file.

+        Extractors create Document entities with raw_markdown and metadata.
+        Sections are parsed later in the pipeline.
+
         Args:
             file_path: Path to the document file

         Returns:
-            Document entity with extracted content and metadata
+            Document entity with raw_markdown and metadata populated

         Raises:
             ExtractionError: If extraction fails
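
This commit stops short of adding a crawler, but the widened wording is what leaves room for one. A hypothetical web adapter could look like the sketch below; note the port still types its argument as a file path, so a real crawler would also need to generalize that parameter. The fetch and html_to_markdown helpers are assumptions, not project code; the Document field names follow the extractor diffs above:

    class WebExtractor(IExtractor):
        """Hypothetical adapter mapping a crawled page into the domain."""

        def extract(self, source: WebPageSource) -> Document:
            html = fetch(source.url)           # assumed HTTP helper
            markdown = html_to_markdown(html)  # assumed conversion helper
            return Document(
                raw_markdown=markdown,
                metadata=DocumentMetadata(
                    source_id=source.url,
                    source_type=SourceType.WEB,
                    display_name=source.display_name,
                    size_bytes=len(markdown.encode("utf-8")),
                ),
            )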

View File

@@ -59,21 +59,21 @@ class DocumentProcessorService(ITextProcessor):
         chunking_strategy: ChunkingStrategy,
     ) -> Document:
         """
-        Process a document by extracting, parsing, and storing it.
+        Process a document using the stateless pipeline.

-        New Pragmatic Pipeline:
-        1. Extract: Get raw Markdown from SourceFile using extractor
-        2. Parse: Use parse_markdown to create structured sections
-        3. Assemble: Create rich Document with raw_markdown + sections
-        4. Persist: Save to repository
-        5. Finalize: Mark as processed
+        Pipeline Order:
+        1. Extract Document with raw_markdown and metadata (via Adapter)
+        2. Parse Markdown into DocumentSection objects
+        3. Update Document with sections
+        4. Validate and persist Document
+        5. Mark as processed

         Args:
             file_path: Path to the document file
             chunking_strategy: Strategy configuration (for metadata)

         Returns:
-            Processed Document entity with structured sections
+            Fully processed Document entity

         Raises:
             ExtractionError: If text extraction fails
@@ -83,15 +83,14 @@ class DocumentProcessorService(ITextProcessor):
         try:
             logger.info(f"Processing document: {file_path}")

-            # Step 1: Extract raw Markdown from SourceFile
-            source_file = self._create_source_file(file_path)
-            document = self._extract_from_source(source_file)
+            # Step 1: Extract Document with raw_markdown and metadata
+            document = self._extract_document(file_path)

             # Step 2: Parse Markdown into structured sections
             sections = parse_markdown(document.raw_markdown)
             logger.debug(f"Parsed {len(sections)} sections from document")

-            # Step 3: Assemble rich Document model
+            # Step 3: Update Document with sections
             document = document.model_copy(update={"sections": sections})

             # Step 4: Validate document content
@@ -100,7 +99,7 @@ class DocumentProcessorService(ITextProcessor):
             # Step 5: Persist to repository
             saved_document = self._repository.save(document)

-            # Step 6: Finalize - mark as processed
+            # Step 6: Mark as processed
             saved_document.mark_as_processed()
             self._repository.save(saved_document)
@@ -128,17 +127,17 @@ class DocumentProcessorService(ITextProcessor):
         Extract text from document and split into chunks.

         Pipeline:
-        1. Extract raw Markdown from SourceFile
+        1. Extract Document with raw_markdown and metadata
         2. Parse into structured sections
-        3. Apply chunking strategy to raw content
-        4. Return chunks
+        3. Update Document with sections
+        4. Apply chunking strategy with section awareness

         Args:
             file_path: Path to the document file
             chunking_strategy: Strategy configuration for chunking

         Returns:
-            List of text chunks
+            List of chunks with section metadata

         Raises:
             ExtractionError: If text extraction fails
@@ -147,15 +146,16 @@ class DocumentProcessorService(ITextProcessor):
         try:
             logger.info(f"Extracting and chunking: {file_path}")

-            # Extract from source
-            source_file = self._create_source_file(file_path)
-            document = self._extract_from_source(source_file)
+            # Extract Document
+            document = self._extract_document(file_path)

             # Parse sections
             sections = parse_markdown(document.raw_markdown)
+
+            # Update Document with sections
             document = document.model_copy(update={"sections": sections})

-            # Chunk using strategy
+            # Chunk using strategy (section-aware)
             chunks = self._chunk_document(document, chunking_strategy)

             logger.info(f"Created {len(chunks)} chunks from document")
@@ -223,43 +223,24 @@ class DocumentProcessorService(ITextProcessor):
         return self._repository.delete(document_id)

-    def _create_source_file(self, file_path: Path) -> SourceFile:
+    def _extract_document(self, file_path: Path) -> Document:
         """
-        Create a SourceFile model from a file path.
+        Extract Document using appropriate extractor.
+
+        Extractors create Document entities with raw_markdown and metadata.
+        Sections will be parsed later in the pipeline.

         Args:
-            file_path: Path to the source file
+            file_path: Path to document file

         Returns:
-            SourceFile entity
-
-        Raises:
-            ValueError: If file doesn't exist or is invalid
-        """
-        if not file_path.exists():
-            raise ValueError(f"File does not exist: {file_path}")
-
-        return SourceFile(
-            path=file_path,
-            extension=file_path.suffix.lstrip('.'),
-            size_bytes=file_path.stat().st_size,
-        )
-
-    def _extract_from_source(self, source_file: SourceFile) -> Document:
-        """
-        Extract raw Markdown from SourceFile using appropriate extractor.
-
-        Args:
-            source_file: Source file to extract from
-
-        Returns:
-            Document entity with raw_markdown populated
+            Document entity with raw_markdown and metadata (sections empty)

         Raises:
             ExtractionError: If extraction fails
         """
-        extractor = self._extractor_factory.create_extractor(source_file.path)
-        return extractor.extract(source_file.path)
+        extractor = self._extractor_factory.create_extractor(file_path)
+        return extractor.extract(file_path)

     def _chunk_document(
         self,
@@ -267,20 +248,20 @@ class DocumentProcessorService(ITextProcessor):
         strategy: ChunkingStrategy,
     ) -> List[Chunk]:
         """
-        Chunk document using specified strategy.
+        Chunk document using specified strategy with section awareness.

         This method is thread-safe as it delegates to a stateless
-        chunking context that selects the strategy based on configuration.
+        chunking context. The full Document (with sections) is passed
+        to enable section-aware chunking.

         Args:
-            document: Document to chunk
+            document: Full Document entity with sections
             strategy: Chunking strategy configuration

         Returns:
-            List of chunks
+            List of chunks with section metadata
         """
         return self._chunking_context.execute_chunking(
-            text=document.content,
-            document_id=document.id,
+            document=document,
             strategy=strategy,
         )
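
Taken together, the service now runs extract, parse, update, chunk in that order. A condensed sketch of the flow using the helpers named in this diff (wiring of the factory, context, and strategy is assumed):

    extractor = extractor_factory.create_extractor(file_path)
    document = extractor.extract(file_path)                        # raw_markdown + metadata
    sections = parse_markdown(document.raw_markdown)               # DocumentSection list
    document = document.model_copy(update={"sections": sections})  # immutable update
    chunks = chunking_context.execute_chunking(document=document, strategy=strategy)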