"""
Label Studio Open Source FSM Integration Tests.

Tests the core FSM functionality with real Django models in LSO,
focusing on coverage of state_manager.py and utils modules.
"""

import logging
from datetime import datetime, timedelta, timezone
from unittest.mock import Mock, patch

import pytest
from core.current_request import CurrentContext
from django.contrib.auth import get_user_model
from django.core.cache import cache
from fsm.state_choices import AnnotationStateChoices, ProjectStateChoices, TaskStateChoices
from fsm.state_manager import StateManager, StateManagerError
from fsm.state_models import AnnotationState, ProjectState, TaskState
from fsm.utils import get_current_state_safe, is_fsm_enabled, resolve_organization_id
from organizations.tests.factories import OrganizationFactory
from projects.tests.factories import ProjectFactory
from tasks.models import Annotation
from tasks.tests.factories import AnnotationFactory, TaskFactory
from users.tests.factories import UserFactory

User = get_user_model()
logger = logging.getLogger(__name__)


class _FSMTestBase:
    """
    Shared per-test setup for the FSM test classes in this module.

    The leading underscore keeps pytest's default ``Test*`` class collection
    from picking this mixin up as a test class of its own.
    """

    @pytest.fixture(autouse=True)
    def setup_test_data(self):
        """Create an organization and user, bind them to CurrentContext, and clean up afterwards."""
        cache.clear()
        self.org = OrganizationFactory()
        self.user = UserFactory()
        CurrentContext.set_user(self.user)
        CurrentContext.set_organization_id(self.org.id)
        yield
        # Teardown: drop cached state and request context so tests stay independent.
        cache.clear()
        CurrentContext.clear()


@pytest.mark.django_db
class TestLSOFSMIntegration(_FSMTestBase):
    """
    Test LSO FSM integration with real models.

    Focuses on improving coverage of state_manager.py and utils.py by
    testing error paths, cache behavior, and bulk operations.
    """

    def test_project_creation_generates_state(self):
        """
        Test that creating a project automatically generates a state record.

        Validates:
        - Current state is CREATED for a new project
        - Exactly one ProjectState history record exists
        - That record carries the 'project_created' transition name
        """
        project = ProjectFactory(organization=self.org)

        # Check state was created
        state = StateManager.get_current_state_value(project)
        assert state == ProjectStateChoices.CREATED, f'Expected CREATED, got {state}'

        # Check state history exists
        history = list(ProjectState.objects.filter(project=project).order_by('created_at'))
        assert len(history) == 1
        assert history[0].state == ProjectStateChoices.CREATED
        assert history[0].transition_name == 'project_created'

    def test_task_creation_generates_state(self):
        """
        Test that creating a task automatically generates a state record.

        Validates:
        - Current state is CREATED for a new task
        - Exactly one TaskState history record exists
        """
        project = ProjectFactory(organization=self.org)
        task = TaskFactory(project=project)

        # Check state was created
        state = StateManager.get_current_state_value(task)
        assert state == TaskStateChoices.CREATED, f'Expected CREATED, got {state}'

        # Check state history exists
        history = list(TaskState.objects.filter(task=task).order_by('created_at'))
        assert len(history) == 1
        assert history[0].state == TaskStateChoices.CREATED

    def test_annotation_creation_generates_state(self):
        """
        Test that creating an annotation automatically generates a state record.

        Validates:
        - Current state is SUBMITTED for a new annotation in LSO
        - At least one AnnotationState record exists and the earliest is SUBMITTED
        """
        project = ProjectFactory(organization=self.org)
        task = TaskFactory(project=project)
        annotation = AnnotationFactory(task=task, completed_by=self.user)

        # Check state was created
        state = StateManager.get_current_state_value(annotation)
        assert state == AnnotationStateChoices.SUBMITTED, f'Expected SUBMITTED, got {state}'

        # Check state history exists
        history = list(AnnotationState.objects.filter(annotation=annotation).order_by('created_at'))
        assert len(history) >= 1
        assert history[0].state == AnnotationStateChoices.SUBMITTED

    def test_cache_functionality(self):
        """
        Test that repeated state lookups return the same value.

        Validates:
        - A lookup after clearing the cache returns CREATED
        - A second lookup (whether served from cache or DB) returns the same state
        """
        project = ProjectFactory(organization=self.org)

        # First access, starting from an empty cache
        cache.clear()
        state1 = StateManager.get_current_state_value(project)
        assert state1 == ProjectStateChoices.CREATED

        # Second access - must agree with the first regardless of where it is served from
        state2 = StateManager.get_current_state_value(project)
        assert state2 == state1

    def test_get_current_state_safe_with_no_state(self):
        """
        Test get_current_state_safe returns None for entities without states.

        Validates:
        - Utility function handles entities with no state records
        - No exception is raised
        - None is returned
        """
        # Create a project but delete its state records
        project = ProjectFactory(organization=self.org)
        ProjectState.objects.filter(project=project).delete()
        cache.clear()

        # Should return None, not raise
        state = get_current_state_safe(project)
        assert state is None

    def test_resolve_organization_id_from_entity(self):
        """
        Test resolve_organization_id utility function.

        Validates:
        - Organization id is resolved for a project
        - Organization id is resolved for a task belonging to that project
        """
        project = ProjectFactory(organization=self.org)
        task = TaskFactory(project=project)

        # Resolve from the project itself
        org_id = resolve_organization_id(project)
        assert org_id == self.org.id

        # Resolve from a task that belongs to the project
        org_id = resolve_organization_id(task)
        assert org_id == self.org.id

    def test_is_fsm_enabled_in_lso(self):
        """
        Test is_fsm_enabled reports the FSM as enabled for the test user.

        Validates:
        - Returns True when called with a user
        """
        result = is_fsm_enabled(user=self.user)
        assert result is True

    def test_state_manager_error_handling(self):
        """
        Test StateManager error handling for invalid entities.

        Validates:
        - StateManagerError is raised for an entity without a state model
        - The error message contains 'No state model found'
        """
        # Build a stand-in entity whose model metadata matches no registered state model
        mock_entity = Mock()
        mock_entity._meta = Mock()
        mock_entity._meta.model_name = 'nonexistent_model'
        mock_entity._meta.label_lower = 'test.nonexistent'
        mock_entity.pk = 1

        # Should raise StateManagerError
        with pytest.raises(StateManagerError) as exc_info:
            StateManager.get_current_state_value(mock_entity)

        assert 'No state model found' in str(exc_info.value)

    def test_warm_cache_bulk_operation(self):
        """
        Test bulk cache warming for multiple entities.

        Validates:
        - warm_cache accepts a list of tasks without raising
        - State lookups after warming return CREATED for every task

        Note: lookup speed and query counts are not measured here.
        """
        project = ProjectFactory(organization=self.org)
        tasks = [TaskFactory(project=project) for _ in range(5)]

        # Warm cache for all tasks
        cache.clear()
        StateManager.warm_cache(tasks)

        # Verify all states can be retrieved and are correct
        for task in tasks:
            state = StateManager.get_current_state_value(task)
            assert state == TaskStateChoices.CREATED

    def test_get_state_history(self):
        """
        Test retrieving state history for an entity.

        Validates:
        - History retrieval returns at least one record
        - The first record is the CREATED state with the 'project_created' transition
        """
        project = ProjectFactory(organization=self.org)

        # Get history
        history = StateManager.get_state_history(project)

        # Should have at least creation state
        assert len(history) >= 1
        assert history[0].state == ProjectStateChoices.CREATED
        assert history[0].transition_name == 'project_created'

    def test_get_state_history_ordering(self):
        """
        Test that state history timestamps are in non-decreasing order.

        Validates:
        - History contains at least one record
        - created_at values are already sorted ascending
        """
        project = ProjectFactory(organization=self.org)

        # Get history
        history = StateManager.get_state_history(project)

        # Should have at least one state (creation)
        assert len(history) >= 1

        # Verify ordering - each timestamp should be >= the previous
        timestamps = [h.created_at for h in history]
        assert timestamps == sorted(timestamps)

    def test_annotation_from_draft_workflow(self):
        """
        Test an annotation saved directly through the model ends up SUBMITTED.

        Validates:
        - Saving an Annotation instance (without the factory) records a state
        - State is SUBMITTED in LSO
        """
        project = ProjectFactory(organization=self.org)
        task = TaskFactory(project=project)

        # Build the annotation by hand; note that no draft object is attached here.
        annotation = Annotation(
            task=task,
            completed_by=self.user,
            result=[{'test': 'data'}],
            was_cancelled=False,
        )
        annotation.save()

        # Should be in SUBMITTED state
        state = StateManager.get_current_state_value(annotation)
        assert state == AnnotationStateChoices.SUBMITTED

    def test_state_manager_with_multiple_transitions(self):
        """
        Test that a project settings update does not add a state record in LSO.

        Validates:
        - After saving a settings change, exactly one state record exists
        - That record is still the CREATED state
        """
        project = ProjectFactory(organization=self.org)

        # Read the state once before mutating the project
        StateManager.get_current_state_value(project)

        # Update project settings (should create a state record in LSE, but not in LSO)
        project.maximum_annotations = 5
        project.save()

        # Get history
        history = list(ProjectState.objects.filter(project=project).order_by('created_at'))

        # In LSO, settings changes don't create new state records
        # so we should still have just the creation record
        assert len(history) == 1
        assert history[0].state == ProjectStateChoices.CREATED


@pytest.mark.django_db
class TestLSOFSMUtilities(_FSMTestBase):
    """
    Test LSO FSM utility functions.

    Focuses on improving coverage of utils.py module.
    """

    def test_resolve_organization_id_with_user(self):
        """
        Test resolve_organization_id with user parameter.

        Validates:
        - An organization id is resolved when both entity and user are supplied

        Note: only non-None is asserted; which source wins is not checked here.
        """
        project = ProjectFactory(organization=self.org)

        org_id = resolve_organization_id(project, user=self.user)
        assert org_id is not None

    def test_resolve_organization_id_fallback_to_context(self):
        """
        Test resolve_organization_id falls back to CurrentContext.

        Validates:
        - CurrentContext is used when the entity exposes no attributes
        - The organization id set on the context is returned
        """
        CurrentContext.set_organization_id(self.org.id)

        # spec=[] yields a Mock that raises AttributeError for every attribute access
        mock_entity = Mock(spec=[])

        org_id = resolve_organization_id(mock_entity)
        assert org_id == self.org.id

    def test_get_current_state_safe_with_state(self):
        """
        Test get_current_state_safe returns correct state value.

        Validates:
        - Function returns a value (not None) when a state exists
        - The value equals CREATED
        - Works with an empty cache
        """
        project = ProjectFactory(organization=self.org)
        cache.clear()

        # Should return state value string, not None
        state_value = get_current_state_safe(project)
        assert state_value is not None
        assert state_value == ProjectStateChoices.CREATED

    def test_state_manager_handles_concurrent_access(self):
        """
        Test that many back-to-back state reads return a consistent value.

        Validates:
        - Ten consecutive lookups all return CREATED

        Note: the reads are issued sequentially from one thread, so this does
        not exercise real concurrency or race conditions.
        """
        project = ProjectFactory(organization=self.org)

        # Repeated sequential accesses
        states = [StateManager.get_current_state_value(project) for _ in range(10)]

        # All should return the same state
        assert all(s == ProjectStateChoices.CREATED for s in states)

    def test_get_current_state_value_when_fsm_disabled(self):
        """
        Test get_current_state_value returns None when FSM is disabled.

        Validates:
        - Returns None instead of raising when _is_fsm_enabled reports False
        """
        project = ProjectFactory(organization=self.org)

        with patch.object(StateManager, '_is_fsm_enabled', return_value=False):
            result = StateManager.get_current_state_value(project)
            assert result is None

    def test_get_states_in_time_range(self):
        """
        Test get_states_in_time_range for time-based queries.

        Validates:
        - A query over the last 24 hours returns at least the creation state
        """
        project = ProjectFactory(organization=self.org)

        # Derive both bounds from one clock reading, taken after the project exists.
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(days=1)

        states = StateManager.get_states_in_time_range(project, start_time, end_time)

        # Should have at least the creation state
        assert len(states) >= 1

    def test_invalidate_cache(self):
        """
        Test cache invalidation for entity state.

        Validates:
        - invalidate_cache can be called for an entity without raising
        - A lookup after invalidation still returns CREATED
        """
        project = ProjectFactory(organization=self.org)

        # Get state to populate cache
        state = StateManager.get_current_state_value(project)
        assert state == ProjectStateChoices.CREATED

        # Invalidate cache
        StateManager.invalidate_cache(project)

        # Next lookup must still resolve the correct state
        state_after = StateManager.get_current_state_value(project)
        assert state_after == ProjectStateChoices.CREATED

    def test_get_current_state_object(self):
        """
        Test get_current_state_object returns full state record.

        Validates:
        - A state object is returned (not None)
        - Its state is CREATED
        - It exposes triggered_by and transition_name attributes
        """
        project = ProjectFactory(organization=self.org)

        # Get current state object
        state_object = StateManager.get_current_state_object(project)

        assert state_object is not None
        assert state_object.state == ProjectStateChoices.CREATED
        assert hasattr(state_object, 'triggered_by')
        assert hasattr(state_object, 'transition_name')

    def test_transition_state_fsm_disabled(self):
        """
        Test transition_state returns True when FSM is disabled.

        Validates:
        - Returns True when _is_fsm_enabled reports False
        """
        project = ProjectFactory(organization=self.org)

        with patch.object(StateManager, '_is_fsm_enabled', return_value=False):
            result = StateManager.transition_state(
                entity=project, new_state='NEW_STATE', transition_name='test', user=self.user
            )
            assert result is True

    def test_warm_cache_multiple_entities(self):
        """
        Test warm_cache with multiple entities for bulk operations.

        Validates:
        - warm_cache accepts a list of projects without raising
        - State lookups after warming return CREATED for every project

        Note: whether the lookups avoid the database is not asserted here.
        """
        projects = [ProjectFactory(organization=self.org) for _ in range(3)]

        # Warm cache for all projects
        StateManager.warm_cache(projects)

        # States must still resolve correctly after warming
        for project in projects:
            state = StateManager.get_current_state_value(project)
            assert state == ProjectStateChoices.CREATED

    def test_fsm_disabled_via_current_context(self):
        """
        Test CurrentContext.set_fsm_disabled() directly.

        Validates:
        - FSM can be disabled via CurrentContext
        - is_fsm_enabled() respects the flag
        - The flag can be switched back and is_fsm_enabled() follows
        """
        # Initially enabled
        assert is_fsm_enabled() is True

        # Disable FSM; the reset lives in `finally` so a failing assertion
        # cannot leave the FSM disabled.
        CurrentContext.set_fsm_disabled(True)
        try:
            assert is_fsm_enabled() is False
            assert CurrentContext.is_fsm_disabled() is True
        finally:
            # Re-enable FSM
            CurrentContext.set_fsm_disabled(False)

        assert is_fsm_enabled() is True
        assert CurrentContext.is_fsm_disabled() is False