From 2c375ce6bdba0b3480410403577382ca1f8a5b91 Mon Sep 17 00:00:00 2001
From: "m.dabbagh"
Date: Thu, 8 Jan 2026 04:57:35 +0330
Subject: [PATCH] make the domain general and open to adding a crawling system

---
 .../outgoing/extractors/docx_extractor.py    |  11 +-
 .../outgoing/extractors/pdf_extractor.py     |  11 +-
 .../outgoing/extractors/txt_extractor.py     |  11 +-
 src/core/domain/models.py                    | 133 ++++++++++++++----
 src/core/ports/outgoing/chunker.py           |  19 ++-
 src/core/ports/outgoing/chunking_context.py  |  15 +-
 src/core/ports/outgoing/extractor.py         |  11 +-
 .../services/document_processor_service.py   |  91 +++++-------
 8 files changed, 183 insertions(+), 119 deletions(-)

diff --git a/src/adapters/outgoing/extractors/docx_extractor.py b/src/adapters/outgoing/extractors/docx_extractor.py
index d763fee..fe36772 100644
--- a/src/adapters/outgoing/extractors/docx_extractor.py
+++ b/src/adapters/outgoing/extractors/docx_extractor.py
@@ -12,7 +12,7 @@ from ....core.domain.exceptions import (
     EmptyContentError,
     ExtractionError,
 )
-from ....core.domain.models import Document, DocumentMetadata
+from ....core.domain.models import Document, DocumentMetadata, SourceType
 from ....core.ports.outgoing.extractor import IExtractor
 
 
@@ -209,7 +209,7 @@ class DocxExtractor(IExtractor):
 
     def _create_metadata(self, file_path: Path) -> DocumentMetadata:
         """
-        Create document metadata from file.
+        Create source-neutral document metadata from file.
 
         Args:
             file_path: Path to the file
@@ -220,7 +220,8 @@
         stat = file_path.stat()
 
         return DocumentMetadata(
-            file_name=file_path.name,
-            file_type=file_path.suffix.lstrip('.').lower(),
-            file_size_bytes=stat.st_size,
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
         )
diff --git a/src/adapters/outgoing/extractors/pdf_extractor.py b/src/adapters/outgoing/extractors/pdf_extractor.py
index f48cccf..8ee0090 100644
--- a/src/adapters/outgoing/extractors/pdf_extractor.py
+++ b/src/adapters/outgoing/extractors/pdf_extractor.py
@@ -12,7 +12,7 @@ from ....core.domain.exceptions import (
     EmptyContentError,
     ExtractionError,
 )
-from ....core.domain.models import Document, DocumentMetadata
+from ....core.domain.models import Document, DocumentMetadata, SourceType
 from ....core.ports.outgoing.extractor import IExtractor
 
 
@@ -200,7 +200,7 @@ class PDFExtractor(IExtractor):
 
     def _create_metadata(self, file_path: Path) -> DocumentMetadata:
         """
-        Create document metadata from file.
+        Create source-neutral document metadata from file.
 
         Args:
             file_path: Path to the file
@@ -211,7 +211,8 @@ class PDFExtractor(IExtractor):
         stat = file_path.stat()
 
         return DocumentMetadata(
-            file_name=file_path.name,
-            file_type=file_path.suffix.lstrip('.').lower(),
-            file_size_bytes=stat.st_size,
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
         )
diff --git a/src/adapters/outgoing/extractors/txt_extractor.py b/src/adapters/outgoing/extractors/txt_extractor.py
index 09b91f9..be7de3d 100644
--- a/src/adapters/outgoing/extractors/txt_extractor.py
+++ b/src/adapters/outgoing/extractors/txt_extractor.py
@@ -12,7 +12,7 @@ from ....core.domain.exceptions import (
     EmptyContentError,
     ExtractionError,
 )
-from ....core.domain.models import Document, DocumentMetadata
+from ....core.domain.models import Document, DocumentMetadata, SourceType
 from ....core.ports.outgoing.extractor import IExtractor
 
 
@@ -187,7 +187,7 @@ class TxtExtractor(IExtractor):
 
     def _create_metadata(self, file_path: Path) -> DocumentMetadata:
         """
-        Create document metadata from file.
+        Create source-neutral document metadata from file.
 
         Args:
             file_path: Path to the file
@@ -198,7 +198,8 @@
         stat = file_path.stat()
 
         return DocumentMetadata(
-            file_name=file_path.name,
-            file_type=file_path.suffix.lstrip('.').lower(),
-            file_size_bytes=stat.st_size,
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
         )
diff --git a/src/core/domain/models.py b/src/core/domain/models.py
index 1d3d09d..8464214 100644
--- a/src/core/domain/models.py
+++ b/src/core/domain/models.py
@@ -5,6 +5,7 @@ This module contains the domain entities that represent the core business concep
 All models are immutable by default and include comprehensive validation.
 """
 from datetime import datetime
+from enum import Enum
 from pathlib import Path
 from typing import Dict, List, Optional
 from uuid import UUID, uuid4
@@ -12,6 +13,12 @@
 from pydantic import BaseModel, Field, field_validator, model_validator
 
 
+class SourceType(str, Enum):
+    """Enumeration of supported source types."""
+    FILE = "file"
+    WEB = "web"
+
+
 class SourceFile(BaseModel):
     """
     Represents the raw input file before processing.
@@ -58,6 +65,48 @@
         return self.path.stem
 
 
+class WebPageSource(BaseModel):
+    """
+    Represents a web page source for document extraction.
+
+    This model encapsulates URL information about the document source.
+    Flow: WebPageSource -> Extraction -> Document
+
+    Attributes:
+        url: URL of the web page
+        display_name: Human-readable name (e.g., 'about_us.html')
+        content_length: Optional content length in bytes
+    """
+    url: str = Field(..., min_length=1, description="Web page URL")
+    display_name: str = Field(..., min_length=1, description="Display name")
+    content_length: Optional[int] = Field(None, ge=0, description="Content length")
+
+    model_config = {
+        "frozen": True,  # WebPageSource is immutable
+    }
+
+    @field_validator('url')
+    @classmethod
+    def validate_url(cls, value: str) -> str:
+        """Validate URL format."""
+        value = value.strip()
+        if not (value.startswith('http://') or value.startswith('https://')):
+            raise ValueError(f"URL must start with http:// or https://: {value}")
+        return value
+
+    @field_validator('display_name')
+    @classmethod
+    def normalize_display_name(cls, value: str) -> str:
+        """Normalize display name."""
+        return value.strip()
+
+    def get_domain(self) -> str:
+        """Extract domain from URL."""
+        from urllib.parse import urlparse
+        parsed = urlparse(self.url)
+        return parsed.netloc
+
+
 class DocumentSection(BaseModel):
     """
     Represents a structured section of a Markdown document.
@@ -95,30 +144,36 @@
 
 class DocumentMetadata(BaseModel):
     """
-    Metadata associated with a document.
+    Source-neutral metadata for documents.
+
+    This metadata works for both file-based and web-based sources,
+    enabling a unified processing pipeline.
 
     Attributes:
-        file_name: Original filename of the document
-        file_type: Type/extension of the file (e.g., 'pdf', 'docx')
-        file_size_bytes: Size of the file in bytes
-        created_at: Timestamp when document was created
+        source_id: Path or URL identifying the source
+        source_type: Type of source (FILE or WEB)
+        display_name: Human-readable name (e.g., 'manual.pdf', 'about_us.html')
+        size_bytes: Size in bytes (file size or content length)
+        created_at: Timestamp when metadata was created
         author: Optional author information
-        page_count: Optional number of pages in document
-        custom_fields: Additional metadata fields
+        extra_metadata: Additional source-specific metadata
     """
-    file_name: str = Field(..., min_length=1, description="Original filename")
-    file_type: str = Field(..., min_length=1, description="File extension")
-    file_size_bytes: int = Field(..., ge=0, description="File size in bytes")
+    source_id: str = Field(..., min_length=1, description="Path or URL")
+    source_type: SourceType = Field(..., description="Source type enum")
+    display_name: str = Field(..., min_length=1, description="Display name")
+    size_bytes: int = Field(..., ge=0, description="Size in bytes")
     created_at: datetime = Field(default_factory=datetime.utcnow)
-    author: Optional[str] = Field(None, description="Document author")
-    page_count: Optional[int] = Field(None, ge=1, description="Number of pages")
-    custom_fields: Dict[str, str] = Field(default_factory=dict)
+    author: Optional[str] = Field(None, description="Author information")
+    extra_metadata: Dict[str, str] = Field(
+        default_factory=dict,
+        description="Additional metadata"
+    )
 
-    @field_validator('file_type')
+    @field_validator('display_name')
     @classmethod
-    def validate_file_type(cls, value: str) -> str:
-        """Ensure file type is lowercase and stripped."""
-        return value.lower().strip()
+    def normalize_display_name(cls, value: str) -> str:
+        """Normalize display name."""
+        return value.strip()
 
     def get_summary(self) -> str:
         """
@@ -128,28 +183,33 @@
             Formatted string containing key metadata information
         """
         summary_parts = [
-            f"File: {self.file_name}",
-            f"Type: {self.file_type}",
-            f"Size: {self._format_file_size()}",
+            f"Source: {self.display_name}",
+            f"Type: {self.source_type.value}",
+            f"Size: {self._format_size()}",
         ]
 
         if self.author:
             summary_parts.append(f"Author: {self.author}")
-        if self.page_count:
-            summary_parts.append(f"Pages: {self.page_count}")
-
         return " | ".join(summary_parts)
 
-    def _format_file_size(self) -> str:
-        """Format file size in human-readable format."""
-        size = self.file_size_bytes
+    def _format_size(self) -> str:
+        """Format size in human-readable format."""
+        size = self.size_bytes
         for unit in ['B', 'KB', 'MB', 'GB']:
             if size < 1024.0:
                 return f"{size:.2f} {unit}"
             size /= 1024.0
         return f"{size:.2f} TB"
 
+    def is_file_source(self) -> bool:
+        """Check if this is a file-based source."""
+        return self.source_type == SourceType.FILE
+
+    def is_web_source(self) -> bool:
+        """Check if this is a web-based source."""
+        return self.source_type == SourceType.WEB
+
 
 class Document(BaseModel):
     """
@@ -281,6 +341,8 @@
     """
     Represents a chunk of text extracted from a document.
 
+    Enhanced to track section membership for precision chunking.
+
     Attributes:
         id: Unique identifier for the chunk
        document_id: ID of the parent document
@@ -288,6 +350,8 @@ content: Text content of the chunk
         sequence_number: Order of this chunk in the document
         start_char: Starting character position in original document
         end_char: Ending character position in original document
+        section_title: Title of the section this chunk belongs to
+        section_index: Index of the section in document.sections
         metadata: Optional metadata specific to this chunk
     """
     id: UUID = Field(default_factory=uuid4, description="Unique chunk ID")
@@ -296,6 +360,8 @@
     sequence_number: int = Field(..., ge=0, description="Chunk order in document")
     start_char: int = Field(..., ge=0, description="Start position in document")
     end_char: int = Field(..., gt=0, description="End position in document")
+    section_title: Optional[str] = Field(None, description="Section title")
+    section_index: Optional[int] = Field(None, ge=0, description="Section index")
     metadata: Dict[str, str] = Field(default_factory=dict)
 
     model_config = {
@@ -342,6 +408,21 @@
         search_text = text if case_sensitive else text.lower()
         return search_text in content
 
+    def belongs_to_section(self) -> bool:
+        """Check if this chunk belongs to a specific section."""
+        return self.section_title is not None and self.section_index is not None
+
+    def get_section_context(self) -> str:
+        """
+        Get a string describing the section context.
+
+        Returns:
+            Section context description or 'No section'
+        """
+        if self.belongs_to_section():
+            return f"Section {self.section_index}: {self.section_title}"
+        return "No section"
+
 
 class ChunkingStrategy(BaseModel):
     """
diff --git a/src/core/ports/outgoing/chunker.py b/src/core/ports/outgoing/chunker.py
index bac4098..521d4a9 100644
--- a/src/core/ports/outgoing/chunker.py
+++ b/src/core/ports/outgoing/chunker.py
@@ -1,14 +1,13 @@
 """
 Outgoing Port - Text Chunker Interface.
 
-This defines the contract for chunking text into smaller pieces.
+This defines the contract for chunking documents into smaller pieces.
 Different strategies can be implemented as adapters.
""" from abc import ABC, abstractmethod from typing import List -from uuid import UUID -from ...domain.models import Chunk, ChunkingStrategy +from ...domain.models import Chunk, ChunkingStrategy, Document class IChunker(ABC): @@ -16,26 +15,26 @@ class IChunker(ABC): Interface for text chunking strategies. Implementations of this interface provide different strategies - for splitting text into manageable chunks. + for splitting documents into manageable chunks with section awareness. """ @abstractmethod def chunk( self, - text: str, - document_id: UUID, + document: Document, strategy: ChunkingStrategy, ) -> List[Chunk]: """ - Split text into chunks according to a strategy. + Split document into chunks according to a strategy. + + Chunkers can utilize document.sections for section-aware chunking. Args: - text: Text content to chunk - document_id: ID of the parent document + document: Full Document entity with raw_markdown and sections strategy: Chunking strategy configuration Returns: - List of Chunk entities + List of Chunk entities with section metadata Raises: ChunkingError: If chunking fails diff --git a/src/core/ports/outgoing/chunking_context.py b/src/core/ports/outgoing/chunking_context.py index 6cf8358..25dfc9d 100644 --- a/src/core/ports/outgoing/chunking_context.py +++ b/src/core/ports/outgoing/chunking_context.py @@ -5,9 +5,8 @@ This defines the contract for managing chunking strategies. """ from abc import ABC, abstractmethod from typing import List -from uuid import UUID -from ...domain.models import Chunk, ChunkingStrategy +from ...domain.models import Chunk, ChunkingStrategy, Document from .chunker import IChunker @@ -22,23 +21,21 @@ class IChunkingContext(ABC): @abstractmethod def execute_chunking( self, - text: str, - document_id: UUID, + document: Document, strategy: ChunkingStrategy, ) -> List[Chunk]: """ Execute chunking using the specified strategy. - This method is stateless and thread-safe. It selects the appropriate - chunker based on the strategy configuration and executes chunking. + This method is stateless and thread-safe. It accepts the full + Document object (with sections) to enable section-aware chunking. Args: - text: Text to chunk - document_id: ID of parent document + document: Full Document entity with raw_markdown and sections strategy: Chunking strategy configuration (includes strategy_name) Returns: - List of chunks + List of chunks with section metadata Raises: ChunkingError: If strategy is not registered or chunking fails diff --git a/src/core/ports/outgoing/extractor.py b/src/core/ports/outgoing/extractor.py index f81b8f8..e524a11 100644 --- a/src/core/ports/outgoing/extractor.py +++ b/src/core/ports/outgoing/extractor.py @@ -1,8 +1,8 @@ """ Outgoing Port - Text Extractor Interface. -This defines the contract for extracting text from documents. -Different adapters can implement this for various file types. +This defines the contract for extracting content from documents. +Different adapters can implement this for various file types and sources. """ from abc import ABC, abstractmethod from pathlib import Path @@ -16,7 +16,7 @@ class IExtractor(ABC): Interface for text extraction from documents. Implementations of this interface handle specific file formats - (PDF, DOCX, TXT, etc.) and adapt external libraries to the domain. + (PDF, DOCX, TXT, etc.) or web sources and return Document entities. """ @abstractmethod @@ -24,11 +24,14 @@ class IExtractor(ABC): """ Extract text and metadata from a document file. 
 
+        Extractors create Document entities with raw_markdown and metadata.
+        Sections are parsed later in the pipeline.
+
         Args:
             file_path: Path to the document file
 
         Returns:
-            Document entity with extracted content and metadata
+            Document entity with raw_markdown and metadata populated
 
         Raises:
             ExtractionError: If extraction fails
diff --git a/src/core/services/document_processor_service.py b/src/core/services/document_processor_service.py
index 08c7813..1c60860 100644
--- a/src/core/services/document_processor_service.py
+++ b/src/core/services/document_processor_service.py
@@ -59,21 +59,21 @@
         chunking_strategy: ChunkingStrategy,
     ) -> Document:
         """
-        Process a document by extracting, parsing, and storing it.
+        Process a document using the stateless pipeline.
 
-        New Pragmatic Pipeline:
-        1. Extract: Get raw Markdown from SourceFile using extractor
-        2. Parse: Use parse_markdown to create structured sections
-        3. Assemble: Create rich Document with raw_markdown + sections
-        4. Persist: Save to repository
-        5. Finalize: Mark as processed
+        Pipeline Order:
+        1. Extract Document with raw_markdown and metadata (via Adapter)
+        2. Parse Markdown into DocumentSection objects
+        3. Update Document with sections
+        4. Validate and persist Document
+        5. Mark as processed
 
         Args:
             file_path: Path to the document file
             chunking_strategy: Strategy configuration (for metadata)
 
         Returns:
-            Processed Document entity with structured sections
+            Fully processed Document entity
 
         Raises:
             ExtractionError: If text extraction fails
@@ -83,15 +83,14 @@
         try:
             logger.info(f"Processing document: {file_path}")
 
-            # Step 1: Extract raw Markdown from SourceFile
-            source_file = self._create_source_file(file_path)
-            document = self._extract_from_source(source_file)
+            # Step 1: Extract Document with raw_markdown and metadata
+            document = self._extract_document(file_path)
 
             # Step 2: Parse Markdown into structured sections
             sections = parse_markdown(document.raw_markdown)
             logger.debug(f"Parsed {len(sections)} sections from document")
 
-            # Step 3: Assemble rich Document model
+            # Step 3: Update Document with sections
             document = document.model_copy(update={"sections": sections})
 
             # Step 4: Validate document content
@@ -100,7 +99,7 @@
             # Step 5: Persist to repository
             saved_document = self._repository.save(document)
 
-            # Step 6: Finalize - mark as processed
+            # Step 6: Mark as processed
             saved_document.mark_as_processed()
             self._repository.save(saved_document)
 
@@ -128,17 +127,17 @@
         Extract text from document and split into chunks.
 
         Pipeline:
-        1. Extract raw Markdown from SourceFile
+        1. Extract Document with raw_markdown and metadata
         2. Parse into structured sections
-        3. Apply chunking strategy to raw content
-        4. Return chunks
+        3. Update Document with sections
+        4. Apply chunking strategy with section awareness
 
         Args:
             file_path: Path to the document file
             chunking_strategy: Strategy configuration for chunking
 
         Returns:
-            List of text chunks
+            List of chunks with section metadata
 
         Raises:
             ExtractionError: If text extraction fails
@@ -147,15 +146,16 @@
         try:
             logger.info(f"Extracting and chunking: {file_path}")
 
-            # Extract from source
-            source_file = self._create_source_file(file_path)
-            document = self._extract_from_source(source_file)
+            # Extract Document
+            document = self._extract_document(file_path)
 
             # Parse sections
             sections = parse_markdown(document.raw_markdown)
+
+            # Update Document with sections
             document = document.model_copy(update={"sections": sections})
 
-            # Chunk using strategy
+            # Chunk using strategy (section-aware)
             chunks = self._chunk_document(document, chunking_strategy)
 
             logger.info(f"Created {len(chunks)} chunks from document")
@@ -223,43 +223,24 @@
 
         return self._repository.delete(document_id)
 
-    def _create_source_file(self, file_path: Path) -> SourceFile:
+    def _extract_document(self, file_path: Path) -> Document:
         """
-        Create a SourceFile model from a file path.
+        Extract Document using appropriate extractor.
+
+        Extractors create Document entities with raw_markdown and metadata.
+        Sections will be parsed later in the pipeline.
 
         Args:
-            file_path: Path to the source file
+            file_path: Path to document file
 
         Returns:
-            SourceFile entity
-
-        Raises:
-            ValueError: If file doesn't exist or is invalid
-        """
-        if not file_path.exists():
-            raise ValueError(f"File does not exist: {file_path}")
-
-        return SourceFile(
-            path=file_path,
-            extension=file_path.suffix.lstrip('.'),
-            size_bytes=file_path.stat().st_size,
-        )
-
-    def _extract_from_source(self, source_file: SourceFile) -> Document:
-        """
-        Extract raw Markdown from SourceFile using appropriate extractor.
-
-        Args:
-            source_file: Source file to extract from
-
-        Returns:
-            Document entity with raw_markdown populated
+            Document entity with raw_markdown and metadata (sections empty)
 
         Raises:
             ExtractionError: If extraction fails
         """
-        extractor = self._extractor_factory.create_extractor(source_file.path)
-        return extractor.extract(source_file.path)
+        extractor = self._extractor_factory.create_extractor(file_path)
+        return extractor.extract(file_path)
 
     def _chunk_document(
         self,
@@ -267,20 +248,20 @@
         strategy: ChunkingStrategy,
     ) -> List[Chunk]:
         """
-        Chunk document using specified strategy.
+        Chunk document using specified strategy with section awareness.
 
         This method is thread-safe as it delegates to a stateless
-        chunking context that selects the strategy based on configuration.
+        chunking context. The full Document (with sections) is passed
+        to enable section-aware chunking.
 
         Args:
-            document: Document to chunk
+            document: Full Document entity with sections
             strategy: Chunking strategy configuration
 
         Returns:
-            List of chunks
+            List of chunks with section metadata
         """
         return self._chunking_context.execute_chunking(
-            text=document.content,
-            document_id=document.id,
+            document=document,
             strategy=strategy,
         )
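
Reviewer notes (illustrative sketches, not part of the commit):

The sketches below assume the package layout in this patch (src.core.domain.models and src.core.ports.outgoing); any name that does not appear in the diff is hypothetical. First, the source-neutral DocumentMetadata in use: the same model, summary, and size formatting now serve both source types. The paths, URL, and sizes are invented:

from src.core.domain.models import DocumentMetadata, SourceType

# File-backed source: the extractors set source_id to the absolute path.
file_meta = DocumentMetadata(
    source_id="/data/docs/manual.pdf",
    source_type=SourceType.FILE,
    display_name="manual.pdf",
    size_bytes=1_482_112,
)

# Web-backed source: source_id is the URL; size_bytes carries the content length.
web_meta = DocumentMetadata(
    source_id="https://example.com/about_us.html",
    source_type=SourceType.WEB,
    display_name="about_us.html",
    size_bytes=20_480,
)

print(file_meta.get_summary())   # Source: manual.pdf | Type: file | Size: 1.41 MB
print(web_meta.is_web_source())  # True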
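WebPageSource validation and the get_domain() helper, with placeholder URLs:

from pydantic import ValidationError

from src.core.domain.models import WebPageSource

page = WebPageSource(
    url="https://example.com/docs/getting-started",
    display_name="getting-started.html",
    content_length=10_240,
)
print(page.get_domain())  # example.com

# The url validator rejects anything that is not http(s).
try:
    WebPageSource(url="ftp://example.com/file", display_name="file")
except ValidationError as exc:
    print(exc.errors()[0]["msg"])  # Value error, URL must start with http:// or https://: ...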
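The new section fields on Chunk, with end_char derived from the content so the offsets stay consistent; the UUID is random and the text is made up:

from uuid import uuid4

from src.core.domain.models import Chunk

text = "Install the package with pip and run the CLI."
chunk = Chunk(
    document_id=uuid4(),
    content=text,
    sequence_number=0,
    start_char=0,
    end_char=len(text),
    section_title="Installation",
    section_index=1,
)
print(chunk.belongs_to_section())   # True
print(chunk.get_section_context())  # Section 1: Installation

A chunk built without the two optional fields reports "No section", so downstream consumers can treat section context as best-effort.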
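A sketch of what an adapter against the widened IChunker port could look like. This SectionChunker is hypothetical and not part of the patch: it emits one chunk per parsed section, falls back to a single whole-document chunk, ignores the strategy parameter, and assumes DocumentSection exposes title and content attributes, which this diff does not show:

from typing import List

from src.core.domain.models import Chunk, ChunkingStrategy, Document
from src.core.ports.outgoing.chunker import IChunker


class SectionChunker(IChunker):
    """Hypothetical adapter: one chunk per parsed section."""

    def chunk(self, document: Document, strategy: ChunkingStrategy) -> List[Chunk]:
        # strategy is accepted to satisfy the port but unused in this toy sketch.
        if not document.sections:
            # No sections parsed: fall back to one chunk covering the whole document.
            return [
                Chunk(
                    document_id=document.id,
                    content=document.raw_markdown,
                    sequence_number=0,
                    start_char=0,
                    end_char=len(document.raw_markdown),
                )
            ]

        chunks: List[Chunk] = []
        cursor = 0
        for index, section in enumerate(document.sections):
            # Assumes DocumentSection has `content` and `title`; locate the body
            # in the raw text so offsets stay relative to the original document.
            start = document.raw_markdown.find(section.content, cursor)
            start = start if start >= 0 else cursor
            end = start + len(section.content)
            chunks.append(
                Chunk(
                    document_id=document.id,
                    content=section.content,
                    sequence_number=index,
                    start_char=start,
                    end_char=end,
                    section_title=section.title,
                    section_index=index,
                )
            )
            cursor = end
        return chunks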
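Last, how a future crawler could feed the same pipeline. The patch deliberately stops short of a web extractor (IExtractor.extract still takes a Path), so this is speculative: it assumes Document accepts raw_markdown and metadata keywords, as the extractor docstrings imply, and stubs out HTML conversion. After this step, parse_markdown and the chunkers apply unchanged:

from src.core.domain.models import (
    Document,
    DocumentMetadata,
    SourceType,
    WebPageSource,
)


def html_to_markdown(html: str) -> str:
    """Stub: a real adapter would plug in an HTML-to-Markdown converter."""
    return html


def document_from_web_page(page: WebPageSource, body: str) -> Document:
    """Build a Document from an already-fetched page (hypothetical helper)."""
    metadata = DocumentMetadata(
        source_id=page.url,
        source_type=SourceType.WEB,
        display_name=page.display_name,
        size_bytes=page.content_length or len(body.encode("utf-8")),
        extra_metadata={"domain": page.get_domain()},
    )
    return Document(raw_markdown=html_to_markdown(body), metadata=metadata)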