""" Performance and concurrency tests for the declarative transition system. These tests validate that the transition system performs well under load and handles concurrent operations correctly, which is critical for production FSM systems. """ import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import Any, Dict, Optional from unittest.mock import Mock from django.test import TestCase, TransactionTestCase from fsm.registry import transition_registry from fsm.transitions import BaseTransition, TransitionContext, TransitionValidationError from pydantic import Field class PerformanceTestTransition(BaseTransition): """Simple transition for performance testing""" operation_id: int = Field(..., description='Operation identifier') data_size: int = Field(1, description='Size of data to process') def get_target_state(self, context: Optional[TransitionContext] = None) -> str: return 'PROCESSED' @classmethod def can_transition_from_state(cls, context: TransitionContext) -> bool: return True def validate_transition(self, context: TransitionContext) -> bool: # Simulate some validation work if self.data_size < 0: raise TransitionValidationError('Invalid data size') return True def transition(self, context: TransitionContext) -> Dict[str, Any]: # Simulate processing work return { 'operation_id': self.operation_id, 'data_size': self.data_size, 'processed_at': context.timestamp.isoformat(), 'processing_time_ms': 1, # Mock processing time } class ConcurrencyTestTransition(BaseTransition): """Transition for testing concurrent access patterns""" thread_id: int = Field(..., description='Thread identifier') shared_counter: int = Field(0, description='Shared counter for testing') sleep_duration: float = Field(0.0, description='Simulate processing delay') execution_order: list = Field(default_factory=list, description='Track execution order') def get_target_state(self, context: Optional[TransitionContext] = None) -> str: return f'PROCESSED_BY_THREAD_{self.thread_id}' @classmethod def can_transition_from_state(cls, context: TransitionContext) -> bool: return True def validate_transition(self, context: TransitionContext) -> bool: # Record validation timing for concurrency analysis self.execution_order.append(f'validate_{self.thread_id}_{time.time()}') return True def transition(self, context: TransitionContext) -> Dict[str, Any]: # Record transition timing self.execution_order.append(f'transition_{self.thread_id}_{time.time()}') # Simulate some processing delay if self.sleep_duration > 0: time.sleep(self.sleep_duration) return { 'thread_id': self.thread_id, 'shared_counter': self.shared_counter, 'execution_order': self.execution_order.copy(), 'processed_at': context.timestamp.isoformat(), } class PerformanceTests(TestCase): """ Performance tests for the declarative transition system. These tests measure execution time, memory usage patterns, and scalability characteristics. """ def setUp(self): self.mock_entity = Mock() self.mock_entity.pk = 1 self.mock_entity._meta.model_name = 'test_entity' self.mock_user = Mock() self.mock_user.id = 123 # Clear registry for clean tests transition_registry._transitions.clear() transition_registry.register('test_entity', 'performance_test', PerformanceTestTransition) def test_single_transition_performance(self): """ PERFORMANCE TEST: Measure single transition execution time Validates that individual transitions execute within acceptable time limits. """ transition = PerformanceTestTransition(operation_id=1, data_size=1000) context = TransitionContext( entity=self.mock_entity, current_user=self.mock_user, current_state='CREATED', target_state=transition.get_target_state(), ) # Measure validation performance start_time = time.perf_counter() result = transition.validate_transition(context) validation_time = time.perf_counter() - start_time assert result assert validation_time < 0.001 # Should be under 1ms # Measure transition execution performance start_time = time.perf_counter() transition_data = transition.transition(context) execution_time = time.perf_counter() - start_time assert isinstance(transition_data, dict) assert execution_time < 0.001 # Should be under 1ms # Measure total workflow performance start_time = time.perf_counter() transition.context = context transition.validate_transition(context) transition.transition(context) total_time = time.perf_counter() - start_time assert total_time < 0.005 # Total should be under 5ms def test_batch_transition_performance(self): """ PERFORMANCE TEST: Measure batch transition creation and validation Tests performance when creating many transition instances rapidly. """ batch_size = 1000 # Test batch creation performance start_time = time.perf_counter() transitions = [] for i in range(batch_size): transition = PerformanceTestTransition(operation_id=i, data_size=i * 10) transitions.append(transition) creation_time = time.perf_counter() - start_time creation_time_per_item = creation_time / batch_size assert len(transitions) == batch_size assert creation_time_per_item < 0.001 # Under 1ms per transition # Test batch validation performance context = TransitionContext( entity=self.mock_entity, current_user=self.mock_user, current_state='CREATED', target_state='PROCESSED' ) start_time = time.perf_counter() validation_results = [] for transition in transitions: result = transition.validate_transition(context) validation_results.append(result) validation_time = time.perf_counter() - start_time validation_time_per_item = validation_time / batch_size assert all(validation_results) assert validation_time_per_item < 0.001 # Under 1ms per validation assert validation_time < 0.5 # Total batch under 500ms def test_registry_performance(self): """ PERFORMANCE TEST: Registry operations under load Tests the performance of registry lookups and registrations. """ # Test registry lookup performance lookup_count = 10000 start_time = time.perf_counter() for i in range(lookup_count): retrieved_class = transition_registry.get_transition('test_entity', 'performance_test') lookup_time = time.perf_counter() - start_time lookup_time_per_operation = lookup_time / lookup_count assert retrieved_class == PerformanceTestTransition assert lookup_time_per_operation < 0.0001 # Under 0.1ms per lookup # Test registry registration performance registration_count = 1000 start_time = time.perf_counter() for i in range(registration_count): entity_name = f'entity_{i}' transition_name = f'transition_{i}' transition_registry.register(entity_name, transition_name, PerformanceTestTransition) registration_time = time.perf_counter() - start_time registration_time_per_operation = registration_time / registration_count assert registration_time_per_operation < 0.001 # Under 1ms per registration # Verify registrations worked test_class = transition_registry.get_transition('entity_500', 'transition_500') assert test_class == PerformanceTestTransition def test_pydantic_validation_performance(self): """ PERFORMANCE TEST: Pydantic validation performance Measures the overhead of Pydantic validation in transitions. """ # Test valid data performance valid_data = {'operation_id': 123, 'data_size': 1000} validation_count = 10000 start_time = time.perf_counter() for i in range(validation_count): PerformanceTestTransition(**valid_data) validation_time = time.perf_counter() - start_time validation_time_per_item = validation_time / validation_count assert validation_time_per_item < 0.001 # Under 1ms per validation # Test validation error performance invalid_data = {'operation_id': 'invalid', 'data_size': -1} error_count = 1000 start_time = time.perf_counter() errors = [] for i in range(error_count): try: PerformanceTestTransition(**invalid_data) except Exception as e: errors.append(e) error_time = time.perf_counter() - start_time error_time_per_item = error_time / error_count assert len(errors) == error_count assert error_time_per_item < 0.01 # Under 10ms per error (errors are slower) def test_memory_usage_patterns(self): """ PERFORMANCE TEST: Memory usage analysis Tests memory usage patterns for transition instances and contexts. """ import sys # Measure base memory usage base_transitions = [] for i in range(100): transition = PerformanceTestTransition(operation_id=i, data_size=i) base_transitions.append(transition) base_size = sys.getsizeof(base_transitions[0]) # Test memory usage with complex data complex_transitions = [] for i in range(100): transition = PerformanceTestTransition(operation_id=i, data_size=i * 1000) # Add context to transition context = TransitionContext( entity=self.mock_entity, current_user=self.mock_user, current_state='CREATED', target_state=transition.get_target_state(), metadata={'large_data': 'x' * 1000}, # Add some bulk ) transition.context = context complex_transitions.append(transition) complex_size = sys.getsizeof(complex_transitions[0]) # Memory usage should be reasonable memory_overhead = complex_size - base_size assert memory_overhead < 10000 # Under 10KB overhead per transition # Clean up contexts to test garbage collection for transition in complex_transitions: transition.context = None # Verify memory can be reclaimed (simplified test) assert complex_transitions[0].context is None class ConcurrencyTests(TransactionTestCase): """ Concurrency tests for the declarative transition system. These tests validate thread safety and concurrent execution patterns that are critical for production systems. """ def setUp(self): self.mock_entity = Mock() self.mock_entity.pk = 1 self.mock_entity._meta.model_name = 'test_entity' self.mock_user = Mock() self.mock_user.id = 123 # Clear registry for clean tests transition_registry._transitions.clear() transition_registry.register('test_entity', 'concurrency_test', ConcurrencyTestTransition) def test_concurrent_transition_creation(self): """ CONCURRENCY TEST: Thread-safe transition instance creation Validates that multiple threads can create transition instances concurrently without conflicts. """ thread_count = 10 transitions_per_thread = 100 all_transitions = [] thread_results = [] def create_transitions(thread_id): """Worker function to create transitions in a thread""" local_transitions = [] for i in range(transitions_per_thread): transition = ConcurrencyTestTransition( thread_id=thread_id, shared_counter=i, sleep_duration=0.001 # Small delay to increase contention ) local_transitions.append(transition) return local_transitions # Execute concurrent creation with ThreadPoolExecutor(max_workers=thread_count) as executor: futures = [] for thread_id in range(thread_count): future = executor.submit(create_transitions, thread_id) futures.append(future) for future in as_completed(futures): thread_transitions = future.result() thread_results.append(thread_transitions) all_transitions.extend(thread_transitions) # Validate results total_expected = thread_count * transitions_per_thread assert len(all_transitions) == total_expected # Check thread separation thread_ids = [t.thread_id for t in all_transitions] unique_threads = set(thread_ids) assert len(unique_threads) == thread_count # Validate each thread created correct number of transitions for thread_id in range(thread_count): thread_transitions = [t for t in all_transitions if t.thread_id == thread_id] assert len(thread_transitions) == transitions_per_thread def test_concurrent_transition_execution(self): """ CONCURRENCY TEST: Concurrent transition execution Tests that multiple transitions can be executed concurrently without race conditions in the execution logic. """ thread_count = 5 execution_results = [] def execute_transition(thread_id): """Worker function to execute a transition""" transition = ConcurrencyTestTransition( thread_id=thread_id, shared_counter=thread_id * 10, sleep_duration=0.01, # Small delay to test concurrency ) context = TransitionContext( entity=self.mock_entity, current_user=self.mock_user, current_state='CREATED', target_state=transition.get_target_state(), timestamp=datetime.now(), ) # Execute validation and transition validation_result = transition.validate_transition(context) transition_data = transition.transition(context) return { 'thread_id': thread_id, 'validation_result': validation_result, 'transition_data': transition_data, 'execution_order': transition.execution_order, } # Execute concurrent transitions with ThreadPoolExecutor(max_workers=thread_count) as executor: futures = [] for thread_id in range(thread_count): future = executor.submit(execute_transition, thread_id) futures.append(future) for future in as_completed(futures): result = future.result() execution_results.append(result) # Validate results assert len(execution_results) == thread_count for result in execution_results: assert result['validation_result'] assert 'thread_id' in result['transition_data'] assert isinstance(result['execution_order'], list) assert len(result['execution_order']) > 0 # Check thread isolation thread_ids = [r['transition_data']['thread_id'] for r in execution_results] assert set(thread_ids) == set(range(thread_count)) def test_registry_thread_safety(self): """ CONCURRENCY TEST: Registry thread safety Tests that the transition registry handles concurrent registration and lookup operations safely. """ thread_count = 10 operations_per_thread = 100 def registry_operations(thread_id): """Worker function for registry operations""" operations_completed = 0 for i in range(operations_per_thread): # Mix of registration and lookup operations if i % 3 == 0: # Register new transition entity_name = f'entity_{thread_id}_{i}' transition_name = f'transition_{i}' transition_registry.register(entity_name, transition_name, ConcurrencyTestTransition) operations_completed += 1 elif i % 3 == 1: # Lookup existing transition try: found_class = transition_registry.get_transition('test_entity', 'concurrency_test') if found_class == ConcurrencyTestTransition: operations_completed += 1 except Exception: pass else: # List operations try: entities = transition_registry.list_entities() if len(entities) >= 0: # Should always be non-negative operations_completed += 1 except Exception: pass return operations_completed # Execute concurrent registry operations with ThreadPoolExecutor(max_workers=thread_count) as executor: futures = [] for thread_id in range(thread_count): future = executor.submit(registry_operations, thread_id) futures.append(future) operation_counts = [] for future in as_completed(futures): count = future.result() operation_counts.append(count) # Validate no operations failed due to thread safety issues total_operations = sum(operation_counts) expected_minimum = thread_count * operations_per_thread * 0.9 # Allow some variance assert total_operations > expected_minimum # Registry should be in consistent state entities = transition_registry.list_entities() assert isinstance(entities, list) assert len(entities) > thread_count # Should have entities from all threads def test_context_isolation(self): """ CONCURRENCY TEST: Context isolation between threads Ensures that transition contexts remain isolated between concurrent executions and don't leak data. """ thread_count = 8 context_data = [] def context_isolation_test(thread_id): """Test context isolation in a thread""" # Create unique context data for this thread unique_data = { 'thread_specific_id': thread_id, 'random_data': f'thread_{thread_id}_data', 'timestamp': datetime.now().isoformat(), 'test_counter': thread_id * 1000, } transition = ConcurrencyTestTransition( thread_id=thread_id, shared_counter=thread_id, sleep_duration=0.005, # Small delay to increase chance of interference ) context = TransitionContext( entity=self.mock_entity, current_user=self.mock_user, current_state='CREATED', target_state=transition.get_target_state(), metadata=unique_data, ) # Set context on transition transition.context = context # Execute transition validation_result = transition.validate_transition(context) transition_data = transition.transition(context) # Retrieve context and verify isolation retrieved_context = transition.context return { 'thread_id': thread_id, 'original_metadata': unique_data, 'retrieved_metadata': retrieved_context.metadata, 'validation_result': validation_result, 'transition_data': transition_data, } # Execute with high concurrency with ThreadPoolExecutor(max_workers=thread_count) as executor: futures = [] for thread_id in range(thread_count): future = executor.submit(context_isolation_test, thread_id) futures.append(future) for future in as_completed(futures): result = future.result() context_data.append(result) # Validate context isolation assert len(context_data) == thread_count for result in context_data: thread_id = result['thread_id'] original_metadata = result['original_metadata'] retrieved_metadata = result['retrieved_metadata'] # Context should match exactly what was set for this thread assert original_metadata['thread_specific_id'] == thread_id assert retrieved_metadata['thread_specific_id'] == thread_id assert original_metadata['random_data'] == retrieved_metadata['random_data'] assert original_metadata['test_counter'] == thread_id * 1000 # Should not have data from other threads for other_result in context_data: if other_result['thread_id'] != thread_id: assert ( retrieved_metadata['thread_specific_id'] != other_result['original_metadata']['thread_specific_id'] ) def test_stress_test_mixed_operations(self): """ STRESS TEST: Mixed operations under load Combines multiple types of operations under high concurrency to test overall system stability. """ duration_seconds = 2 # Short duration for CI thread_count = 6 # Shared statistics stats = { 'transitions_created': 0, 'validations_performed': 0, 'transitions_executed': 0, 'registry_lookups': 0, 'errors_encountered': 0, } stats_lock = threading.Lock() def mixed_operations_worker(worker_id): """Worker that performs mixed operations""" local_stats = { 'transitions_created': 0, 'validations_performed': 0, 'transitions_executed': 0, 'registry_lookups': 0, 'errors_encountered': 0, } end_time = time.time() + duration_seconds operation_counter = 0 while time.time() < end_time: try: operation_type = operation_counter % 4 if operation_type == 0: # Create transition transition = ConcurrencyTestTransition(thread_id=worker_id, shared_counter=operation_counter) local_stats['transitions_created'] += 1 elif operation_type == 1: # Validate transition transition = ConcurrencyTestTransition(thread_id=worker_id, shared_counter=operation_counter) context = TransitionContext( entity=self.mock_entity, current_state='CREATED', target_state=transition.get_target_state(), ) transition.validate_transition(context) local_stats['validations_performed'] += 1 elif operation_type == 2: # Execute transition transition = ConcurrencyTestTransition(thread_id=worker_id, shared_counter=operation_counter) context = TransitionContext( entity=self.mock_entity, current_state='CREATED', target_state=transition.get_target_state(), ) transition.transition(context) local_stats['transitions_executed'] += 1 else: # Registry lookup found = transition_registry.get_transition('test_entity', 'concurrency_test') if found: local_stats['registry_lookups'] += 1 operation_counter += 1 except Exception: local_stats['errors_encountered'] += 1 # Small yield to allow other threads time.sleep(0.0001) # Update shared statistics with stats_lock: for key in stats: stats[key] += local_stats[key] return local_stats # Execute stress test with ThreadPoolExecutor(max_workers=thread_count) as executor: futures = [] for worker_id in range(thread_count): future = executor.submit(mixed_operations_worker, worker_id) futures.append(future) worker_results = [] for future in as_completed(futures): result = future.result() worker_results.append(result) # Validate stress test results total_operations = sum( [ stats['transitions_created'], stats['validations_performed'], stats['transitions_executed'], stats['registry_lookups'], ] ) # Should have performed substantial work assert total_operations > thread_count * 10 # Error rate should be very low (< 1%) error_rate = stats['errors_encountered'] / max(total_operations, 1) assert error_rate < 0.01 # All operation types should have been performed assert stats['transitions_created'] > 0 assert stats['validations_performed'] > 0 assert stats['transitions_executed'] > 0 assert stats['registry_lookups'] > 0