Fix potential race condition in DocumentProcessorService._chunk_document by making the chunking context stateless (the strategy is now selected per call instead of via set_strategy)

This commit is contained in:
m.dabbagh 2026-01-07 21:57:22 +03:30
parent fd39184c0c
commit 10a619494b
3 changed files with 31 additions and 60 deletions

View File

@ -21,14 +21,14 @@ class ChunkingContext(IChunkingContext):
""" """
Context for managing chunking strategies (Strategy Pattern). Context for managing chunking strategies (Strategy Pattern).
This class allows switching between different chunking strategies This class provides thread-safe, stateless chunking by selecting
at runtime, providing flexibility in how text is split. the appropriate strategy based on each request's configuration.
No per-request mutable state is kept; the strategy registry is only read while chunking.
""" """
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize chunking context with empty strategy registry.""" """Initialize chunking context with empty strategy registry."""
self._chunkers: Dict[str, IChunker] = {} self._chunkers: Dict[str, IChunker] = {}
self._current_chunker: IChunker | None = None
logger.info("ChunkingContext initialized") logger.info("ChunkingContext initialized")
def register_chunker(self, chunker: IChunker) -> None: def register_chunker(self, chunker: IChunker) -> None:
@ -44,30 +44,6 @@ class ChunkingContext(IChunkingContext):
f"Registered {chunker.__class__.__name__} as '{strategy_name}'" f"Registered {chunker.__class__.__name__} as '{strategy_name}'"
) )
def set_strategy(self, strategy_name: str) -> None:
"""
Set the active chunking strategy.
Args:
strategy_name: Name of the strategy to use
Raises:
ChunkingError: If strategy is not registered
"""
normalized_name = strategy_name.lower()
chunker = self._chunkers.get(normalized_name)
if chunker is None:
available = list(self._chunkers.keys())
raise ChunkingError(
message=f"Unknown chunking strategy: {strategy_name}",
details=f"Available strategies: {', '.join(available)}",
strategy_name=strategy_name,
)
self._current_chunker = chunker
logger.debug(f"Set chunking strategy to: {strategy_name}")
def execute_chunking( def execute_chunking(
self, self,
text: str, text: str,
@ -75,30 +51,38 @@ class ChunkingContext(IChunkingContext):
strategy: ChunkingStrategy, strategy: ChunkingStrategy,
) -> List[Chunk]: ) -> List[Chunk]:
""" """
Execute chunking with the current strategy. Execute chunking using the specified strategy.
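This method stores no per-call state on the context: the chunker is looked up
from the strategy configuration on every call, so concurrent calls cannot
overwrite each other's selection. The selected chunker itself must still be
safe to call concurrently.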
Args: Args:
text: Text to chunk text: Text to chunk
document_id: ID of parent document document_id: ID of parent document
strategy: Chunking strategy configuration strategy: Chunking strategy configuration (includes strategy_name)
Returns: Returns:
List of chunks List of chunks
Raises: Raises:
ChunkingError: If no strategy is set or chunking fails ChunkingError: If strategy is not registered or chunking fails
""" """
if self._current_chunker is None: normalized_name = strategy.strategy_name.lower()
chunker = self._chunkers.get(normalized_name)
if chunker is None:
available = list(self._chunkers.keys())
raise ChunkingError( raise ChunkingError(
message="No chunking strategy set", message=f"Unknown chunking strategy: {strategy.strategy_name}",
details="Call set_strategy() before executing chunking", details=f"Available strategies: {', '.join(available)}",
strategy_name=strategy.strategy_name,
) )
logger.debug( logger.debug(
f"Executing chunking with {self._current_chunker.get_strategy_name()}" f"Executing chunking with strategy: {strategy.strategy_name}"
) )
return self._current_chunker.chunk( return chunker.chunk(
text=text, text=text,
document_id=document_id, document_id=document_id,
strategy=strategy, strategy=strategy,

View File

@ -16,22 +16,9 @@ class IChunkingContext(ABC):
Interface for chunking context (Strategy Pattern). Interface for chunking context (Strategy Pattern).
Implementations of this interface manage the selection and Implementations of this interface manage the selection and
execution of chunking strategies. execution of chunking strategies in a thread-safe, stateless manner.
""" """
@abstractmethod
def set_strategy(self, strategy_name: str) -> None:
"""
Set the active chunking strategy.
Args:
strategy_name: Name of the strategy to use
Raises:
ChunkingError: If strategy is not registered
"""
pass
@abstractmethod @abstractmethod
def execute_chunking( def execute_chunking(
self, self,
@ -40,18 +27,21 @@ class IChunkingContext(ABC):
strategy: ChunkingStrategy, strategy: ChunkingStrategy,
) -> List[Chunk]: ) -> List[Chunk]:
""" """
Execute chunking with the current strategy. Execute chunking using the specified strategy.
Implementations must be stateless and safe to call concurrently: they select
the chunker from the strategy configuration on each call and execute chunking.
Args: Args:
text: Text to chunk text: Text to chunk
document_id: ID of parent document document_id: ID of parent document
strategy: Chunking strategy configuration strategy: Chunking strategy configuration (includes strategy_name)
Returns: Returns:
List of chunks List of chunks
Raises: Raises:
ChunkingError: If no strategy is set or chunking fails ChunkingError: If strategy is not registered or chunking fails
""" """
pass pass

View File

@ -17,8 +17,8 @@ from ..domain.exceptions import (
) )
from ..domain.models import Chunk, ChunkingStrategy, Document from ..domain.models import Chunk, ChunkingStrategy, Document
from ..ports.incoming.text_processor import ITextProcessor from ..ports.incoming.text_processor import ITextProcessor
from ..ports.outgoing.chunker import IChunker from ..ports.outgoing.chunking_context import IChunkingContext
from ..ports.outgoing.extractor import IExtractor from ..ports.outgoing.extractor_factory import IExtractorFactory
from ..ports.outgoing.repository import IDocumentRepository from ..ports.outgoing.repository import IDocumentRepository
@ -247,6 +247,9 @@ class DocumentProcessorService(ITextProcessor):
""" """
Chunk document using specified strategy. Chunk document using specified strategy.
This method holds no state of its own: it passes the strategy to the chunking
context on each call, and the context selects the chunker from that
configuration rather than from a previously set strategy.
Args: Args:
document: Document to chunk document: Document to chunk
strategy: Chunking strategy configuration strategy: Chunking strategy configuration
@ -254,14 +257,8 @@ class DocumentProcessorService(ITextProcessor):
Returns: Returns:
List of chunks List of chunks
""" """
self._chunking_context.set_strategy(strategy.strategy_name)
return self._chunking_context.execute_chunking( return self._chunking_context.execute_chunking(
text=document.content, text=document.content,
document_id=document.id, document_id=document.id,
strategy=strategy, strategy=strategy,
) )
# Import interfaces from ports (proper hexagonal architecture)
from ..ports.outgoing.chunking_context import IChunkingContext
from ..ports.outgoing.extractor_factory import IExtractorFactory