add s3 storage

This commit is contained in:
m.dabbagh 2026-01-20 12:46:47 +03:30
parent 0c09c79a2e
commit 91f8035043
12 changed files with 607 additions and 18 deletions

.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -8,10 +8,22 @@ marko==2.1.2
# Web Framework
fastapi==0.115.6
uvicorn[standard]==0.34.0
# Utilities
python-multipart==0.0.20
# Document Processing - Extractors
PyPDF2==3.0.1 # PDF extraction
python-docx==1.1.2 # DOCX extraction
# Cloud Storage
boto3==1.35.94 # AWS S3 integration
botocore==1.35.94 # AWS SDK core (installed with boto3)
# Environment Variables
python-dotenv==1.0.1 # Load .env files
# HTTP Client (for testing)
requests==2.32.3 # API testing scripts
# Development Dependencies (optional)
pytest==8.3.4
pytest-asyncio==0.24.0
@ -19,3 +31,7 @@ httpx==0.28.1
black==24.10.0
ruff==0.8.5
mypy==1.14.0
# Type Stubs for Development
types-requests==2.32.0.20241016
boto3-stubs[s3]==1.35.94

View File

@ -20,6 +20,7 @@ from typing import Iterator, List, Optional
from fastapi import APIRouter, Depends, FastAPI, File, Form, HTTPException, UploadFile, status
from fastapi.responses import JSONResponse
from ...core.config import get_settings
from ...core.domain.exceptions import (
ChunkingError,
DomainException,
@ -45,6 +46,9 @@ logger = logging.getLogger(__name__)
# Application Setup
# =============================================================================
# Load settings
settings = get_settings()
app = FastAPI(
title="Text Processor API",
description="Text extraction and chunking system using Hexagonal Architecture",
@ -194,6 +198,7 @@ def to_document_response(document: Document) -> DocumentResponse:
),
is_processed=document.is_processed,
content_preview=document.get_content_preview(200),
download_url=document.download_url,
)

View File

@ -88,6 +88,10 @@ class DocumentResponse(BaseModel):
...,
description="Preview of content (first 200 chars)",
)
download_url: Optional[str] = Field(
None,
description="Presigned URL for downloading the markdown file (expires in 1 hour)",
)
class ChunkResponse(BaseModel):

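For illustration, a serialized DocumentResponse carrying the new field might look roughly like the dictionary below; the preview text, key, and presigned URL query parameters are hypothetical placeholders, not output from the real service.

# Hypothetical example of a DocumentResponse payload including the new field;
# every value below is a placeholder for illustration only.
example_response = {
    "is_processed": True,
    "content_preview": "# Quarterly Report\n\nRevenue grew in Q3",
    "download_url": (
        "https://s3.example.com/bi-chatbot/extractions/"
        "3f2b9c4e-doc.md?X-Amz-Expires=3600&X-Amz-Signature=placeholder"
    ),
}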
View File

@ -0,0 +1,186 @@
"""
Markdown Extractor - Concrete implementation for Markdown file extraction.
This adapter implements the IExtractor port for .md files.
"""
import logging
from pathlib import Path
from typing import List
from ....core.domain.exceptions import (
EmptyContentError,
ExtractionError,
)
from ....core.domain.models import Document, DocumentMetadata, SourceType
from ....core.ports.outgoing.extractor import IExtractor
logger = logging.getLogger(__name__)
class MarkdownExtractor(IExtractor):
"""
Concrete Markdown extractor for .md files.
This adapter:
1. Reads .md files directly
2. Handles multiple encodings
3. Returns Document with raw markdown content
"""
def __init__(self) -> None:
"""Initialize Markdown extractor."""
self._supported_extensions = ['md', 'markdown']
self._encodings = ['utf-8', 'utf-16', 'latin-1', 'cp1252']
logger.debug("MarkdownExtractor initialized")
def extract(self, file_path: Path) -> Document:
"""
Extract text content from Markdown file.
Args:
file_path: Path to the .md file
Returns:
Document entity with raw markdown and metadata
Raises:
ExtractionError: If extraction fails
EmptyContentError: If file is empty
"""
try:
logger.info(f"Extracting markdown file: {file_path}")
# Validate file
self._validate_file(file_path)
# Read markdown content
markdown_text = self._read_file(file_path)
# Validate content
if not markdown_text or not markdown_text.strip():
raise EmptyContentError(file_path=str(file_path))
# Create metadata
metadata = self._create_metadata(file_path)
# Build document with raw_markdown
document = Document(raw_markdown=markdown_text, metadata=metadata)
logger.info(
f"Successfully extracted {len(markdown_text)} characters from {file_path.name}"
)
return document
except EmptyContentError:
raise
except ExtractionError:
raise
except Exception as e:
logger.error(f"Markdown extraction failed for {file_path}: {str(e)}")
raise ExtractionError(
message=f"Failed to extract markdown from {file_path.name}",
details=str(e),
file_path=str(file_path),
)
def supports_file_type(self, file_extension: str) -> bool:
"""
Check if this extractor supports Markdown files.
Args:
file_extension: File extension (e.g., 'md', 'markdown')
Returns:
True if Markdown files are supported
"""
return file_extension.lower() in self._supported_extensions
def get_supported_types(self) -> List[str]:
"""
Get list of supported file extensions.
Returns:
List containing 'md' and 'markdown'
"""
return self._supported_extensions.copy()
def _validate_file(self, file_path: Path) -> None:
"""
Validate file exists and is readable.
Args:
file_path: Path to validate
Raises:
ExtractionError: If file is invalid
"""
if not file_path.exists():
raise ExtractionError(
message=f"File not found: {file_path}",
file_path=str(file_path),
)
if not file_path.is_file():
raise ExtractionError(
message=f"Path is not a file: {file_path}",
file_path=str(file_path),
)
if file_path.stat().st_size == 0:
raise EmptyContentError(file_path=str(file_path))
def _read_file(self, file_path: Path) -> str:
"""
Read file content with encoding detection.
Args:
file_path: Path to markdown file
Returns:
File content as string
Raises:
ExtractionError: If reading fails
"""
# Try multiple encodings
for encoding in self._encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
text = f.read()
logger.debug(f"Successfully read {file_path.name} with {encoding}")
return text
except UnicodeDecodeError:
continue
except Exception as e:
logger.error(f"Error reading {file_path.name}: {str(e)}")
raise ExtractionError(
message=f"Failed to read file: {file_path.name}",
details=str(e),
file_path=str(file_path),
)
# If all encodings fail
raise ExtractionError(
message=f"Failed to decode {file_path.name} with any supported encoding",
file_path=str(file_path),
)
def _create_metadata(self, file_path: Path) -> DocumentMetadata:
"""
Create document metadata from markdown file.
Args:
file_path: Path to the markdown file
Returns:
DocumentMetadata entity
"""
stat = file_path.stat()
return DocumentMetadata(
source_id=str(file_path.absolute()),
source_type=SourceType.FILE,
display_name=file_path.name,
size_bytes=stat.st_size,
)
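A minimal usage sketch of the extractor; the import path is assumed from this commit's layout and the sample file path is hypothetical.

from pathlib import Path

from src.adapters.outgoing.extractors.markdown_extractor import MarkdownExtractor  # import path assumed

extractor = MarkdownExtractor()
sample = Path("notes/example.md")  # hypothetical local file

# Path.suffix includes the leading dot, so strip it before asking the extractor.
if extractor.supports_file_type(sample.suffix.lstrip(".")):
    document = extractor.extract(sample)
    print(document.metadata.display_name, len(document.raw_markdown))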

View File

@ -0,0 +1 @@
"""Outgoing storage adapters."""

View File

@ -0,0 +1,216 @@
"""
S3 Storage Adapter - Concrete implementation using AWS S3.
This adapter implements the IFileStorage port using boto3 for AWS S3 operations.
"""
import logging
from typing import Optional
import boto3
from botocore.exceptions import ClientError
from ....core.config import Settings
from ....core.domain.exceptions import ProcessingError
from ....core.ports.outgoing.file_storage import IFileStorage
logger = logging.getLogger(__name__)
class S3StorageAdapter(IFileStorage):
"""
Concrete S3 storage adapter implementation.
This adapter:
1. Uploads content to AWS S3
2. Generates presigned URLs for secure downloads
3. Handles S3 operations with proper error handling
"""
def __init__(self, settings: Settings) -> None:
"""
Initialize S3 storage adapter.
Args:
settings: Application settings with S3 configuration
Raises:
ValueError: If S3 bucket is not configured
"""
if not settings.S3_BUCKET:
raise ValueError("S3_BUCKET must be configured in settings")
self.bucket_name = settings.S3_BUCKET
self.region = settings.S3_REGION
self.expiration_seconds = settings.S3_PRESIGNED_URL_EXPIRATION
self.upload_path_prefix = settings.S3_UPLOAD_PATH_PREFIX
# Build boto3 client config
client_kwargs = {"region_name": self.region}
# Add custom endpoint (for MinIO)
if settings.S3_ENDPOINT_URL:
client_kwargs["endpoint_url"] = settings.S3_ENDPOINT_URL
logger.debug(f"Using custom S3 endpoint: {settings.S3_ENDPOINT_URL}")
# Add credentials if provided (otherwise boto3 uses default credential chain)
if settings.S3_ACCESS_KEY and settings.S3_SECRET_KEY:
client_kwargs["aws_access_key_id"] = settings.S3_ACCESS_KEY
client_kwargs["aws_secret_access_key"] = settings.S3_SECRET_KEY
logger.debug("Using explicit S3 credentials from settings")
else:
logger.debug("Using default AWS credential chain (IAM role, AWS CLI, etc.)")
# Initialize S3 client
self.s3_client = boto3.client("s3", **client_kwargs)
logger.info(
f"S3StorageAdapter initialized: bucket={self.bucket_name}, "
f"region={self.region}, expiration={self.expiration_seconds}s"
)
def upload_content(self, content: str, destination_path: str) -> str:
"""
Upload text content to S3 and return presigned download URL.
Args:
content: Text content to upload (markdown)
destination_path: S3 object key (e.g., "extractions/doc123.md")
Returns:
Presigned download URL valid for the configured expiration (1 hour by default)
Raises:
ProcessingError: If upload fails
"""
try:
logger.info(f"Uploading content to S3: {destination_path}")
# Upload content to S3
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=destination_path,
Body=content.encode("utf-8"),
ContentType="text/markdown",
ContentDisposition=f'attachment; filename="{self._get_filename(destination_path)}"',
)
logger.info(f"Successfully uploaded to S3: s3://{self.bucket_name}/{destination_path}")
# Generate presigned URL
presigned_url = self._generate_presigned_url(destination_path)
logger.info(f"Generated presigned URL (expires in {self.expiration_seconds}s)")
return presigned_url
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "Unknown")
logger.error(f"S3 upload failed: {error_code} - {str(e)}")
raise ProcessingError(
message=f"Failed to upload to S3: {error_code}",
details=str(e),
)
except Exception as e:
logger.error(f"Unexpected error during S3 upload: {str(e)}")
raise ProcessingError(
message="Failed to upload to S3",
details=str(e),
)
def delete_file(self, file_path: str) -> bool:
"""
Delete a file from S3.
Args:
file_path: S3 object key to delete
Returns:
True if deleted successfully, False otherwise
"""
try:
logger.info(f"Deleting file from S3: {file_path}")
self.s3_client.delete_object(
Bucket=self.bucket_name,
Key=file_path,
)
logger.info(f"Successfully deleted from S3: {file_path}")
return True
except ClientError as e:
logger.error(f"S3 delete failed: {str(e)}")
return False
except Exception as e:
logger.error(f"Unexpected error during S3 delete: {str(e)}")
return False
def file_exists(self, file_path: str) -> bool:
"""
Check if a file exists in S3.
Args:
file_path: S3 object key to check
Returns:
True if file exists, False otherwise
"""
try:
self.s3_client.head_object(
Bucket=self.bucket_name,
Key=file_path,
)
return True
except ClientError as e:
if e.response["Error"]["Code"] == "404":
return False
logger.error(f"Error checking S3 file existence: {str(e)}")
return False
except Exception as e:
logger.error(f"Unexpected error checking S3 file: {str(e)}")
return False
def _generate_presigned_url(self, object_key: str) -> str:
"""
Generate presigned URL for S3 object.
Args:
object_key: S3 object key
Returns:
Presigned download URL
Raises:
ProcessingError: If URL generation fails
"""
try:
presigned_url = self.s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": self.bucket_name,
"Key": object_key,
},
ExpiresIn=self.expiration_seconds,
)
return presigned_url
except ClientError as e:
logger.error(f"Failed to generate presigned URL: {str(e)}")
raise ProcessingError(
message="Failed to generate download URL",
details=str(e),
)
def _get_filename(self, path: str) -> str:
"""
Extract filename from path.
Args:
path: Full path (e.g., "extractions/doc123.md")
Returns:
Filename only (e.g., "doc123.md")
"""
return path.split("/")[-1] if "/" in path else path
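A hedged usage sketch of the adapter against a local MinIO endpoint; the bucket, endpoint, and credentials below are placeholders, and the import paths are assumed from this commit's layout.

from src.core.config import Settings
from src.adapters.outgoing.storage.s3_storage_adapter import S3StorageAdapter

# Placeholder MinIO-style configuration; replace with real values.
settings = Settings(
    S3_BUCKET="demo-bucket",
    S3_ENDPOINT_URL="http://localhost:9000",
    S3_ACCESS_KEY="minioadmin",
    S3_SECRET_KEY="minioadmin",
)
storage = S3StorageAdapter(settings=settings)

url = storage.upload_content("# Hello\n", "extractions/example.md")
print(url)  # presigned GET URL, valid for settings.S3_PRESIGNED_URL_EXPIRATION seconds
print(storage.file_exists("extractions/example.md"))
storage.delete_file("extractions/example.md")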

View File

@ -7,6 +7,7 @@ The Core never imports Adapters - only the Bootstrap does.
The ApplicationContainer manages ONLY:
- Core Services
- Outgoing Adapters (Extractors, Chunkers, Repository)
- Configuration (Settings)
"""
import logging
@ -22,6 +23,8 @@ from .adapters.outgoing.extractors.zip_extractor import ZipExtractor
from .adapters.outgoing.persistence.in_memory_repository import (
InMemoryDocumentRepository,
)
from .adapters.outgoing.storage.s3_storage_adapter import S3StorageAdapter
from .core.config import Settings, get_settings
from .core.ports.incoming.text_processor import ITextProcessor
from .core.services.document_processor_service import DocumentProcessorService
from .shared.logging_config import setup_logging
@ -39,32 +42,47 @@ class ApplicationContainer:
Dependency Injection Container for Core and Outgoing Adapters.
This container manages the lifecycle and dependencies of:
- Configuration (Settings)
- Core Domain Services
- Outgoing Adapters (Extractors, Chunkers, Repository)
- Outgoing Adapters (Extractors, Chunkers, Repository, Storage)
"""
def __init__(self, log_level: str = "INFO") -> None:
def __init__(self, settings: Settings | None = None) -> None:
"""
Initialize the application container.
Args:
log_level: Logging level for the application
settings: Application settings (uses singleton if not provided)
"""
# Setup logging first
setup_logging(level=log_level)
# Load settings (singleton)
self._settings = settings or get_settings()
# Setup logging
setup_logging(level=self._settings.LOG_LEVEL)
logger.info("Initializing ApplicationContainer")
logger.debug(f"Configuration: bucket={self._settings.S3_BUCKET}, region={self._settings.S3_REGION}")
# Create Outgoing Adapters
self._repository = self._create_repository()
self._extractor_factory = self._create_extractor_factory()
self._chunking_context = self._create_chunking_context()
self._file_storage = self._create_file_storage()
# Create Core Service (depends only on Ports)
self._text_processor_service = self._create_text_processor_service()
logger.info("ApplicationContainer initialized successfully")
@property
def settings(self) -> Settings:
"""
Get application settings.
Returns:
Settings: Application configuration
"""
return self._settings
@property
def text_processor_service(self) -> ITextProcessor:
"""
@ -132,6 +150,16 @@ class ApplicationContainer:
return context
def _create_file_storage(self) -> S3StorageAdapter:
"""
Create and configure the file storage adapter.
Returns:
Configured S3 storage adapter
"""
logger.debug("Creating S3StorageAdapter")
return S3StorageAdapter(settings=self._settings)
def _create_text_processor_service(self) -> DocumentProcessorService:
"""
Create the core text processor service.
@ -146,6 +174,8 @@ class ApplicationContainer:
extractor_factory=self._extractor_factory,
chunking_context=self._chunking_context,
repository=self._repository,
file_storage=self._file_storage,
settings=self._settings,
)
@ -167,12 +197,12 @@ def get_processor_service() -> ITextProcessor:
if _container is None:
logger.info("Lazy initializing ApplicationContainer (first access)")
_container = ApplicationContainer(log_level="INFO")
_container = ApplicationContainer()
return _container.text_processor_service
def create_application(log_level: str = "INFO") -> ApplicationContainer:
def create_application(settings: Settings | None = None) -> ApplicationContainer:
"""
Factory function to create a fully wired application container.
@ -180,14 +210,14 @@ def create_application(log_level: str = "INFO") -> ApplicationContainer:
For API routes, use get_processor_service() instead.
Args:
log_level: Logging level for the application
settings: Application settings (uses singleton if not provided)
Returns:
Configured application container
Example:
>>> container = create_application(log_level="DEBUG")
>>> container = create_application()
>>> service = container.text_processor_service
"""
logger.info("Creating application container via factory")
return ApplicationContainer(log_level=log_level)
return ApplicationContainer(settings=settings)
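Since the container now accepts a Settings object rather than a bare log level, tests and scripts can inject configuration explicitly; a sketch, assuming the container module is importable as src.bootstrap and using placeholder values.

from src.bootstrap import create_application  # module path assumed
from src.core.config import Settings

# Override only what the test needs; everything else falls back to defaults / .env.
test_settings = Settings(
    S3_BUCKET="test-bucket",
    S3_ENDPOINT_URL="http://localhost:9000",
    LOG_LEVEL="DEBUG",
)
container = create_application(settings=test_settings)
service = container.text_processor_service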

src/core/config.py Normal file
View File

@ -0,0 +1,41 @@
"""S3 Configuration."""
from typing import Optional
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""S3/MinIO settings loaded from environment variables or .env file."""
S3_BUCKET: str = "bi-chatbot"
S3_REGION: str = "us-east-1"
S3_ACCESS_KEY: Optional[str] = "bi-chatbot"
S3_SECRET_KEY: Optional[str] = "9ixloSaqtYTkfmrJzE"
S3_ENDPOINT_URL: Optional[str] = "https://cdn.d.aiengines.ir"
S3_PRESIGNED_URL_EXPIRATION: int = 3600
S3_UPLOAD_PATH_PREFIX: str = "extractions"
LOG_LEVEL: str = "INFO"
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
extra="ignore",
)
def get_s3_path(self, document_id: str) -> str:
"""Generate S3 path for a document."""
return f"{self.S3_UPLOAD_PATH_PREFIX}/{document_id}.md"
_settings_instance: Optional[Settings] = None
def get_settings() -> Settings:
"""Get or create singleton settings instance."""
global _settings_instance
if _settings_instance is None:
_settings_instance = Settings()
return _settings_instance
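Because Settings reads from the environment and a .env file, deployments can override the hardcoded defaults without touching code; a sketch with placeholder values, followed by how the singleton and path helper are consumed.

# Example .env (placeholder values), picked up via SettingsConfigDict(env_file=".env"):
#   S3_BUCKET=my-bucket
#   S3_ACCESS_KEY=placeholder-access-key
#   S3_SECRET_KEY=placeholder-secret-key
#   S3_ENDPOINT_URL=https://s3.example.com
#   S3_PRESIGNED_URL_EXPIRATION=900
#   LOG_LEVEL=DEBUG

from src.core.config import get_settings  # import path assumed

settings = get_settings()  # singleton; subsequent calls return the same instance
print(settings.get_s3_path("doc-123"))  # -> "extractions/doc-123.md" with the default prefix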

View File

@ -231,6 +231,8 @@ class Document(BaseModel):
sections: Parsed structured sections from Markdown
metadata: Associated metadata
is_processed: Flag indicating if document has been processed
download_url: Optional presigned URL for downloading the markdown file
"""
id: UUID = Field(default_factory=uuid4, description="Unique document ID")
raw_markdown: str = Field(..., description="Raw Markdown content")
@ -240,6 +242,7 @@ class Document(BaseModel):
)
metadata: DocumentMetadata = Field(..., description="Document metadata")
is_processed: bool = Field(default=False, description="Processing status")
download_url: Optional[str] = Field(None, description="Presigned download URL")
model_config = {
"frozen": False, # Allow mutation for processing status

View File

@ -0,0 +1,59 @@
"""
Outgoing Port - File Storage Interface.
This port defines the contract for storing files and generating download URLs.
Implementations can use S3, Azure Blob, GCS, or local filesystem.
"""
from abc import ABC, abstractmethod
class IFileStorage(ABC):
"""
Port interface for file storage operations.
This abstraction allows the core domain to store files without
depending on specific storage implementations (S3, Azure, etc.).
"""
@abstractmethod
def upload_content(self, content: str, destination_path: str) -> str:
"""
Upload text content to storage and return a download URL.
Args:
content: Text content to upload (e.g., markdown)
destination_path: Destination path in storage (e.g., "extractions/doc123.md")
Returns:
Secure download URL (presigned URL for cloud storage)
Raises:
StorageError: If upload fails
"""
pass
@abstractmethod
def delete_file(self, file_path: str) -> bool:
"""
Delete a file from storage.
Args:
file_path: Path to file in storage
Returns:
True if deleted successfully, False otherwise
"""
pass
@abstractmethod
def file_exists(self, file_path: str) -> bool:
"""
Check if a file exists in storage.
Args:
file_path: Path to file in storage
Returns:
True if file exists, False otherwise
"""
pass
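To show why the core can depend on this port alone, here is a minimal local-filesystem implementation sketch; it is not part of this commit, and file:// URIs stand in for presigned URLs.

from pathlib import Path


class LocalFileStorage(IFileStorage):
    """Hypothetical adapter that writes to a local directory instead of S3."""

    def __init__(self, base_dir: Path) -> None:
        self._base_dir = base_dir

    def upload_content(self, content: str, destination_path: str) -> str:
        target = self._base_dir / destination_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        return target.resolve().as_uri()  # file:// URI instead of a presigned URL

    def delete_file(self, file_path: str) -> bool:
        target = self._base_dir / file_path
        if target.is_file():
            target.unlink()
            return True
        return False

    def file_exists(self, file_path: str) -> bool:
        return (self._base_dir / file_path).is_file()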

View File

@ -9,6 +9,7 @@ from pathlib import Path
from typing import List
from uuid import UUID
from ..config import Settings
from ..domain import logic_utils
from ..domain.exceptions import (
DocumentNotFoundError,
@ -20,6 +21,7 @@ from ..domain.models import Chunk, ChunkingStrategy, Document, SourceFile
from ..ports.incoming.text_processor import ITextProcessor
from ..ports.outgoing.chunking_context import IChunkingContext
from ..ports.outgoing.extractor_factory import IExtractorFactory
from ..ports.outgoing.file_storage import IFileStorage
from ..ports.outgoing.repository import IDocumentRepository
@ -39,6 +41,8 @@ class DocumentProcessorService(ITextProcessor):
extractor_factory: IExtractorFactory,
chunking_context: IChunkingContext,
repository: IDocumentRepository,
file_storage: IFileStorage,
settings: Settings,
) -> None:
"""
Initialize the document processor service.
@ -47,10 +51,14 @@ class DocumentProcessorService(ITextProcessor):
extractor_factory: Factory for creating appropriate extractors
chunking_context: Context for managing chunking strategies
repository: Repository for document persistence
file_storage: File storage for uploading extracted content
settings: Application settings for configuration
"""
self._extractor_factory = extractor_factory
self._chunking_context = chunking_context
self._repository = repository
self._file_storage = file_storage
self._settings = settings
logger.info("DocumentProcessorService initialized")
def extract_and_chunk(
@ -102,28 +110,45 @@ class DocumentProcessorService(ITextProcessor):
def extract_document(self, file_path: Path) -> Document:
"""
Extract text content from document without parsing or chunking.
Extract text content from document and upload to S3.
This method only performs extraction:
This method:
1. Extracts raw text content from file
2. Creates Document entity with metadata
3. Returns Document with raw_markdown (no sections)
2. Uploads markdown to S3
3. Generates presigned download URL
4. Returns Document with raw_markdown and download_url
Args:
file_path: Path to the document file
Returns:
Document entity with raw markdown
Document entity with raw markdown and download URL
Raises:
ExtractionError: If text extraction fails
UnsupportedFileTypeError: If file type is not supported
ProcessingError: If S3 upload fails
"""
try:
logger.info(f"Extracting document: {file_path}")
# Extract document
document = self._extract_document(file_path)
logger.info(f"Successfully extracted {len(document.raw_markdown)} characters")
# Upload to S3 and get download URL
destination_path = self._settings.get_s3_path(str(document.id))
download_url = self._file_storage.upload_content(
content=document.raw_markdown,
destination_path=destination_path,
)
# Update document with download URL
document = document.model_copy(update={"download_url": download_url})
logger.info(f"Uploaded to S3 and generated download URL")
return document
except Exception as e:
logger.error(f"Failed to extract document: {str(e)}")
raise
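Because the upload goes through the IFileStorage port, extract_document can be unit-tested without touching S3; a stub sketch (the class and the fake URL scheme are hypothetical, not part of this commit).

class FakeFileStorage(IFileStorage):
    """In-memory stand-in for S3StorageAdapter in unit tests."""

    def __init__(self) -> None:
        self.uploads: dict[str, str] = {}

    def upload_content(self, content: str, destination_path: str) -> str:
        self.uploads[destination_path] = content
        return f"https://fake.example/{destination_path}"

    def delete_file(self, file_path: str) -> bool:
        return self.uploads.pop(file_path, None) is not None

    def file_exists(self, file_path: str) -> bool:
        return file_path in self.uploads


# Wired in place of the real adapter, e.g.
# DocumentProcessorService(..., file_storage=FakeFileStorage(), settings=Settings()),
# after which document.download_url should start with "https://fake.example/".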