diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/requirements.txt b/requirements.txt
index 3782aed..8645bb6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,10 +8,22 @@ marko==2.1.2
 # Web Framework
 fastapi==0.115.6
 uvicorn[standard]==0.34.0
-
-# Utilities
 python-multipart==0.0.20
 
+# Document Processing - Extractors
+PyPDF2==3.0.1  # PDF extraction
+python-docx==1.1.2  # DOCX extraction
+
+# Cloud Storage
+boto3==1.35.94  # AWS S3 integration
+botocore==1.35.94  # AWS SDK core (installed with boto3)
+
+# Environment Variables
+python-dotenv==1.0.1  # Load .env files
+
+# HTTP Client (for testing)
+requests==2.32.3  # API testing scripts
+
 # Development Dependencies (optional)
 pytest==8.3.4
 pytest-asyncio==0.24.0
@@ -19,3 +31,7 @@ httpx==0.28.1
 black==24.10.0
 ruff==0.8.5
 mypy==1.14.0
+
+# Type Stubs for Development
+types-requests==2.32.0.20241016
+boto3-stubs[s3]==1.35.94
diff --git a/src/adapters/incoming/api_routes.py b/src/adapters/incoming/api_routes.py
index 7d815c3..8391315 100644
--- a/src/adapters/incoming/api_routes.py
+++ b/src/adapters/incoming/api_routes.py
@@ -20,6 +20,7 @@ from typing import Iterator, List, Optional
 from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status
 from fastapi.responses import JSONResponse
 
+from ...core.config import get_settings
 from ...core.domain.exceptions import (
     ChunkingError,
     DomainException,
@@ -45,6 +46,9 @@ logger = logging.getLogger(__name__)
 # Application Setup
 # =============================================================================
 
+# Load settings
+settings = get_settings()
+
 app = FastAPI(
     title="Text Processor API",
     description="Text extraction and chunking system using Hexagonal Architecture",
@@ -194,6 +198,7 @@ def to_document_response(document: Document) -> DocumentResponse:
         ),
         is_processed=document.is_processed,
         content_preview=document.get_content_preview(200),
+        download_url=document.download_url,
     )
 
diff --git a/src/adapters/incoming/api_schemas.py b/src/adapters/incoming/api_schemas.py
index 2be5914..616fb59 100644
--- a/src/adapters/incoming/api_schemas.py
+++ b/src/adapters/incoming/api_schemas.py
@@ -88,6 +88,10 @@ class DocumentResponse(BaseModel):
         ...,
         description="Preview of content (first 200 chars)",
     )
+    download_url: Optional[str] = Field(
+        None,
+        description="Presigned URL for downloading the markdown file (expires in 1 hour)",
+    )
 
 
 class ChunkResponse(BaseModel):
diff --git a/src/adapters/outgoing/extractors/markdown_extractor.py b/src/adapters/outgoing/extractors/markdown_extractor.py
new file mode 100644
index 0000000..1c12c9a
--- /dev/null
+++ b/src/adapters/outgoing/extractors/markdown_extractor.py
@@ -0,0 +1,186 @@
+"""
+Markdown Extractor - Concrete implementation for Markdown file extraction.
+
+This adapter implements the IExtractor port for .md files.
+"""
+import logging
+from pathlib import Path
+from typing import List
+
+from ....core.domain.exceptions import (
+    EmptyContentError,
+    ExtractionError,
+)
+from ....core.domain.models import Document, DocumentMetadata, SourceType
+from ....core.ports.outgoing.extractor import IExtractor
+
+
+logger = logging.getLogger(__name__)
+
+
+class MarkdownExtractor(IExtractor):
+    """
+    Concrete Markdown extractor for .md files.
+
+    This adapter:
+    1. Reads .md files directly
+    2. Handles multiple encodings
+    3. Returns Document with raw markdown content
+    """
+
+    def __init__(self) -> None:
+        """Initialize Markdown extractor."""
+        self._supported_extensions = ['md', 'markdown']
+        self._encodings = ['utf-8', 'utf-16', 'cp1252', 'latin-1']  # latin-1 last: it decodes any byte sequence
+        logger.debug("MarkdownExtractor initialized")
+
+    def extract(self, file_path: Path) -> Document:
+        """
+        Extract text content from Markdown file.
+
+        Args:
+            file_path: Path to the .md file
+
+        Returns:
+            Document entity with raw markdown and metadata
+
+        Raises:
+            ExtractionError: If extraction fails
+            EmptyContentError: If file is empty
+        """
+        try:
+            logger.info(f"Extracting markdown file: {file_path}")
+
+            # Validate file
+            self._validate_file(file_path)
+
+            # Read markdown content
+            markdown_text = self._read_file(file_path)
+
+            # Validate content
+            if not markdown_text or not markdown_text.strip():
+                raise EmptyContentError(file_path=str(file_path))
+
+            # Create metadata
+            metadata = self._create_metadata(file_path)
+
+            # Build document with raw_markdown
+            document = Document(raw_markdown=markdown_text, metadata=metadata)
+
+            logger.info(
+                f"Successfully extracted {len(markdown_text)} characters from {file_path.name}"
+            )
+            return document
+
+        except EmptyContentError:
+            raise
+        except ExtractionError:
+            raise
+        except Exception as e:
+            logger.error(f"Markdown extraction failed for {file_path}: {str(e)}")
+            raise ExtractionError(
+                message=f"Failed to extract markdown from {file_path.name}",
+                details=str(e),
+                file_path=str(file_path),
+            )
+
+    def supports_file_type(self, file_extension: str) -> bool:
+        """
+        Check if this extractor supports Markdown files.
+
+        Args:
+            file_extension: File extension (e.g., 'md', 'markdown')
+
+        Returns:
+            True if Markdown files are supported
+        """
+        return file_extension.lower() in self._supported_extensions
+
+    def get_supported_types(self) -> List[str]:
+        """
+        Get list of supported file extensions.
+
+        Returns:
+            List containing 'md' and 'markdown'
+        """
+        return self._supported_extensions.copy()
+
+    def _validate_file(self, file_path: Path) -> None:
+        """
+        Validate file exists and is readable.
+
+        Args:
+            file_path: Path to validate
+
+        Raises:
+            ExtractionError: If file is invalid
+        """
+        if not file_path.exists():
+            raise ExtractionError(
+                message=f"File not found: {file_path}",
+                file_path=str(file_path),
+            )
+
+        if not file_path.is_file():
+            raise ExtractionError(
+                message=f"Path is not a file: {file_path}",
+                file_path=str(file_path),
+            )
+
+        if file_path.stat().st_size == 0:
+            raise EmptyContentError(file_path=str(file_path))
+
+    def _read_file(self, file_path: Path) -> str:
+        """
+        Read file content with encoding detection.
+
+        Args:
+            file_path: Path to markdown file
+
+        Returns:
+            File content as string
+
+        Raises:
+            ExtractionError: If reading fails
+        """
+        # Try multiple encodings, strictest first
+        for encoding in self._encodings:
+            try:
+                with open(file_path, 'r', encoding=encoding) as f:
+                    text = f.read()
+                logger.debug(f"Successfully read {file_path.name} with {encoding}")
+                return text
+            except UnicodeDecodeError:
+                continue
+            except Exception as e:
+                logger.error(f"Error reading {file_path.name}: {str(e)}")
+                raise ExtractionError(
+                    message=f"Failed to read file: {file_path.name}",
+                    details=str(e),
+                    file_path=str(file_path),
+                )
+
+        # If all encodings fail
+        raise ExtractionError(
+            message=f"Failed to decode {file_path.name} with any supported encoding",
+            file_path=str(file_path),
+        )
+
+    def _create_metadata(self, file_path: Path) -> DocumentMetadata:
+        """
+        Create document metadata from markdown file.
+
+        Args:
+            file_path: Path to the markdown file
+
+        Returns:
+            DocumentMetadata entity
+        """
+        stat = file_path.stat()
+
+        return DocumentMetadata(
+            source_id=str(file_path.absolute()),
+            source_type=SourceType.FILE,
+            display_name=file_path.name,
+            size_bytes=stat.st_size,
+        )
diff --git a/src/adapters/outgoing/storage/__init__.py b/src/adapters/outgoing/storage/__init__.py
new file mode 100644
index 0000000..db1336b
--- /dev/null
+++ b/src/adapters/outgoing/storage/__init__.py
@@ -0,0 +1 @@
+"""Outgoing storage adapters."""
diff --git a/src/adapters/outgoing/storage/s3_storage_adapter.py b/src/adapters/outgoing/storage/s3_storage_adapter.py
new file mode 100644
index 0000000..0f1db3b
--- /dev/null
+++ b/src/adapters/outgoing/storage/s3_storage_adapter.py
@@ -0,0 +1,216 @@
+"""
+S3 Storage Adapter - Concrete implementation using AWS S3.
+
+This adapter implements the IFileStorage port using boto3 for AWS S3 operations.
+"""
+import logging
+from typing import Optional
+
+import boto3
+from botocore.exceptions import ClientError
+
+from ....core.config import Settings
+from ....core.domain.exceptions import ProcessingError
+from ....core.ports.outgoing.file_storage import IFileStorage
+
+
+logger = logging.getLogger(__name__)
+
+
+class S3StorageAdapter(IFileStorage):
+    """
+    Concrete S3 storage adapter implementation.
+
+    This adapter:
+    1. Uploads content to AWS S3
+    2. Generates presigned URLs for secure downloads
+    3. Handles S3 operations with proper error handling
+    """
+
+    def __init__(self, settings: Settings) -> None:
+        """
+        Initialize S3 storage adapter.
+
+        Args:
+            settings: Application settings with S3 configuration
+
+        Raises:
+            ValueError: If S3 bucket is not configured
+        """
+        if not settings.S3_BUCKET:
+            raise ValueError("S3_BUCKET must be configured in settings")
+
+        self.bucket_name = settings.S3_BUCKET
+        self.region = settings.S3_REGION
+        self.expiration_seconds = settings.S3_PRESIGNED_URL_EXPIRATION
+        self.upload_path_prefix = settings.S3_UPLOAD_PATH_PREFIX
+
+        # Build boto3 client config
+        client_kwargs = {"region_name": self.region}
+
+        # Add custom endpoint (for MinIO)
+        if settings.S3_ENDPOINT_URL:
+            client_kwargs["endpoint_url"] = settings.S3_ENDPOINT_URL
+            logger.debug(f"Using custom S3 endpoint: {settings.S3_ENDPOINT_URL}")
+
+        # Add credentials if provided (otherwise boto3 uses default credential chain)
+        if settings.S3_ACCESS_KEY and settings.S3_SECRET_KEY:
+            client_kwargs["aws_access_key_id"] = settings.S3_ACCESS_KEY
+            client_kwargs["aws_secret_access_key"] = settings.S3_SECRET_KEY
+            logger.debug("Using explicit S3 credentials from settings")
+        else:
+            logger.debug("Using default AWS credential chain (IAM role, AWS CLI, etc.)")
+
+        # Initialize S3 client
+        self.s3_client = boto3.client("s3", **client_kwargs)
+
+        logger.info(
+            f"S3StorageAdapter initialized: bucket={self.bucket_name}, "
+            f"region={self.region}, expiration={self.expiration_seconds}s"
+        )
+
+    def upload_content(self, content: str, destination_path: str) -> str:
+        """
+        Upload text content to S3 and return presigned download URL.
+
+        Args:
+            content: Text content to upload (markdown)
+            destination_path: S3 object key (e.g., "extractions/doc123.md")
+
+        Returns:
+            Presigned download URL, valid for the configured expiration (1 hour by default)
+
+        Raises:
+            ProcessingError: If upload fails
+        """
+        try:
+            logger.info(f"Uploading content to S3: {destination_path}")
+
+            # Upload content to S3
+            self.s3_client.put_object(
+                Bucket=self.bucket_name,
+                Key=destination_path,
+                Body=content.encode("utf-8"),
+                ContentType="text/markdown",
+                ContentDisposition=f'attachment; filename="{self._get_filename(destination_path)}"',
+            )
+
+            logger.info(f"Successfully uploaded to S3: s3://{self.bucket_name}/{destination_path}")
+
+            # Generate presigned URL
+            presigned_url = self._generate_presigned_url(destination_path)
+
+            logger.info(f"Generated presigned URL (expires in {self.expiration_seconds}s)")
+            return presigned_url
+
+        except ClientError as e:
+            error_code = e.response.get("Error", {}).get("Code", "Unknown")
+            logger.error(f"S3 upload failed: {error_code} - {str(e)}")
+            raise ProcessingError(
+                message=f"Failed to upload to S3: {error_code}",
+                details=str(e),
+            )
+        except Exception as e:
+            logger.error(f"Unexpected error during S3 upload: {str(e)}")
+            raise ProcessingError(
+                message="Failed to upload to S3",
+                details=str(e),
+            )
+
+    def delete_file(self, file_path: str) -> bool:
+        """
+        Delete a file from S3.
+
+        Args:
+            file_path: S3 object key to delete
+
+        Returns:
+            True if deleted successfully, False otherwise
+        """
+        try:
+            logger.info(f"Deleting file from S3: {file_path}")
+
+            self.s3_client.delete_object(
+                Bucket=self.bucket_name,
+                Key=file_path,
+            )
+
+            logger.info(f"Successfully deleted from S3: {file_path}")
+            return True
+
+        except ClientError as e:
+            logger.error(f"S3 delete failed: {str(e)}")
+            return False
+        except Exception as e:
+            logger.error(f"Unexpected error during S3 delete: {str(e)}")
+            return False
+
+    def file_exists(self, file_path: str) -> bool:
+        """
+        Check if a file exists in S3.
+
+        Args:
+            file_path: S3 object key to check
+
+        Returns:
+            True if file exists, False otherwise
+        """
+        try:
+            self.s3_client.head_object(
+                Bucket=self.bucket_name,
+                Key=file_path,
+            )
+            return True
+
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "404":
+                return False
+            logger.error(f"Error checking S3 file existence: {str(e)}")
+            return False
+        except Exception as e:
+            logger.error(f"Unexpected error checking S3 file: {str(e)}")
+            return False
+
+    def _generate_presigned_url(self, object_key: str) -> str:
+        """
+        Generate presigned URL for S3 object.
+
+        Args:
+            object_key: S3 object key
+
+        Returns:
+            Presigned download URL
+
+        Raises:
+            ProcessingError: If URL generation fails
+        """
+        try:
+            presigned_url = self.s3_client.generate_presigned_url(
+                "get_object",
+                Params={
+                    "Bucket": self.bucket_name,
+                    "Key": object_key,
+                },
+                ExpiresIn=self.expiration_seconds,
+            )
+
+            return presigned_url
+
+        except ClientError as e:
+            logger.error(f"Failed to generate presigned URL: {str(e)}")
+            raise ProcessingError(
+                message="Failed to generate download URL",
+                details=str(e),
+            )
+
+    def _get_filename(self, path: str) -> str:
+        """
+        Extract filename from path.
+
+        Args:
+            path: Full path (e.g., "extractions/doc123.md")
+
+        Returns:
+            Filename only (e.g., "doc123.md")
+        """
+        return path.split("/")[-1] if "/" in path else path
diff --git a/src/bootstrap.py b/src/bootstrap.py
index 6187a55..b87c886 100644
--- a/src/bootstrap.py
+++ b/src/bootstrap.py
@@ -7,6 +7,7 @@ The Core never imports Adapters - only the Bootstrap does.
 
 The ApplicationContainer manages ONLY:
 - Core Services
 - Outgoing Adapters (Extractors, Chunkers, Repository)
+- Configuration (Settings)
 """
 import logging
@@ -22,6 +23,8 @@ from .adapters.outgoing.extractors.zip_extractor import ZipExtractor
 from .adapters.outgoing.persistence.in_memory_repository import (
     InMemoryDocumentRepository,
 )
+from .adapters.outgoing.storage.s3_storage_adapter import S3StorageAdapter
+from .core.config import Settings, get_settings
 from .core.ports.incoming.text_processor import ITextProcessor
 from .core.services.document_processor_service import DocumentProcessorService
 from .shared.logging_config import setup_logging
@@ -39,32 +42,47 @@ class ApplicationContainer:
     Dependency Injection Container for Core and Outgoing Adapters.
 
     This container manages the lifecycle and dependencies of:
+    - Configuration (Settings)
     - Core Domain Services
-    - Outgoing Adapters (Extractors, Chunkers, Repository)
-
+    - Outgoing Adapters (Extractors, Chunkers, Repository, Storage)
     """
 
-    def __init__(self, log_level: str = "INFO") -> None:
+    def __init__(self, settings: Settings | None = None) -> None:
         """
         Initialize the application container.
 
         Args:
-            log_level: Logging level for the application
+            settings: Application settings (uses singleton if not provided)
         """
-        # Setup logging first
-        setup_logging(level=log_level)
+        # Load settings (singleton)
+        self._settings = settings or get_settings()
+
+        # Setup logging
+        setup_logging(level=self._settings.LOG_LEVEL)
 
         logger.info("Initializing ApplicationContainer")
+        logger.debug(f"Configuration: bucket={self._settings.S3_BUCKET}, region={self._settings.S3_REGION}")
 
         # Create Outgoing Adapters
         self._repository = self._create_repository()
         self._extractor_factory = self._create_extractor_factory()
         self._chunking_context = self._create_chunking_context()
+        self._file_storage = self._create_file_storage()
 
         # Create Core Service (depends only on Ports)
         self._text_processor_service = self._create_text_processor_service()
 
         logger.info("ApplicationContainer initialized successfully")
 
+    @property
+    def settings(self) -> Settings:
+        """
+        Get application settings.
+
+        Returns:
+            Settings: Application configuration
+        """
+        return self._settings
+
     @property
     def text_processor_service(self) -> ITextProcessor:
         """
@@ -132,6 +150,16 @@ class ApplicationContainer:
 
         return context
 
+    def _create_file_storage(self) -> S3StorageAdapter:
+        """
+        Create and configure the file storage adapter.
+
+        Returns:
+            Configured S3 storage adapter
+        """
+        logger.debug("Creating S3StorageAdapter")
+        return S3StorageAdapter(settings=self._settings)
+
     def _create_text_processor_service(self) -> DocumentProcessorService:
         """
         Create the core text processor service.
@@ -146,6 +174,8 @@ class ApplicationContainer:
             extractor_factory=self._extractor_factory,
             chunking_context=self._chunking_context,
             repository=self._repository,
+            file_storage=self._file_storage,
+            settings=self._settings,
         )
 
 
@@ -167,12 +197,12 @@ def get_processor_service() -> ITextProcessor:
 
     if _container is None:
         logger.info("Lazy initializing ApplicationContainer (first access)")
-        _container = ApplicationContainer(log_level="INFO")
+        _container = ApplicationContainer()
 
     return _container.text_processor_service
 
 
-def create_application(log_level: str = "INFO") -> ApplicationContainer:
+def create_application(settings: Settings | None = None) -> ApplicationContainer:
     """
     Factory function to create a fully wired application container.
@@ -180,14 +210,14 @@
     For API routes, use get_processor_service() instead.
 
     Args:
-        log_level: Logging level for the application
+        settings: Application settings (uses singleton if not provided)
 
     Returns:
         Configured application container
 
     Example:
-        >>> container = create_application(log_level="DEBUG")
+        >>> container = create_application()
         >>> service = container.text_processor_service
     """
     logger.info("Creating application container via factory")
-    return ApplicationContainer(log_level=log_level)
+    return ApplicationContainer(settings=settings)
diff --git a/src/core/config.py b/src/core/config.py
new file mode 100644
index 0000000..a1dc03c
--- /dev/null
+++ b/src/core/config.py
@@ -0,0 +1,42 @@
+"""S3 Configuration."""
+from typing import Optional
+
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class Settings(BaseSettings):
+    """S3/MinIO settings loaded from environment variables or .env file."""
+
+    S3_BUCKET: str = "bi-chatbot"
+    S3_REGION: str = "us-east-1"
+    # Credentials and endpoint must come from the environment or .env; never commit real keys.
+    S3_ACCESS_KEY: Optional[str] = None
+    S3_SECRET_KEY: Optional[str] = None
+    S3_ENDPOINT_URL: Optional[str] = None
+    S3_PRESIGNED_URL_EXPIRATION: int = 3600
+    S3_UPLOAD_PATH_PREFIX: str = "extractions"
+    LOG_LEVEL: str = "INFO"
+
+    model_config = SettingsConfigDict(
+        env_file=".env",
+        env_file_encoding="utf-8",
+        case_sensitive=True,
+        extra="ignore",
+    )
+
+    def get_s3_path(self, document_id: str) -> str:
+        """Generate S3 path for a document."""
+        return f"{self.S3_UPLOAD_PATH_PREFIX}/{document_id}.md"
+
+
+_settings_instance: Optional[Settings] = None
+
+
+def get_settings() -> Settings:
+    """Get or create singleton settings instance."""
+    global _settings_instance
+
+    if _settings_instance is None:
+        _settings_instance = Settings()
+
+    return _settings_instance
diff --git a/src/core/domain/models.py b/src/core/domain/models.py
index 56f17db..10d016b 100644
--- a/src/core/domain/models.py
+++ b/src/core/domain/models.py
@@ -231,6 +231,8 @@ class Document(BaseModel):
         sections: Parsed structured sections from Markdown
         metadata: Associated metadata
         is_processed: Flag indicating if document has been processed
+
+        download_url: Optional presigned URL for downloading the markdown file
     """
     id: UUID = Field(default_factory=uuid4, description="Unique document ID")
     raw_markdown: str = Field(..., description="Raw Markdown content")
@@ -240,6 +242,7 @@ class Document(BaseModel):
     )
     metadata: DocumentMetadata = Field(..., description="Document metadata")
     is_processed: bool = Field(default=False, description="Processing status")
+    download_url: Optional[str] = Field(None, description="Presigned download URL")
 
     model_config = {
         "frozen": False,  # Allow mutation for processing status
diff --git a/src/core/ports/outgoing/file_storage.py b/src/core/ports/outgoing/file_storage.py
new file mode 100644
index 0000000..acd2104
--- /dev/null
+++ b/src/core/ports/outgoing/file_storage.py
@@ -0,0 +1,59 @@
+"""
+Outgoing Port - File Storage Interface.
+
+This port defines the contract for storing files and generating download URLs.
+Implementations can use S3, Azure Blob, GCS, or local filesystem.
+"""
+from abc import ABC, abstractmethod
+
+
+class IFileStorage(ABC):
+    """
+    Port interface for file storage operations.
+
+    This abstraction allows the core domain to store files without
+    depending on specific storage implementations (S3, Azure, etc.).
+ """ + + @abstractmethod + def upload_content(self, content: str, destination_path: str) -> str: + """ + Upload text content to storage and return a download URL. + + Args: + content: Text content to upload (e.g., markdown) + destination_path: Destination path in storage (e.g., "extractions/doc123.md") + + Returns: + Secure download URL (presigned URL for cloud storage) + + Raises: + StorageError: If upload fails + """ + pass + + @abstractmethod + def delete_file(self, file_path: str) -> bool: + """ + Delete a file from storage. + + Args: + file_path: Path to file in storage + + Returns: + True if deleted successfully, False otherwise + """ + pass + + @abstractmethod + def file_exists(self, file_path: str) -> bool: + """ + Check if a file exists in storage. + + Args: + file_path: Path to file in storage + + Returns: + True if file exists, False otherwise + """ + pass diff --git a/src/core/services/document_processor_service.py b/src/core/services/document_processor_service.py index d281043..36c2610 100644 --- a/src/core/services/document_processor_service.py +++ b/src/core/services/document_processor_service.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import List from uuid import UUID +from ..config import Settings from ..domain import logic_utils from ..domain.exceptions import ( DocumentNotFoundError, @@ -20,6 +21,7 @@ from ..domain.models import Chunk, ChunkingStrategy, Document, SourceFile from ..ports.incoming.text_processor import ITextProcessor from ..ports.outgoing.chunking_context import IChunkingContext from ..ports.outgoing.extractor_factory import IExtractorFactory +from ..ports.outgoing.file_storage import IFileStorage from ..ports.outgoing.repository import IDocumentRepository @@ -39,6 +41,8 @@ class DocumentProcessorService(ITextProcessor): extractor_factory: IExtractorFactory, chunking_context: IChunkingContext, repository: IDocumentRepository, + file_storage: IFileStorage, + settings: Settings, ) -> None: """ Initialize the document processor service. @@ -47,10 +51,14 @@ class DocumentProcessorService(ITextProcessor): extractor_factory: Factory for creating appropriate extractors chunking_context: Context for managing chunking strategies repository: Repository for document persistence + file_storage: File storage for uploading extracted content + settings: Application settings for configuration """ self._extractor_factory = extractor_factory self._chunking_context = chunking_context self._repository = repository + self._file_storage = file_storage + self._settings = settings logger.info("DocumentProcessorService initialized") def extract_and_chunk( @@ -102,28 +110,45 @@ class DocumentProcessorService(ITextProcessor): def extract_document(self, file_path: Path) -> Document: """ - Extract text content from document without parsing or chunking. + Extract text content from document and upload to S3. - This method only performs extraction: + This method: 1. Extracts raw text content from file - 2. Creates Document entity with metadata - 3. Returns Document with raw_markdown (no sections) + 2. Uploads markdown to S3 + 3. Generates presigned download URL + 4. 
 
         Args:
             file_path: Path to the document file
 
         Returns:
-            Document entity with raw markdown
+            Document entity with raw markdown and download URL
 
         Raises:
             ExtractionError: If text extraction fails
             UnsupportedFileTypeError: If file type is not supported
+            ProcessingError: If S3 upload fails
         """
         try:
             logger.info(f"Extracting document: {file_path}")
+
+            # Extract document
             document = self._extract_document(file_path)
             logger.info(f"Successfully extracted {len(document.raw_markdown)} characters")
+
+            # Upload to S3 and get download URL
+            destination_path = self._settings.get_s3_path(str(document.id))
+            download_url = self._file_storage.upload_content(
+                content=document.raw_markdown,
+                destination_path=destination_path,
+            )
+
+            # Update document with download URL
+            document = document.model_copy(update={"download_url": download_url})
+            logger.info("Uploaded to S3 and generated download URL")
+
             return document
+
         except Exception as e:
             logger.error(f"Failed to extract document: {str(e)}")
             raise
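
For reference, the new wiring can be exercised end to end as below. This is a minimal sketch, assuming the package is importable as `src`; the bucket name, credentials, endpoint, and file path are hypothetical (a local MinIO endpoint stands in for AWS). The same keys can instead be supplied via a `.env` file (S3_BUCKET, S3_ACCESS_KEY, ..., case-sensitive), which `get_settings()` loads on first access.

from pathlib import Path

from src.bootstrap import create_application
from src.core.config import Settings

# Explicit settings override the singleton loaded from the environment
settings = Settings(
    S3_BUCKET="my-bucket",                     # hypothetical bucket
    S3_ACCESS_KEY="minio-access-key",          # hypothetical credentials
    S3_SECRET_KEY="minio-secret-key",
    S3_ENDPOINT_URL="http://localhost:9000",   # e.g. a local MinIO instance
)

container = create_application(settings=settings)
service = container.text_processor_service

# Extract a markdown file; the service uploads the content and attaches the URL
document = service.extract_document(Path("notes.md"))  # hypothetical file
print(document.download_url)  # presigned URL, expires after S3_PRESIGNED_URL_EXPIRATION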
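
Because DocumentProcessorService depends only on the IFileStorage port, the S3 adapter can be swapped out without touching the core, for example in unit tests. A minimal in-memory sketch (the class and URL scheme below are illustrative, not part of this change):

from src.core.ports.outgoing.file_storage import IFileStorage


class InMemoryFileStorage(IFileStorage):
    """Test double: stores uploads in a dict and returns fake download URLs."""

    def __init__(self) -> None:
        self._files: dict[str, str] = {}

    def upload_content(self, content: str, destination_path: str) -> str:
        # Keep the content in memory instead of calling S3
        self._files[destination_path] = content
        return f"memory://{destination_path}"

    def delete_file(self, file_path: str) -> bool:
        return self._files.pop(file_path, None) is not None

    def file_exists(self, file_path: str) -> bool:
        return file_path in self._files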