""" FSM utility functions for backfilling and managing state transitions. This module contains reusable functions for FSM state management that are used across different parts of the codebase. """ import logging logger = logging.getLogger(__name__) def backfill_fsm_states_for_tasks(storage_id, tasks_created, link_class): """ Backfill initial FSM states for tasks created during storage sync. This function creates initial CREATED state records for all tasks that were created during a storage sync operation. It's designed to be called after tasks have been successfully created and linked to storage. Args: storage_id: The ID of the storage that created the tasks tasks_created: Number of tasks that were created link_class: The link model class (e.g., S3ImportStorageLink) to query tasks Note: - CurrentContext must be available before calling this function - This function is safe to call in both LSO and LSE environments - Failures are logged but don't propagate to prevent breaking storage sync """ if tasks_created <= 0: return try: from lse_fsm.state_inference import backfill_state_for_entity from tasks.models import Task # Get tasks created in this sync task_ids = list( link_class.objects.filter(storage=storage_id) .order_by('-created_at')[:tasks_created] .values_list('task_id', flat=True) ) tasks = Task.objects.filter(id__in=task_ids) logger.info(f'Storage sync: creating initial FSM states for {len(task_ids)} tasks') # Backfill initial CREATED state for each task for task in tasks: backfill_state_for_entity(task, 'task', create_record=True) logger.info(f'Storage sync: FSM states created for {len(task_ids)} tasks') except ImportError: # LSE not available (OSS), skip FSM sync logger.debug('LSE not available, skipping FSM state backfill for storage sync') except Exception as e: # Don't fail storage sync if FSM sync fails logger.error(f'FSM sync after storage sync failed: {e}', exc_info=True) def update_task_state_after_annotation_deletion(task, project): """ Update task FSM state after an annotation has been deleted. This function ensures that the task's FSM state reflects its current labeled status after an annotation has been deleted. It will: 1. Check if FSM is enabled 2. Get the current task state 3. Determine the expected state based on task.is_labeled 4. Execute appropriate transition if state doesn't match 5. Update project state if task state was changed Args: task: The Task instance whose annotation was deleted project: The Project instance containing the task Note: - Requires CurrentContext to be set with a valid user - Failures are logged but don't propagate to prevent breaking annotation deletion - Will initialize state if task has no FSM state record yet """ from core.current_request import CurrentContext from fsm.project_transitions import update_project_state_after_task_change from fsm.state_choices import TaskStateChoices from fsm.state_manager import get_state_manager from fsm.utils import is_fsm_enabled # Get user from context for FSM user = CurrentContext.get_user() if not is_fsm_enabled(user=user): return try: StateManager = get_state_manager() # Get current state - may be None if entity has no state record yet current_task_state = StateManager.get_current_state_value(task) # Determine what the state should be based on task's labeled status expected_state = TaskStateChoices.COMPLETED if task.is_labeled else TaskStateChoices.IN_PROGRESS # If no state exists, initialize it based on current condition if current_task_state is None: # Initialize state for entities that existed before FSM was deployed if task.is_labeled: StateManager.execute_transition(entity=task, transition_name='task_completed', user=user) else: StateManager.execute_transition(entity=task, transition_name='task_in_progress', user=user) # Update project state based on task changes update_project_state_after_task_change(project, user=user) # If state exists but doesn't match the task's labeled status, fix it elif current_task_state != expected_state: if expected_state == TaskStateChoices.IN_PROGRESS: StateManager.execute_transition(entity=task, transition_name='task_in_progress', user=user) else: StateManager.execute_transition(entity=task, transition_name='task_completed', user=user) # Update project state based on task changes update_project_state_after_task_change(project, user=user) except Exception as e: # Final safety net - log but don't break annotation deletion logger.warning( f'FSM state update failed during annotation deletion: {str(e)}', extra={'task_id': task.id, 'project_id': project.id}, )