Compare commits

..

8 Commits

Author     SHA1        Message                                                                                     Date
m.dabbagh  80dd901e42  fix: remove file extension from DocumentMetadata.display_name                              2026-01-25 11:33:50 +03:30
m.dabbagh  9e1e49bc59  add document title and section title to the beginning of each chunk in paragraph chunker   2026-01-25 11:32:35 +03:30
m.dabbagh  cda128e438  one paragraph per chunk in paragraph chunking method                                       2026-01-25 11:03:54 +03:30
m.dabbagh  8ecbd88498  make DocumentSection.title optional                                                        2026-01-24 20:25:34 +03:30
m.dabbagh  3aad734140  comment out swagger authentication                                                         2026-01-24 17:06:25 +03:30
m.dabbagh  c6302bc792  add api-key header and swagger authentication                                              2026-01-24 17:05:29 +03:30
m.dabbagh  2ccb38179d  use docling in extractors                                                                   2026-01-24 13:43:07 +03:30
m.dabbagh  ad163eb665  change api defaults                                                                        2026-01-20 23:36:02 +03:30
16 changed files with 364 additions and 333 deletions

View File

@ -11,8 +11,7 @@ uvicorn[standard]==0.34.0
python-multipart==0.0.20
# Document Processing - Extractors
PyPDF2==3.0.1 # PDF extraction
python-docx==1.1.2 # DOCX extraction
docling # Unified document extraction (PDF, DOCX, Excel)
# Cloud Storage
boto3==1.35.94 # AWS S3 integration
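For reference, a minimal sketch of how the extractor diffs below use the new docling dependency; the filename is illustrative, and the convert/export calls mirror the ones in the diffs:

```python
# Minimal Docling usage as adopted by the extractors in this compare.
# "sample.docx" is an illustrative path; PDF and Excel files work the same way.
from docling.document_converter import DocumentConverter

converter = DocumentConverter()
result = converter.convert("sample.docx")
markdown_text = result.document.export_to_markdown()
print(markdown_text[:200])
```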

View File

@ -18,7 +18,11 @@ from pathlib import Path
from typing import Iterator, List, Optional
from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasicCredentials
from .auth import check_docs_credentials, validate_api_key
from ...core.config import get_settings
from ...core.domain.exceptions import (
@ -41,11 +45,6 @@ from .api_schemas import (
logger = logging.getLogger(__name__)
# =============================================================================
# Application Setup
# =============================================================================
# Load settings
settings = get_settings()
@ -53,12 +52,19 @@ app = FastAPI(
title="Text Processor API",
description="Text extraction and chunking system using Hexagonal Architecture",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
# docs_url=None,
# redoc_url=None,
)
router = APIRouter(prefix="/api/v1", tags=["Text Processing"])
router = APIRouter(
prefix="/api/v1",
tags=["Text Processing"],
dependencies=[Depends(validate_api_key)]
)
public_router = APIRouter(
tags=["System"],
)
# =============================================================================
# Global Exception Handler
@ -101,7 +107,7 @@ def get_service() -> ITextProcessor:
def get_chunking_strategy(
strategy_name: ChunkingMethod = Form(..., description="Chunking method"),
chunk_size: int = Form(..., description="Target chunk size in characters", ge=1, le=10000),
chunk_size: int = Form(512, description="Target chunk size in characters", ge=1, le=10000),
overlap_size: int = Form(0, description="Overlap between chunks", ge=0),
respect_boundaries: bool = Form(True, description="Respect text boundaries"),
) -> ChunkingStrategy:
@ -210,8 +216,6 @@ def to_chunk_responses(chunks: List[Chunk]) -> List[ChunkResponse]:
document_id=str(chunk.document_id),
content=chunk.content,
sequence_number=chunk.sequence_number,
start_char=chunk.start_char,
end_char=chunk.end_char,
length=chunk.get_length(),
)
for chunk in chunks
@ -231,8 +235,8 @@ def to_chunk_responses(chunks: List[Chunk]) -> List[ChunkResponse]:
)
async def perform_chunking(
file: Optional[UploadFile] = File(None, description="Markdown file (.md) to upload"),
text: Optional[str] = Form(None, description="Markdown text to process", json_schema_extra={"x-textarea": True}),
title: str = Form("markdown_input", description="Optional title for the document"),
text: Optional[str] = Form('', description="Markdown text to process"),
title: Optional[str] = Form('', description="Optional title for the document"),
strategy: ChunkingStrategy = Depends(get_chunking_strategy),
service: ITextProcessor = Depends(get_service),
) -> ChunkListResponse:
@ -339,7 +343,7 @@ async def process_file(
)
@router.get(
@public_router.get(
"/health",
response_model=HealthCheckResponse,
status_code=status.HTTP_200_OK,
@ -356,21 +360,29 @@ async def health_check() -> HealthCheckResponse:
)
# =============================================================================
# Protected Documentation Routes
# =============================================================================
# @app.get("/docs", include_in_schema=False)
# def api_docs(_: HTTPBasicCredentials = Depends(check_docs_credentials)):
# return get_swagger_ui_html(
# openapi_url="/openapi.json",
# title="Protected Text-Processor API Docs"
# )
#
#
# @app.get("/redoc", include_in_schema=False)
# def api_docs(_: HTTPBasicCredentials = Depends(check_docs_credentials)):
# return get_redoc_html(
# openapi_url="/openapi.json",
# title="Protected Text-Processor API Docs"
# )
# =============================================================================
# Application Setup
# =============================================================================
# Include router in app
# Include routers in app
app.include_router(router)
@app.get("/")
async def root():
"""Root endpoint with API information."""
return {
"name": "Text Processor API",
"version": "1.0.0",
"description": "Text extraction and chunking system using Hexagonal Architecture",
"docs_url": "/docs",
"api_prefix": "/api/v1",
}
app.include_router(public_router)
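A hedged client sketch of the new routing split: `/health` stays public, while the `/api/v1` router now requires the API-Key header via `validate_api_key`. The base URL, the `/api/v1/chunk` path, and the `"paragraph"` strategy value are assumptions for illustration; the header name and default key come from the Settings shown later in this compare.

```python
# Hypothetical client calls against the reworked routers.
import requests

BASE = "http://localhost:8000"  # assumed local dev server

# Public route: no credentials needed.
print(requests.get(f"{BASE}/health").status_code)

# Protected route: without the header the dependency raises 403.
resp = requests.post(
    f"{BASE}/api/v1/chunk",                      # hypothetical endpoint path
    headers={"API-Key": "some-secret-api-key"},  # must match settings.API_KEY
    data={
        "text": "# Title\n\nSome markdown.",
        "strategy_name": "paragraph",            # assumed ChunkingMethod value
    },
)
print(resp.status_code)
```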

View File

@ -101,8 +101,6 @@ class ChunkResponse(BaseModel):
document_id: str
content: str
sequence_number: int
start_char: int
end_char: int
length: int

View File

@ -0,0 +1,34 @@
import secrets
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader, HTTPBasic, HTTPBasicCredentials
from ...core.config import get_settings
settings = get_settings()
# This allows Swagger UI to detect the "Authorize" button
api_key_header = APIKeyHeader(name=settings.API_KEY_NAME, auto_error=False)
http_basic = HTTPBasic()
async def validate_api_key(api_key: str = Security(api_key_header)):
"""
Validates the X-API-Key header.
Using secrets.compare_digest protects against timing attacks.
"""
if not api_key or not secrets.compare_digest(api_key, settings.API_KEY):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials. Invalid or missing API Key.",
)
return api_key
security = HTTPBasic()
def check_docs_credentials(credentials: HTTPBasicCredentials = Depends(security)):
is_correct_user = secrets.compare_digest(credentials.username, settings.DOCS_USERNAME)
is_correct_password = secrets.compare_digest(credentials.password, settings.DOCS_PASSWORD)
if not (is_correct_user and is_correct_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Basic"},
)
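A small sketch of exercising `validate_api_key` in isolation with FastAPI's TestClient; the app, route, and import path below are stand-ins, not part of this compare.

```python
# Hedged test sketch for the new API-key dependency.
from fastapi import Depends, FastAPI
from fastapi.testclient import TestClient

# Hypothetical import path for the auth module added in this compare.
from myapp.adapters.incoming.api.auth import validate_api_key

app = FastAPI()

@app.get("/ping", dependencies=[Depends(validate_api_key)])
def ping():
    return {"ok": True}

client = TestClient(app)
print(client.get("/ping").status_code)  # 403: missing key
print(client.get("/ping", headers={"API-Key": "some-secret-api-key"}).status_code)  # 200
```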

View File

@ -70,8 +70,8 @@ class FixedSizeChunker(IChunker):
chunks = self._chunk_by_sections(document, strategy)
else:
# Standard chunking: process entire raw_markdown
segments = self._split_into_segments(document.raw_markdown, strategy)
chunks = self._create_chunks(segments, document.id)
chunk_texts = self._split_into_segments(document.raw_markdown, strategy)
chunks = self._create_chunks(chunk_texts, document.id)
logger.info(f"Created {len(chunks)} fixed-size chunks")
return chunks
@ -136,7 +136,7 @@ class FixedSizeChunker(IChunker):
self,
text: str,
strategy: ChunkingStrategy,
) -> List[tuple[str, int, int]]:
) -> List[str]:
"""
Split text into fixed-size segments.
@ -145,7 +145,7 @@ class FixedSizeChunker(IChunker):
strategy: Chunking strategy configuration
Returns:
List of (chunk_text, start_position, end_position) tuples
List of chunk text strings
"""
segments = []
text_length = len(text)
@ -155,7 +155,7 @@ class FixedSizeChunker(IChunker):
position = 0
while position < text_length:
segment = self._extract_segment(
chunk_text = self._extract_segment(
text=text,
position=position,
chunk_size=chunk_size,
@ -163,10 +163,8 @@ class FixedSizeChunker(IChunker):
respect_boundaries=strategy.respect_boundaries,
)
if segment:
chunk_text, start_pos, end_pos = segment
if chunk_text.strip():
segments.append((chunk_text, start_pos, end_pos))
if chunk_text and chunk_text.strip():
segments.append(chunk_text)
position += step_size
@ -183,7 +181,7 @@ class FixedSizeChunker(IChunker):
chunk_size: int,
text_length: int,
respect_boundaries: bool,
) -> tuple[str, int, int] | None:
) -> str:
"""
Extract a single segment from text.
@ -195,16 +193,15 @@ class FixedSizeChunker(IChunker):
respect_boundaries: Whether to respect boundaries
Returns:
Tuple of (chunk_text, start_pos, end_pos) or None
Chunk text string
"""
end_pos = min(position + chunk_size, text_length)
chunk_text = text[position:end_pos]
if respect_boundaries and end_pos < text_length:
chunk_text = self._adjust_to_boundary(text, position, end_pos)
end_pos = position + len(chunk_text)
return (chunk_text, position, end_pos)
return chunk_text
def _adjust_to_boundary(
self,
@ -258,17 +255,15 @@ class FixedSizeChunker(IChunker):
global_sequence = 0
for section_index, section in enumerate(document.sections):
# Split this section's content into segments
segments = self._split_into_segments(section.content, strategy)
# Split this section's content into chunks
chunk_texts = self._split_into_segments(section.content, strategy)
# Create chunks for this section
for text, start_char, end_char in segments:
for text in chunk_texts:
chunk = Chunk(
document_id=document.id,
content=text,
sequence_number=global_sequence,
start_char=start_char,
end_char=end_char,
section_title=section.title,
section_index=section_index,
)
@ -282,16 +277,16 @@ class FixedSizeChunker(IChunker):
def _create_chunks(
self,
segments: List[tuple[str, int, int]],
chunk_texts: List[str],
document_id,
section_title: Optional[str] = None,
section_index: Optional[int] = None,
) -> List[Chunk]:
"""
Create Chunk entities from text segments.
Create Chunk entities from text strings.
Args:
segments: List of (text, start_pos, end_pos) tuples
chunk_texts: List of chunk text strings
document_id: ID of parent document
section_title: Optional section title
section_index: Optional section index
@ -301,13 +296,11 @@ class FixedSizeChunker(IChunker):
"""
chunks = []
for sequence_number, (text, start_char, end_char) in enumerate(segments):
for sequence_number, text in enumerate(chunk_texts):
chunk = Chunk(
document_id=document_id,
content=text,
sequence_number=sequence_number,
start_char=start_char,
end_char=end_char,
section_title=section_title,
section_index=section_index,
)
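A standalone sketch of the simplified fixed-size splitting after this change: segments are plain strings with no (start, end) positions tracked. The step size is assumed to be chunk_size minus overlap_size, and boundary adjustment is omitted.

```python
# Hedged sketch mirroring the new List[str] return of _split_into_segments.
from typing import List

def split_fixed(text: str, chunk_size: int = 512, overlap_size: int = 0) -> List[str]:
    step = max(chunk_size - overlap_size, 1)  # assumed step computation
    chunks: List[str] = []
    position = 0
    while position < len(text):
        piece = text[position:position + chunk_size]
        if piece and piece.strip():
            chunks.append(piece)
        position += step
    return chunks

print(len(split_fixed("a" * 1200, chunk_size=512, overlap_size=64)))
```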

View File

@ -70,8 +70,8 @@ class ParagraphChunker(IChunker):
chunks = self._chunk_by_sections(document, strategy)
else:
# Standard chunking: process entire raw_markdown
segments = self._split_and_group_paragraphs(document.raw_markdown, strategy)
chunks = self._create_chunks(segments, document.id)
chunk_texts = self._split_and_group_paragraphs(document.raw_markdown, strategy)
chunks = self._create_chunks(chunk_texts, document.id)
logger.info(f"Created {len(chunks)} paragraph-based chunks")
return chunks
@ -136,7 +136,7 @@ class ParagraphChunker(IChunker):
self,
text: str,
strategy: ChunkingStrategy,
) -> List[tuple[str, int, int]]:
) -> List[str]:
"""
Split text into paragraphs and group them into chunks.
@ -145,14 +145,14 @@ class ParagraphChunker(IChunker):
strategy: Chunking strategy configuration
Returns:
List of (chunk_text, start_position, end_position) tuples
List of chunk text strings
"""
# Split into paragraphs
paragraphs = logic_utils.split_into_paragraphs(text)
if not paragraphs:
# No paragraphs found, return whole text as single chunk
return [(text, 0, len(text))]
return [text]
# Group paragraphs into chunks
return self._group_paragraphs(paragraphs, strategy)
@ -161,7 +161,7 @@ class ParagraphChunker(IChunker):
self,
paragraphs: List[str],
strategy: ChunkingStrategy,
) -> List[tuple[str, int, int]]:
) -> List[str]:
"""
Group paragraphs into chunks based on target size.
@ -170,12 +170,11 @@ class ParagraphChunker(IChunker):
strategy: Chunking strategy
Returns:
List of (chunk_text, start_pos, end_pos) tuples
List of chunk text strings
"""
segments = []
current_paragraphs = []
current_size = 0
current_start = 0
for paragraph in paragraphs:
para_size = len(paragraph)
@ -185,13 +184,11 @@ class ParagraphChunker(IChunker):
current_size, para_size, strategy.chunk_size, current_paragraphs
):
# Create chunk from accumulated paragraphs
segment = self._create_segment(
current_paragraphs, current_start
)
segment = self._create_segment(current_paragraphs)
segments.append(segment)
# Handle overlap
current_paragraphs, current_start, current_size = (
current_paragraphs, current_size = (
self._handle_overlap(
segment, paragraph, para_size, strategy.overlap_size
)
@ -203,7 +200,7 @@ class ParagraphChunker(IChunker):
# Add final chunk
if current_paragraphs:
segment = self._create_segment(current_paragraphs, current_start)
segment = self._create_segment(current_paragraphs)
segments.append(segment)
logger.debug(
@ -237,56 +234,49 @@ class ParagraphChunker(IChunker):
def _create_segment(
self,
paragraphs: List[str],
start_pos: int,
) -> tuple[str, int, int]:
) -> str:
"""
Create a segment from paragraphs.
Args:
paragraphs: List of paragraph strings
start_pos: Starting position
Returns:
Tuple of (chunk_text, start_pos, end_pos)
Chunk text string
"""
chunk_text = "\n\n".join(paragraphs)
end_pos = start_pos + len(chunk_text)
return (chunk_text, start_pos, end_pos)
return "\n\n".join(paragraphs)
def _handle_overlap(
self,
previous_segment: tuple[str, int, int],
previous_segment: str,
new_paragraph: str,
new_para_size: int,
overlap_size: int,
) -> tuple[List[str], int, int]:
) -> tuple[List[str], int]:
"""
Handle overlap between chunks.
Args:
previous_segment: Previous chunk segment
previous_segment: Previous chunk text
new_paragraph: New paragraph to start with
new_para_size: Size of new paragraph
overlap_size: Desired overlap size
Returns:
Tuple of (new_paragraphs, new_start, new_size)
Tuple of (new_paragraphs, new_size)
"""
if overlap_size > 0:
prev_text, _, prev_end = previous_segment
overlap_text = logic_utils.calculate_overlap_text(
text=prev_text,
text=previous_segment,
overlap_size=overlap_size,
from_start=False,
)
return (
[overlap_text, new_paragraph],
prev_end - len(overlap_text),
len(overlap_text) + new_para_size,
)
else:
_, _, prev_end = previous_segment
return ([new_paragraph], prev_end, new_para_size)
return ([new_paragraph], new_para_size)
def _chunk_by_sections(
self,
@ -297,6 +287,7 @@ class ParagraphChunker(IChunker):
Chunk document by processing each section independently.
This prevents chunks from spanning across section boundaries.
Each chunk is prefixed with the document title and section title.
Args:
document: Document with sections
@ -308,18 +299,22 @@ class ParagraphChunker(IChunker):
all_chunks = []
global_sequence = 0
for section_index, section in enumerate(document.sections):
# Split this section's content into paragraph-based segments
segments = self._split_and_group_paragraphs(section.content, strategy)
# Get document title from metadata
document_title = document.metadata.display_name
for section_index, section in enumerate(document.sections):
# Split this section's content into paragraph-based chunks
chunk_texts = self._split_and_group_paragraphs(section.content, strategy)
# Create chunks for this section with title prefix
for text in chunk_texts:
# Prepend document title and section title to chunk content
prefixed_content = f"{document_title}\n{section.title}\n{text}"
# Create chunks for this section
for text, start_char, end_char in segments:
chunk = Chunk(
document_id=document.id,
content=text,
content=prefixed_content,
sequence_number=global_sequence,
start_char=start_char,
end_char=end_char,
section_title=section.title,
section_index=section_index,
)
@ -333,16 +328,16 @@ class ParagraphChunker(IChunker):
def _create_chunks(
self,
segments: List[tuple[str, int, int]],
chunk_texts: List[str],
document_id,
section_title: Optional[str] = None,
section_index: Optional[int] = None,
) -> List[Chunk]:
"""
Create Chunk entities from text segments.
Create Chunk entities from text strings.
Args:
segments: List of (text, start_pos, end_pos) tuples
chunk_texts: List of chunk text strings
document_id: ID of parent document
section_title: Optional section title
section_index: Optional section index
@ -352,13 +347,11 @@ class ParagraphChunker(IChunker):
"""
chunks = []
for sequence_number, (text, start_char, end_char) in enumerate(segments):
for sequence_number, text in enumerate(chunk_texts):
chunk = Chunk(
document_id=document_id,
content=text,
sequence_number=sequence_number,
start_char=start_char,
end_char=end_char,
section_title=section_title,
section_index=section_index,
)
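An illustration of the new chunk prefixing in `_chunk_by_sections`: each chunk's content now begins with the document title (the extension-free display_name) and the section title, exactly as the f-string in the diff above builds it. The values are illustrative.

```python
# How a prefixed chunk body looks after this change (illustrative values).
document_title = "user_guide"   # DocumentMetadata.display_name, now without extension
section_title = "Installation"
chunk_body = "Run the installer and follow the prompts."

prefixed_content = f"{document_title}\n{section_title}\n{chunk_body}"
print(prefixed_content)
# user_guide
# Installation
# Run the installer and follow the prompts.
```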

View File

@ -1,13 +1,15 @@
"""
DOCX Extractor - Concrete implementation for Word document extraction.
This adapter implements the IExtractor port using python-docx library.
It maps python-docx exceptions to domain exceptions.
This adapter implements the IExtractor port using Docling library.
It maps Docling exceptions to domain exceptions.
"""
import logging
from pathlib import Path
from typing import List
from docling.document_converter import DocumentConverter
from ....core.domain.exceptions import (
EmptyContentError,
ExtractionError,
@ -21,22 +23,23 @@ logger = logging.getLogger(__name__)
class DocxExtractor(IExtractor):
"""
Concrete DOCX extractor using python-docx.
Concrete DOCX extractor using Docling.
This adapter:
1. Extracts text from DOCX files using python-docx
2. Handles paragraphs and tables
3. Maps exceptions to domain exceptions
1. Extracts text from DOCX files using Docling's DocumentConverter
2. Converts DOCX to Markdown format
3. Extracts metadata from document
"""
def __init__(self) -> None:
"""Initialize DOCX extractor."""
"""Initialize DOCX extractor with Docling converter."""
self._supported_extensions = ['docx']
logger.debug("DocxExtractor initialized")
self._converter = DocumentConverter()
logger.debug("DocxExtractor initialized with Docling")
def extract(self, file_path: Path) -> Document:
"""
Extract text and metadata from DOCX file.
Extract text and metadata from DOCX file using Docling.
Args:
file_path: Path to the DOCX file
@ -54,21 +57,22 @@ class DocxExtractor(IExtractor):
# Validate file
self._validate_file(file_path)
# Extract text
text = self._extract_text_from_docx(file_path)
# Convert DOCX to markdown using Docling
result = self._converter.convert(str(file_path))
markdown_text = result.document.export_to_markdown()
# Validate content
if not text or not text.strip():
if not markdown_text or not markdown_text.strip():
raise EmptyContentError(file_path=str(file_path))
# Create metadata
metadata = self._create_metadata(file_path)
# Build document with raw_markdown
document = Document(raw_markdown=text, metadata=metadata)
document = Document(raw_markdown=markdown_text, metadata=metadata)
logger.info(
f"Successfully extracted {len(text)} characters from {file_path.name}"
f"Successfully extracted {len(markdown_text)} characters from {file_path.name}"
)
return document
@ -130,83 +134,6 @@ class DocxExtractor(IExtractor):
if file_path.stat().st_size == 0:
raise EmptyContentError(file_path=str(file_path))
def _extract_text_from_docx(self, file_path: Path) -> str:
"""
Extract text from DOCX using python-docx.
Args:
file_path: Path to DOCX file
Returns:
Extracted text content
Raises:
ExtractionError: If DOCX extraction fails
"""
try:
import docx
logger.debug(f"Reading DOCX: {file_path}")
document = docx.Document(file_path)
# Extract paragraphs
text_parts = self._extract_paragraphs(document)
# Extract tables
table_text = self._extract_tables(document)
if table_text:
text_parts.extend(table_text)
return "\n".join(text_parts)
except ImportError:
raise ExtractionError(
message="python-docx library not installed",
details="Install with: pip install python-docx",
file_path=str(file_path),
)
except Exception as e:
raise ExtractionError(
message=f"DOCX extraction failed: {str(e)}",
file_path=str(file_path),
)
def _extract_paragraphs(self, document) -> List[str]:
"""
Extract text from all paragraphs.
Args:
document: python-docx Document object
Returns:
List of paragraph texts
"""
paragraphs = []
for paragraph in document.paragraphs:
text = paragraph.text.strip()
if text:
paragraphs.append(text)
return paragraphs
def _extract_tables(self, document) -> List[str]:
"""
Extract text from all tables.
Args:
document: python-docx Document object
Returns:
List of table cell texts
"""
table_texts = []
for table in document.tables:
for row in table.rows:
for cell in row.cells:
text = cell.text.strip()
if text:
table_texts.append(text)
return table_texts
def _create_metadata(self, file_path: Path) -> DocumentMetadata:
"""
Create source-neutral document metadata from file.
@ -222,6 +149,6 @@ class DocxExtractor(IExtractor):
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
display_name=file_path.stem,
size_bytes=stat.st_size,
)

View File

@ -0,0 +1,154 @@
"""
Excel Extractor - Concrete implementation for Excel file extraction.
This adapter implements the IExtractor port using Docling library.
It maps Docling exceptions to domain exceptions.
"""
import logging
from pathlib import Path
from typing import List
from docling.document_converter import DocumentConverter
from ....core.domain.exceptions import (
EmptyContentError,
ExtractionError,
)
from ....core.domain.models import Document, DocumentMetadata, SourceType
from ....core.ports.outgoing.extractor import IExtractor
logger = logging.getLogger(__name__)
class ExcelExtractor(IExtractor):
"""
Concrete Excel extractor using Docling.
This adapter:
1. Extracts text from Excel files (.xlsx, .xls) using Docling's DocumentConverter
2. Converts Excel to Markdown format
3. Extracts metadata from spreadsheet
"""
def __init__(self) -> None:
"""Initialize Excel extractor with Docling converter."""
self._supported_extensions = ['xlsx', 'xls']
self._converter = DocumentConverter()
logger.debug("ExcelExtractor initialized with Docling")
def extract(self, file_path: Path) -> Document:
"""
Extract text and metadata from Excel file using Docling.
Args:
file_path: Path to the Excel file
Returns:
Document entity with extracted content and metadata
Raises:
ExtractionError: If extraction fails
EmptyContentError: If no text could be extracted
"""
try:
logger.info(f"Extracting text from Excel: {file_path}")
# Validate file
self._validate_file(file_path)
# Convert Excel to markdown using Docling
result = self._converter.convert(str(file_path))
markdown_text = result.document.export_to_markdown()
# Validate content
if not markdown_text or not markdown_text.strip():
raise EmptyContentError(file_path=str(file_path))
# Create metadata
metadata = self._create_metadata(file_path)
# Build document with raw_markdown
document = Document(raw_markdown=markdown_text, metadata=metadata)
logger.info(
f"Successfully extracted {len(markdown_text)} characters from {file_path.name}"
)
return document
except EmptyContentError:
raise
except ExtractionError:
raise
except Exception as e:
logger.error(f"Excel extraction failed for {file_path}: {str(e)}")
raise ExtractionError(
message=f"Failed to extract text from {file_path.name}",
details=str(e),
file_path=str(file_path),
)
def supports_file_type(self, file_extension: str) -> bool:
"""
Check if this extractor supports Excel files.
Args:
file_extension: File extension (e.g., 'xlsx', 'xls')
Returns:
True if Excel files are supported
"""
return file_extension.lower() in self._supported_extensions
def get_supported_types(self) -> List[str]:
"""
Get list of supported file extensions.
Returns:
List containing 'xlsx' and 'xls'
"""
return self._supported_extensions.copy()
def _validate_file(self, file_path: Path) -> None:
"""
Validate file exists and is readable.
Args:
file_path: Path to validate
Raises:
ExtractionError: If file is invalid
"""
if not file_path.exists():
raise ExtractionError(
message=f"File not found: {file_path}",
file_path=str(file_path),
)
if not file_path.is_file():
raise ExtractionError(
message=f"Path is not a file: {file_path}",
file_path=str(file_path),
)
if file_path.stat().st_size == 0:
raise EmptyContentError(file_path=str(file_path))
def _create_metadata(self, file_path: Path) -> DocumentMetadata:
"""
Create document metadata from Excel file.
Args:
file_path: Path to the Excel file
Returns:
DocumentMetadata entity
"""
stat = file_path.stat()
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.stem,
size_bytes=stat.st_size,
)
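A hedged usage sketch for the new ExcelExtractor; the import path and the spreadsheet filename are assumptions.

```python
# Hypothetical caller of the Docling-backed Excel extractor added here.
from pathlib import Path

from myapp.adapters.outgoing.extractors.excel_extractor import ExcelExtractor  # hypothetical path

extractor = ExcelExtractor()
if extractor.supports_file_type("xlsx"):
    document = extractor.extract(Path("report.xlsx"))   # illustrative file
    print(document.metadata.display_name)               # "report" (stem, no extension)
    print(len(document.raw_markdown), "characters of Markdown")
```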

View File

@ -181,6 +181,6 @@ class MarkdownExtractor(IExtractor):
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
display_name=file_path.stem,
size_bytes=stat.st_size,
)

View File

@ -1,13 +1,15 @@
"""
PDF Extractor - Concrete implementation for PDF text extraction.
This adapter implements the IExtractor port using PyPDF2 library.
It maps PyPDF2 exceptions to domain exceptions.
This adapter implements the IExtractor port using Docling library.
It maps Docling exceptions to domain exceptions.
"""
import logging
from pathlib import Path
from typing import List
from docling.document_converter import DocumentConverter
from ....core.domain.exceptions import (
EmptyContentError,
ExtractionError,
@ -21,22 +23,23 @@ logger = logging.getLogger(__name__)
class PDFExtractor(IExtractor):
"""
Concrete PDF extractor using PyPDF2.
Concrete PDF extractor using Docling.
This adapter:
1. Extracts text from PDF files using PyPDF2
2. Maps PyPDF2 exceptions to domain exceptions
3. Creates Document entities with metadata
1. Extracts text from PDF files using Docling's DocumentConverter
2. Converts PDF to Markdown format
3. Extracts metadata including page count
"""
def __init__(self) -> None:
"""Initialize PDF extractor."""
"""Initialize PDF extractor with Docling converter."""
self._supported_extensions = ['pdf']
logger.debug("PDFExtractor initialized")
self._converter = DocumentConverter()
logger.debug("PDFExtractor initialized with Docling")
def extract(self, file_path: Path) -> Document:
"""
Extract text and metadata from PDF file.
Extract text and metadata from PDF file using Docling.
Args:
file_path: Path to the PDF file
@ -54,21 +57,22 @@ class PDFExtractor(IExtractor):
# Validate file
self._validate_file(file_path)
# Extract text
text = self._extract_text_from_pdf(file_path)
# Convert PDF to markdown using Docling
result = self._converter.convert(str(file_path))
markdown_text = result.document.export_to_markdown()
# Validate content
if not text or not text.strip():
if not markdown_text or not markdown_text.strip():
raise EmptyContentError(file_path=str(file_path))
# Create metadata
metadata = self._create_metadata(file_path)
# Create metadata with page count from Docling result
metadata = self._create_metadata(file_path, result)
# Build document with raw_markdown
document = Document(raw_markdown=text, metadata=metadata)
document = Document(raw_markdown=markdown_text, metadata=metadata)
logger.info(
f"Successfully extracted {len(text)} characters from {file_path.name}"
f"Successfully extracted {len(markdown_text)} characters from {file_path.name}"
)
return document
@ -130,89 +134,35 @@ class PDFExtractor(IExtractor):
if file_path.stat().st_size == 0:
raise EmptyContentError(file_path=str(file_path))
def _extract_text_from_pdf(self, file_path: Path) -> str:
def _create_metadata(self, file_path: Path, result) -> DocumentMetadata:
"""
Extract text from PDF using PyPDF2.
Create document metadata from PDF file and Docling result.
Args:
file_path: Path to PDF file
Returns:
Extracted text content
Raises:
ExtractionError: If PDF extraction fails
"""
try:
import PyPDF2
logger.debug(f"Reading PDF: {file_path}")
text_parts = []
with open(file_path, 'rb') as pdf_file:
pdf_reader = PyPDF2.PdfReader(pdf_file)
num_pages = len(pdf_reader.pages)
logger.debug(f"PDF has {num_pages} pages")
for page_num, page in enumerate(pdf_reader.pages, start=1):
page_text = self._extract_page_text(page, page_num)
if page_text:
text_parts.append(page_text)
return "\n\n".join(text_parts)
except ImportError:
raise ExtractionError(
message="PyPDF2 library not installed",
details="Install with: pip install PyPDF2",
file_path=str(file_path),
)
except Exception as e:
raise ExtractionError(
message=f"PDF extraction failed: {str(e)}",
file_path=str(file_path),
)
def _extract_page_text(self, page, page_num: int) -> str:
"""
Extract text from a single page.
Args:
page: PyPDF2 page object
page_num: Page number for logging
Returns:
Extracted page text
"""
try:
import PyPDF2
text = page.extract_text()
logger.debug(f"Extracted page {page_num}")
return text
except PyPDF2.errors.PdfReadError as e:
logger.warning(f"Failed to extract page {page_num}: {str(e)}")
return ""
except Exception as e:
logger.warning(f"Error on page {page_num}: {str(e)}")
return ""
def _create_metadata(self, file_path: Path) -> DocumentMetadata:
"""
Create source-neutral document metadata from file.
Args:
file_path: Path to the file
file_path: Path to the PDF file
result: Docling conversion result
Returns:
DocumentMetadata entity
"""
stat = file_path.stat()
# Extract page count from Docling result
page_count = None
try:
if hasattr(result.document, 'pages'):
page_count = len(result.document.pages)
except Exception as e:
logger.warning(f"Could not extract page count: {str(e)}")
extra_metadata = {}
if page_count is not None:
extra_metadata['page_count'] = str(page_count)
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
display_name=file_path.stem,
size_bytes=stat.st_size,
extra_metadata=extra_metadata,
)
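A short sketch of reading the page count that PDFExtractor now stores in extra_metadata; the import path and filename are assumptions, and the value is stored as a string (or absent if Docling exposes no pages).

```python
# Hedged sketch: consuming the page_count added to DocumentMetadata.extra_metadata.
from pathlib import Path

from myapp.adapters.outgoing.extractors.pdf_extractor import PDFExtractor  # hypothetical path

doc = PDFExtractor().extract(Path("manual.pdf"))        # illustrative file
page_count = doc.metadata.extra_metadata.get("page_count")  # string, e.g. "12", or None
print(doc.metadata.display_name, page_count)
```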

View File

@ -200,6 +200,6 @@ class TxtExtractor(IExtractor):
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
display_name=file_path.stem,
size_bytes=stat.st_size,
)

View File

@ -227,7 +227,7 @@ class ZipExtractor(IExtractor):
continue
# Skip files with 'nohf' in their name
if 'nohf' in filename.lower():
if 'nohf' not in filename.lower():
logger.debug(f"Skipping 'nohf' file: {filename}")
continue
@ -312,6 +312,6 @@ class ZipExtractor(IExtractor):
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
display_name=file_path.stem,
size_bytes=stat.st_size,
)

View File

@ -15,6 +15,7 @@ from .adapters.outgoing.chunkers.context import ChunkingContext
from .adapters.outgoing.chunkers.fixed_size_chunker import FixedSizeChunker
from .adapters.outgoing.chunkers.paragraph_chunker import ParagraphChunker
from .adapters.outgoing.extractors.docx_extractor import DocxExtractor
from .adapters.outgoing.extractors.excel_extractor import ExcelExtractor
from .adapters.outgoing.extractors.factory import ExtractorFactory
from .adapters.outgoing.extractors.markdown_extractor import MarkdownExtractor
from .adapters.outgoing.extractors.pdf_extractor import PDFExtractor
@ -118,6 +119,7 @@ class ApplicationContainer:
# Register all extractors
factory.register_extractor(PDFExtractor())
factory.register_extractor(DocxExtractor())
factory.register_extractor(ExcelExtractor())
factory.register_extractor(TxtExtractor())
factory.register_extractor(MarkdownExtractor())
factory.register_extractor(ZipExtractor())

View File

@ -14,6 +14,13 @@ class Settings(BaseSettings):
S3_ENDPOINT_URL: Optional[str] = "https://cdn.d.aiengines.ir"
S3_PRESIGNED_URL_EXPIRATION: int = 3600
S3_UPLOAD_PATH_PREFIX: str = "extractions"
API_KEY: str = "some-secret-api-key"
API_KEY_NAME: str = "API-Key"
DOCS_USERNAME: str = "admin"
DOCS_PASSWORD: str = "admin"
LOG_LEVEL: str = "INFO"
model_config = SettingsConfigDict(
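A hedged sketch of overriding the new auth settings: since they are plain pydantic-settings fields, environment variables with the same names should take precedence over the hard-coded defaults, assuming no env_prefix is configured and get_settings() reads the environment when first called. The import path and values are illustrative.

```python
# Overriding the new Settings fields via environment variables (assumed behaviour).
import os

os.environ["API_KEY"] = "replace-with-a-real-secret"
os.environ["DOCS_USERNAME"] = "docs-admin"
os.environ["DOCS_PASSWORD"] = "docs-password"

from myapp.core.config import get_settings  # hypothetical import path

settings = get_settings()
print(settings.API_KEY_NAME)  # "API-Key" unless also overridden
```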

View File

@ -126,7 +126,7 @@ class DocumentSection(BaseModel):
level: Header level (1-6 for h1-h6, 0 for Introduction)
content: Section content with preserved Markdown formatting
"""
title: str = Field(..., min_length=1, description="Section title")
title: Optional[str] = Field(None, min_length=1, description="Section title")
level: int = Field(..., ge=0, le=6, description="Header level (0=intro)")
content: str = Field(..., description="Section content with formatting")
@ -138,7 +138,9 @@ class DocumentSection(BaseModel):
@classmethod
def normalize_title(cls, value: str) -> str:
"""Normalize title by stripping whitespace."""
return value.strip()
if value:
return value.strip()
return value
def is_introduction(self) -> bool:
"""Check if this is the introduction section."""
@ -358,8 +360,6 @@ class Chunk(BaseModel):
document_id: ID of the parent document
content: Text content of the chunk
sequence_number: Order of this chunk in the document
start_char: Starting character position in original document
end_char: Ending character position in original document
section_title: Title of the section this chunk belongs to
section_index: Index of the section in document.sections
metadata: Optional metadata specific to this chunk
@ -368,8 +368,6 @@ class Chunk(BaseModel):
document_id: UUID = Field(..., description="Parent document ID")
content: str = Field(..., min_length=1, description="Chunk text content")
sequence_number: int = Field(..., ge=0, description="Chunk order in document")
start_char: int = Field(..., ge=0, description="Start position in document")
end_char: int = Field(..., gt=0, description="End position in document")
section_title: Optional[str] = Field(None, description="Section title")
section_index: Optional[int] = Field(None, ge=0, description="Section index")
metadata: Dict[str, str] = Field(default_factory=dict)
@ -378,27 +376,6 @@ class Chunk(BaseModel):
"frozen": True, # Chunks are immutable
}
@model_validator(mode='after')
def validate_position_consistency(self) -> 'Chunk':
"""Ensure end position is after start position."""
if self.end_char <= self.start_char:
raise ValueError(
f"end_char ({self.end_char}) must be greater than "
f"start_char ({self.start_char})"
)
# Validate content length matches position range
content_length = len(self.content)
position_range = self.end_char - self.start_char
if abs(content_length - position_range) > 10: # Allow small variance
raise ValueError(
f"Content length ({content_length}) doesn't match "
f"position range ({position_range})"
)
return self
def get_length(self) -> int:
"""Get the length of the chunk content."""
return len(self.content)
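A sketch of constructing a Chunk after the position fields and their validator were removed; the import path is an assumption and the values are illustrative.

```python
# Building a Chunk without start_char/end_char after this change.
from uuid import uuid4

from myapp.core.domain.models import Chunk  # hypothetical path

chunk = Chunk(
    document_id=uuid4(),
    content="Some chunk text.",
    sequence_number=0,
    section_title="Introduction",
    section_index=0,
)
print(chunk.get_length())  # 16
```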

View File

@ -50,40 +50,25 @@ def parse_markdown(text: str) -> List[DocumentSection]:
sections: List[DocumentSection] = []
current_heading: str | None = None
current_level: int = 0
current_content_parts: List[str] = []
def finalize_section() -> None:
"""Helper to finalize and append the current section."""
if current_heading is not None or current_content_parts:
content = "".join(current_content_parts).strip()
if content: # Only add sections with actual content
title = current_heading if current_heading else "Introduction"
sections.append(
DocumentSection(
title=title,
level=current_level,
content=content,
)
)
# Walk through all children of the document
for child in doc.children:
if isinstance(child, Heading):
# Finalize previous section before starting new one
finalize_section()
# Start new section
# Update current heading context
current_heading = _extract_heading_text(child)
current_level = child.level
current_content_parts = []
else:
# Render content back to markdown format instead of HTML
rendered = md_renderer.render(child).strip()
if rendered:
current_content_parts.append(rendered + "\n\n")
# Finalize the last section
finalize_section()
# Create a separate section for each paragraph/block
sections.append(
DocumentSection(
title=current_heading,
level=current_level,
content=rendered,
)
)
return sections
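An illustration of the reworked parse_markdown behaviour: instead of merging all blocks under a heading into one section, every rendered block becomes its own DocumentSection carrying the current heading as its title. The import path is an assumption.

```python
# Hedged example of the per-block sectioning introduced here.
from myapp.core.logic.markdown_parser import parse_markdown  # hypothetical path

text = """# Setup

Install the package.

Run the tests.
"""

for section in parse_markdown(text):
    print(repr(section.title), repr(section.content))
# Expected with the new logic: two sections, both titled 'Setup',
# one per paragraph block.
```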