add SourceFile, DocumentSection models and markdown parser

m.dabbagh 2026-01-08 03:46:35 +03:30
parent 10a619494b
commit 359026fa98
7 changed files with 345 additions and 57 deletions

View File

@@ -2,6 +2,9 @@
 pydantic==2.10.5
 pydantic-settings==2.7.1
+
+# Markdown Processing (tolerated domain dependency)
+marko==2.1.2
 
 # Web Framework
 fastapi==0.115.6
 uvicorn[standard]==0.34.0
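Editor's note, not part of the diff: marko's documented one-call API converts Markdown to HTML, while the parser added later in this commit drives marko's AST directly. A minimal sketch using only the published top-level call:

import marko

# One-call Markdown-to-HTML conversion from marko's README; the domain
# parser in this commit works at the AST level instead of rendering HTML.
print(marko.convert("# Hello\n\nSome **bold** text."))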

View File

@@ -64,8 +64,8 @@ class DocxExtractor(IExtractor):
             # Create metadata
             metadata = self._create_metadata(file_path)
 
-            # Build document
-            document = Document(content=text, metadata=metadata)
+            # Build document with raw_markdown
+            document = Document(raw_markdown=text, metadata=metadata)
 
             logger.info(
                 f"Successfully extracted {len(text)} characters from {file_path.name}"
View File

@@ -64,8 +64,8 @@ class PDFExtractor(IExtractor):
             # Create metadata
             metadata = self._create_metadata(file_path)
 
-            # Build document
-            document = Document(content=text, metadata=metadata)
+            # Build document with raw_markdown
+            document = Document(raw_markdown=text, metadata=metadata)
 
             logger.info(
                 f"Successfully extracted {len(text)} characters from {file_path.name}"
View File

@@ -65,8 +65,8 @@ class TxtExtractor(IExtractor):
             # Create metadata
             metadata = self._create_metadata(file_path)
 
-            # Build document
-            document = Document(content=text, metadata=metadata)
+            # Build document with raw_markdown
+            document = Document(raw_markdown=text, metadata=metadata)
 
             logger.info(
                 f"Successfully extracted {len(text)} characters from {file_path.name}"

View File

@@ -5,12 +5,94 @@ This module contains the domain entities that represent the core business concepts.
 All models are immutable by default and include comprehensive validation.
 """
 from datetime import datetime
+from pathlib import Path
 from typing import Dict, List, Optional
 from uuid import UUID, uuid4
 from pydantic import BaseModel, Field, field_validator, model_validator
+
+
+class SourceFile(BaseModel):
+    """
+    Represents the raw input file before processing.
+
+    This model encapsulates file system information about the document source.
+
+    Flow: SourceFile -> Extraction -> Document
+
+    Attributes:
+        path: Absolute path to the source file
+        extension: File extension (e.g., 'md', 'pdf', 'docx')
+        size_bytes: Size of the file in bytes
+    """
+
+    path: Path = Field(..., description="Absolute path to source file")
+    extension: str = Field(..., min_length=1, description="File extension")
+    size_bytes: int = Field(..., ge=0, description="File size in bytes")
+
+    model_config = {
+        "frozen": True,  # SourceFile is immutable
+    }
+
+    @field_validator('extension')
+    @classmethod
+    def normalize_extension(cls, value: str) -> str:
+        """Normalize extension to lowercase without leading dot."""
+        normalized = value.lower().strip()
+        return normalized.lstrip('.')
+
+    @field_validator('path')
+    @classmethod
+    def validate_path_exists(cls, value: Path) -> Path:
+        """Validate that the path exists."""
+        if not value.exists():
+            raise ValueError(f"Source file does not exist: {value}")
+        if not value.is_file():
+            raise ValueError(f"Path is not a file: {value}")
+        return value
+
+    def get_file_name(self) -> str:
+        """Get the filename without path."""
+        return self.path.name
+
+    def get_file_stem(self) -> str:
+        """Get the filename without extension."""
+        return self.path.stem
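Editor's sketch, not part of the diff: how the two validators behave. The path below is hypothetical and must exist on disk for validate_path_exists to pass.

    from pathlib import Path

    # Hypothetical file; validate_path_exists requires a real file on disk.
    source = SourceFile(
        path=Path("/data/docs/Report.MD"),
        extension=".MD",  # normalize_extension lowercases and strips the dot
        size_bytes=2048,
    )
    assert source.extension == "md"
    assert source.get_file_name() == "Report.MD"
    assert source.get_file_stem() == "Report"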
+
+
+class DocumentSection(BaseModel):
+    """
+    Represents a structured section of a Markdown document.
+
+    Sections are created by parsing Markdown headers. Text before the first
+    header is grouped into an "Introduction" section.
+
+    Attributes:
+        title: Section title (from header or "Introduction")
+        level: Header level (1-6 for h1-h6, 0 for Introduction)
+        content: Section content with preserved Markdown formatting
+    """
+
+    title: str = Field(..., min_length=1, description="Section title")
+    level: int = Field(..., ge=0, le=6, description="Header level (0=intro)")
+    content: str = Field(..., description="Section content with formatting")
+
+    model_config = {
+        "frozen": True,  # Sections are immutable
+    }
+
+    @field_validator('title')
+    @classmethod
+    def normalize_title(cls, value: str) -> str:
+        """Normalize title by stripping whitespace."""
+        return value.strip()
+
+    def is_introduction(self) -> bool:
+        """Check if this is the introduction section."""
+        return self.level == 0 and self.title == "Introduction"
+
+    def get_word_count(self) -> int:
+        """Get approximate word count of section content."""
+        return len(self.content.split())
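Likewise for DocumentSection, a small editor's sketch of the validator and helpers (illustration only, not part of the diff):

    intro = DocumentSection(
        title="  Introduction  ",
        level=0,
        content="Opening remarks here.",
    )
    assert intro.title == "Introduction"  # normalize_title strips whitespace
    assert intro.is_introduction()
    assert intro.get_word_count() == 3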
 class DocumentMetadata(BaseModel):
     """
     Metadata associated with a document.
@@ -71,16 +153,24 @@ class DocumentMetadata(BaseModel):
 class Document(BaseModel):
     """
-    Core domain entity representing a document with extracted text.
+    Core domain entity representing a document with extracted and structured content.
+
+    This rich model contains both the raw Markdown and parsed sections,
+    enabling flexible querying and processing strategies.
 
     Attributes:
         id: Unique identifier for the document
-        content: Extracted text content from the document
+        raw_markdown: Raw Markdown text extracted from source
+        sections: Parsed structured sections from Markdown
         metadata: Associated metadata
         is_processed: Flag indicating if document has been processed
     """
 
     id: UUID = Field(default_factory=uuid4, description="Unique document ID")
-    content: str = Field(..., description="Extracted text content")
+    raw_markdown: str = Field(..., description="Raw Markdown content")
+    sections: List[DocumentSection] = Field(
+        default_factory=list,
+        description="Structured document sections"
+    )
     metadata: DocumentMetadata = Field(..., description="Document metadata")
     is_processed: bool = Field(default=False, description="Processing status")
@@ -89,7 +179,7 @@ class Document(BaseModel):
         "str_strip_whitespace": True,
     }
 
-    @field_validator('content')
+    @field_validator('raw_markdown')
     @classmethod
     def validate_content_not_empty(cls, value: str) -> str:
         """Ensure content is not empty or just whitespace."""

@@ -97,6 +187,16 @@ class Document(BaseModel):
             raise ValueError("Document content cannot be empty")
         return value
 
+    @property
+    def content(self) -> str:
+        """
+        Backward compatibility property for raw content access.
+
+        Returns:
+            Raw markdown content
+        """
+        return self.raw_markdown
+
     def validate_content(self) -> bool:
         """
         Validate that the document content meets quality standards.
@@ -108,14 +208,14 @@ class Document(BaseModel):
             ValueError: If content fails validation checks
         """
         # Check minimum length
-        if len(self.content.strip()) < 10:
+        if len(self.raw_markdown.strip()) < 10:
             raise ValueError("Document content is too short (minimum 10 characters)")
 
         # Check for suspicious patterns (e.g., too many special characters)
         special_char_ratio = sum(
             not c.isalnum() and not c.isspace()
-            for c in self.content
-        ) / len(self.content)
+            for c in self.raw_markdown
+        ) / len(self.raw_markdown)
 
         if special_char_ratio > 0.5:
             raise ValueError(
@@ -147,9 +247,34 @@ class Document(BaseModel):
         Returns:
             Truncated content with ellipsis if needed
         """
-        if len(self.content) <= length:
-            return self.content
-        return f"{self.content[:length]}..."
+        if len(self.raw_markdown) <= length:
+            return self.raw_markdown
+        return f"{self.raw_markdown[:length]}..."
+
+    def get_section_count(self) -> int:
+        """Get the number of sections in the document."""
+        return len(self.sections)
+
+    def get_sections_by_level(self, level: int) -> List[DocumentSection]:
+        """
+        Get all sections at a specific header level.
+
+        Args:
+            level: Header level to filter by (0-6)
+
+        Returns:
+            List of sections at the specified level
+        """
+        return [section for section in self.sections if section.level == level]
+
+    def get_section_titles(self) -> List[str]:
+        """
+        Get all section titles in document order.
+
+        Returns:
+            List of section titles
+        """
+        return [section.title for section in self.sections]
class Chunk(BaseModel):
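Putting the pieces together, an editor's sketch of the new Document surface. DocumentMetadata construction is omitted because its fields are not shown in this diff; assume `metadata` is a valid instance.

    doc = Document(
        raw_markdown="# Guide\n\nBody text.",
        sections=[DocumentSection(title="Guide", level=1, content="Body text.")],
        metadata=metadata,  # assumed: a valid DocumentMetadata instance
    )
    assert doc.content == doc.raw_markdown  # backward-compatibility property
    assert doc.get_section_count() == 1
    assert doc.get_section_titles() == ["Guide"]
    assert doc.get_sections_by_level(1)[0].title == "Guide"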

src/core/domain/parsers.py (new file, 138 lines)
View File

@@ -0,0 +1,138 @@
"""
Markdown Parsing Utilities - Domain Logic for Markdown Processing.
This module provides pragmatic Markdown parsing utilities using the marko library.
As a tolerated dependency, marko is acceptable within the domain layer for this
specific parsing task.
"""
from typing import List
import marko
from marko.block import BlockElement, Document as MarkoDocument, Heading
from marko.inline import InlineElement
from .models import DocumentSection
def parse_markdown(text: str) -> List[DocumentSection]:
"""
Parse Markdown text into structured DocumentSection objects.
This function walks the Markdown AST and groups content under headers.
Text before the first header is placed in an "Introduction" section.
Args:
text: Raw Markdown text to parse
Returns:
List of DocumentSection objects in document order
Example:
>>> markdown = "# Title\\n\\nContent here\\n## Section\\nMore content"
>>> sections = parse_markdown(markdown)
>>> len(sections)
2
>>> sections[0].title
'Title'
>>> sections[0].level
1
"""
if not text or not text.strip():
return []
# Parse the Markdown into an AST
doc: MarkoDocument = marko.parse(text)
sections: List[DocumentSection] = []
current_heading: str | None = None
current_level: int = 0
current_content_parts: List[str] = []
def finalize_section() -> None:
"""Helper to finalize and append the current section."""
if current_heading is not None or current_content_parts:
content = "".join(current_content_parts).strip()
if content: # Only add sections with actual content
title = current_heading if current_heading else "Introduction"
sections.append(
DocumentSection(
title=title,
level=current_level,
content=content,
)
)
# Walk through all children of the document
for child in doc.children:
if isinstance(child, Heading):
# Finalize previous section before starting new one
finalize_section()
# Start new section
current_heading = _extract_heading_text(child)
current_level = child.level
current_content_parts = []
else:
# Add content to current section
rendered = marko.render(child).strip()
if rendered:
current_content_parts.append(rendered + "\n\n")
# Finalize the last section
finalize_section()
return sections
def _extract_heading_text(heading: Heading) -> str:
"""
Extract plain text from a Heading node.
Args:
heading: Heading AST node
Returns:
Plain text content of the heading
"""
parts: List[str] = []
for child in heading.children:
if isinstance(child, InlineElement):
# Render the inline element to preserve formatting
rendered = marko.render(child).strip()
parts.append(rendered)
elif hasattr(child, 'children'):
# Recursively extract from nested elements
parts.append(_extract_text_recursive(child))
else:
# Raw text
parts.append(str(child))
return "".join(parts).strip()
def _extract_text_recursive(element) -> str:
"""
Recursively extract text from an AST element.
Args:
element: AST element to extract text from
Returns:
Concatenated text content
"""
parts: List[str] = []
if hasattr(element, 'children'):
for child in element.children:
if isinstance(child, (BlockElement, InlineElement)):
rendered = marko.render(child).strip()
parts.append(rendered)
elif hasattr(child, 'children'):
parts.append(_extract_text_recursive(child))
else:
parts.append(str(child))
else:
parts.append(str(element))
return "".join(parts).strip()

View File

@@ -1,7 +1,7 @@
 """
 Core Service - Document Processor Implementation.
 
-This service orchestrates the workflow: Extract -> Clean -> Chunk -> Save.
+This service orchestrates the workflow: Extract -> Parse -> Assemble -> Chunk -> Save.
 It depends only on port interfaces, never on concrete implementations.
 """
 import logging
@@ -15,7 +15,8 @@ from ..domain.exceptions import (
     ExtractionError,
     ProcessingError,
 )
-from ..domain.models import Chunk, ChunkingStrategy, Document
+from ..domain.parsers import parse_markdown
+from ..domain.models import Chunk, ChunkingStrategy, Document, SourceFile
 from ..ports.incoming.text_processor import ITextProcessor
 from ..ports.outgoing.chunking_context import IChunkingContext
 from ..ports.outgoing.extractor_factory import IExtractorFactory
@@ -58,21 +59,21 @@ class DocumentProcessorService(ITextProcessor):
         chunking_strategy: ChunkingStrategy,
     ) -> Document:
         """
-        Process a document by extracting, cleaning, and storing it.
+        Process a document by extracting, parsing, and storing it.
 
-        Workflow:
-        1. Extract text from file using appropriate extractor
-        2. Clean and normalize the text
-        3. Validate the document
-        4. Save to repository
-        5. Mark as processed
+        New Pragmatic Pipeline:
+        1. Extract: Get raw Markdown from SourceFile using extractor
+        2. Parse: Use parse_markdown to create structured sections
+        3. Assemble: Create rich Document with raw_markdown + sections
+        4. Persist: Save to repository
+        5. Finalize: Mark as processed
 
         Args:
             file_path: Path to the document file
             chunking_strategy: Strategy configuration (for metadata)
 
         Returns:
-            Processed Document entity
+            Processed Document entity with structured sections
 
         Raises:
             ExtractionError: If text extraction fails
@@ -82,23 +83,31 @@ class DocumentProcessorService(ITextProcessor):
         try:
             logger.info(f"Processing document: {file_path}")
 
-            # Step 1: Extract text from document
-            document = self._extract_document(file_path)
+            # Step 1: Extract raw Markdown from SourceFile
+            source_file = self._create_source_file(file_path)
+            document = self._extract_from_source(source_file)
 
-            # Step 2: Clean and normalize text
-            document = self._clean_document(document)
+            # Step 2: Parse Markdown into structured sections
+            sections = parse_markdown(document.raw_markdown)
+            logger.debug(f"Parsed {len(sections)} sections from document")
 
-            # Step 3: Validate document content
+            # Step 3: Assemble rich Document model
+            document = document.model_copy(update={"sections": sections})
+
+            # Step 4: Validate document content
             document.validate_content()
 
-            # Step 4: Save to repository
+            # Step 5: Persist to repository
             saved_document = self._repository.save(document)
 
-            # Step 5: Mark as processed
+            # Step 6: Finalize - mark as processed
             saved_document.mark_as_processed()
             self._repository.save(saved_document)
 
-            logger.info(f"Document processed successfully: {saved_document.id}")
+            logger.info(
+                f"Document processed successfully: {saved_document.id} "
+                f"({len(sections)} sections)"
+            )
             return saved_document
 
         except ExtractionError:
@@ -118,10 +127,10 @@ class DocumentProcessorService(ITextProcessor):
         """
         Extract text from document and split into chunks.
 
-        Workflow:
-        1. Extract text from file
-        2. Clean and normalize text
-        3. Apply chunking strategy
+        Pipeline:
+        1. Extract raw Markdown from SourceFile
+        2. Parse into structured sections
+        3. Apply chunking strategy to raw content
         4. Return chunks
 
         Args:
@@ -138,9 +147,13 @@ class DocumentProcessorService(ITextProcessor):
         try:
             logger.info(f"Extracting and chunking: {file_path}")
 
-            # Extract and clean
-            document = self._extract_document(file_path)
-            document = self._clean_document(document)
+            # Extract from source
+            source_file = self._create_source_file(file_path)
+            document = self._extract_from_source(source_file)
+
+            # Parse sections
+            sections = parse_markdown(document.raw_markdown)
+            document = document.model_copy(update={"sections": sections})
 
             # Chunk using strategy
             chunks = self._chunk_document(document, chunking_strategy)
@@ -210,34 +223,43 @@ class DocumentProcessorService(ITextProcessor):
         return self._repository.delete(document_id)
 
-    def _extract_document(self, file_path: Path) -> Document:
-        """
-        Extract document using appropriate extractor.
-
-        Args:
-            file_path: Path to document file
-
-        Returns:
-            Extracted Document entity
-        """
-        extractor = self._extractor_factory.create_extractor(file_path)
-        return extractor.extract(file_path)
-
-    def _clean_document(self, document: Document) -> Document:
-        """
-        Clean and normalize document text.
-
-        Args:
-            document: Document to clean
-
-        Returns:
-            Document with cleaned content
-        """
-        cleaned_content = logic_utils.clean_text(document.content)
-
-        # Create new document with cleaned content
-        # Note: Pydantic models are immutable by default, so we use model_copy
-        return document.model_copy(update={"content": cleaned_content})
+    def _create_source_file(self, file_path: Path) -> SourceFile:
+        """
+        Create a SourceFile model from a file path.
+
+        Args:
+            file_path: Path to the source file
+
+        Returns:
+            SourceFile entity
+
+        Raises:
+            ValueError: If file doesn't exist or is invalid
+        """
+        if not file_path.exists():
+            raise ValueError(f"File does not exist: {file_path}")
+
+        return SourceFile(
+            path=file_path,
+            extension=file_path.suffix.lstrip('.'),
+            size_bytes=file_path.stat().st_size,
+        )
+
+    def _extract_from_source(self, source_file: SourceFile) -> Document:
+        """
+        Extract raw Markdown from SourceFile using appropriate extractor.
+
+        Args:
+            source_file: Source file to extract from
+
+        Returns:
+            Document entity with raw_markdown populated
+
+        Raises:
+            ExtractionError: If extraction fails
+        """
+        extractor = self._extractor_factory.create_extractor(source_file.path)
+        return extractor.extract(source_file.path)
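An end-to-end editor's sketch of the new pipeline. The constructor wiring and the adapter names below are assumptions inferred from the dependencies used in this diff, not shown in it:

    from pathlib import Path

    # extractor_factory, repository, and chunking_context are assumed to be
    # concrete adapters for the ports imported at the top of this module.
    service = DocumentProcessorService(
        extractor_factory=extractor_factory,
        repository=repository,
        chunking_context=chunking_context,
    )
    document = service.process_document(
        file_path=Path("docs/manual.md"),
        chunking_strategy=chunking_strategy,  # assumed: a ChunkingStrategy instance
    )
    print(document.get_section_count(), document.is_processed)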
     def _chunk_document(
         self,