"""
|
FSM utility functions.
|
|
This module provides:
|
1. UUID7 utilities for time-series optimization (uses uuid-utils library)
|
2. FSM-specific helper functions for organization resolution and state management
|
|
UUID7 provides natural time ordering and global uniqueness, making it ideal
|
for INSERT-only architectures with millions of records.
|
"""
|
|
import logging
|
import uuid
|
from datetime import datetime, timezone
|
from typing import Optional, Tuple
|
|
import uuid_utils
|
from core.current_request import CurrentContext
|
|
logger = logging.getLogger(__name__)
|
|
|
# =============================================================================
|
# UUID7 Utilities (using uuid-utils library)
|
# =============================================================================
|
|
|
def generate_uuid7() -> uuid.UUID:
|
"""
|
Generate a UUID7 with embedded timestamp for natural time ordering.
|
|
UUID7 embeds the timestamp in the first 48 bits, providing:
|
- Natural chronological ordering without additional indexes
|
- Global uniqueness across distributed systems
|
- Time-based partitioning capabilities
|
|
Returns:
|
UUID7 instance with embedded timestamp
|
"""
|
# Use uuid-utils library for RFC 9562 compliant UUID7 generation
|
# Convert to standard uuid.UUID to maintain type consistency
|
uuid7_obj = uuid_utils.uuid7()
|
return uuid.UUID(str(uuid7_obj))
|
|
|
def timestamp_from_uuid7(uuid7_id: uuid.UUID) -> datetime:
|
"""
|
Extract timestamp from UUID7 ID.
|
|
Args:
|
uuid7_id: UUID7 instance to extract timestamp from
|
|
Returns:
|
datetime: Timestamp embedded in the UUID7
|
|
Example:
|
uuid7_id = generate_uuid7()
|
timestamp = timestamp_from_uuid7(uuid7_id)
|
# timestamp is when the UUID7 was generated
|
"""
|
# UUID7 embeds timestamp in first 48 bits
|
timestamp_ms = (uuid7_id.int >> 80) & ((1 << 48) - 1)
|
# Return with millisecond precision (UUID7 spec)
|
return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
|
|
|
def uuid7_time_range(start_time: datetime, end_time: Optional[datetime] = None) -> Tuple[uuid.UUID, uuid.UUID]:
|
"""
|
Generate UUID7 range for time-based queries.
|
|
Creates UUID7 boundaries for efficient time-range filtering without
|
requiring timestamp indexes.
|
|
Args:
|
start_time: Start of time range
|
end_time: End of time range (defaults to now)
|
|
Returns:
|
Tuple of (start_uuid, end_uuid) for range queries
|
|
Example:
|
start_uuid, end_uuid = uuid7_time_range(
|
datetime(2024, 1, 1),
|
datetime(2024, 1, 2)
|
)
|
# Query: WHERE id >= start_uuid AND id <= end_uuid
|
"""
|
if end_time is None:
|
end_time = datetime.now(timezone.utc)
|
|
# Add a small buffer to account for timing precision issues
|
start_timestamp_ms = int(start_time.timestamp() * 1000) - 1 # 1ms buffer before
|
end_timestamp_ms = int(end_time.timestamp() * 1000) + 1 # 1ms buffer after
|
|
# Create UUID7 with specific timestamp using proper bit layout
|
# UUID7 format: timestamp_ms(48) + ver(4) + rand_a(12) + var(2) + rand_b(62)
|
start_uuid = uuid.UUID(int=(start_timestamp_ms << 80) | (0x7 << 76) | (0b10 << 62))
|
end_uuid = uuid.UUID(int=(end_timestamp_ms << 80) | (0x7 << 76) | (0b10 << 62) | ((1 << 62) - 1))
|
|
return start_uuid, end_uuid
|
|
|
def uuid7_from_timestamp(timestamp: datetime) -> uuid.UUID:
|
"""
|
Generate UUID7 from specific timestamp for range queries.
|
|
Args:
|
timestamp: Timestamp to embed in UUID7
|
|
Returns:
|
UUID7 with embedded timestamp
|
|
Example:
|
# Get all states from the last hour
|
start_time = timezone.now() - timedelta(hours=1)
|
start_uuid = uuid7_from_timestamp(start_time)
|
states = StateModel.objects.filter(id__gte=start_uuid)
|
"""
|
# Convert to milliseconds since epoch as uuid-utils expects
|
timestamp_ms = int(timestamp.timestamp() * 1000)
|
|
# Use uuid-utils with specific timestamp for range queries
|
# This creates a UUID7 with the given timestamp and minimal random bits
|
# for consistent range boundaries
|
return uuid.UUID(int=(timestamp_ms << 80) | (0x7 << 76) | (0b10 << 62))
|
|
|
def validate_uuid7(uuid_value: uuid.UUID) -> bool:
|
"""
|
Validate that a UUID is a valid UUID7.
|
|
Args:
|
uuid_value: UUID to validate
|
|
Returns:
|
True if valid UUID7, False otherwise
|
"""
|
return uuid_value.version == 7
|
|
|
class UUID7Field:
|
"""
|
Custom field utilities for UUID7 handling in Django models.
|
|
Provides helper methods for UUID7-specific operations that can be
|
used by models inheriting from BaseState.
|
"""
|
|
@staticmethod
|
def get_latest_by_uuid7(queryset):
|
"""Get latest record using UUID7 natural ordering"""
|
return queryset.order_by('-id').first()
|
|
@staticmethod
|
def filter_by_time_range(queryset, start_time: datetime, end_time: Optional[datetime] = None):
|
"""Filter queryset by time range using UUID7 embedded timestamps"""
|
start_uuid, end_uuid = uuid7_time_range(start_time, end_time)
|
return queryset.filter(id__gte=start_uuid, id__lte=end_uuid)
|
|
@staticmethod
|
def filter_since_time(queryset, since: datetime):
|
"""Filter queryset for records since a specific time"""
|
start_uuid = uuid7_from_timestamp(since)
|
return queryset.filter(id__gte=start_uuid)
|
|
|
class UUID7Generator:
|
"""
|
UUID7 generator with optional custom timestamp.
|
|
Useful for testing or when you need to generate UUIDs with specific timestamps.
|
"""
|
|
def __init__(self, base_timestamp: Optional[datetime] = None):
|
"""
|
Initialize generator with optional base timestamp.
|
|
Args:
|
base_timestamp: Base timestamp to use (defaults to current time)
|
"""
|
self.base_timestamp = base_timestamp or datetime.now(timezone.utc)
|
self._counter = 0
|
|
def generate(self, offset_ms: int = 0) -> uuid.UUID:
|
"""
|
Generate UUID7 with timestamp offset.
|
|
Args:
|
offset_ms: Millisecond offset from base timestamp
|
|
Returns:
|
UUID7 with adjusted timestamp
|
"""
|
# For offset timestamps, use manual construction for precise control
|
timestamp_ms = int(self.base_timestamp.timestamp() * 1000) + offset_ms
|
self._counter += 1
|
|
# Create UUID7 with specific timestamp and counter for monotonicity
|
# UUID7 format: timestamp_ms(48) + ver(4) + rand_a(12) + var(2) + rand_b(62)
|
uuid_int = (timestamp_ms << 80) | (0x7 << 76) | ((self._counter & 0xFFF) << 64) | (0b10 << 62)
|
return uuid.UUID(int=uuid_int)
|
|
|
# =============================================================================
|
# FSM Helper Utilities
|
# =============================================================================
|
|
|
def resolve_organization_id(entity=None, user=None):
|
"""
|
Resolve organization_id using consistent logic without additional queries.
|
|
This provides organization_id resolution for logging and state tracking
|
without duplicating database queries.
|
|
Args:
|
entity: The entity to resolve organization_id for
|
user: Optional user for fallback organization resolution
|
|
Returns:
|
organization_id or None
|
"""
|
# Try context cache first
|
organization_id = CurrentContext.get_organization_id()
|
if organization_id:
|
return organization_id
|
|
# Allow for function calls without entity
|
if entity is None:
|
return None
|
|
# Try direct organization_id attribute first
|
organization_id = getattr(entity, 'organization_id', None)
|
|
# If entity doesn't have direct organization_id, try relationships
|
if not organization_id:
|
# For entities with project relationship (most common case)
|
if hasattr(entity, 'project') and entity.project:
|
organization_id = getattr(entity.project, 'organization_id', None)
|
# For entities with task.project relationship
|
elif hasattr(entity, 'task') and entity.task and hasattr(entity.task, 'project') and entity.task.project:
|
organization_id = getattr(entity.task.project, 'organization_id', None)
|
|
# Fallback to user's active organization
|
if not organization_id and user and hasattr(user, 'active_organization') and user.active_organization:
|
organization_id = user.active_organization.id
|
|
# Cache the result in current context if we found an organization_id
|
if organization_id is not None:
|
CurrentContext.set_organization_id(organization_id)
|
|
return organization_id
|
|
|
def is_fsm_enabled(user=None) -> bool:
|
"""
|
Check if FSM is enabled via feature flags and thread-local override.
|
|
PERFORMANCE: This function now checks the cached FSM state that was set
|
when the user was first initialized in CurrentContext. This avoids repeated
|
feature flag lookups throughout the request.
|
|
The check order is:
|
1. Check thread-local override (for test cleanup, bulk operations)
|
2. Check cached feature flag state (set once per request)
|
3. Fallback to direct feature flag check (for edge cases without context)
|
|
Args:
|
user: User for feature flag evaluation (optional, used as fallback only)
|
|
Returns:
|
True if FSM should be active
|
"""
|
# Fast path: Check cached state from CurrentContext
|
# This is set once per request when user is initialized
|
return CurrentContext.is_fsm_enabled()
|
|
|
def get_current_state_safe(entity, user=None) -> Optional[str]:
|
"""
|
Safely get current state with error handling.
|
|
Args:
|
entity: The entity to get state for
|
user: The user making the request (for feature flag checking)
|
|
Returns:
|
Current state string or None if failed
|
"""
|
if not is_fsm_enabled(user):
|
return None
|
|
try:
|
from fsm.state_manager import get_state_manager
|
|
StateManager = get_state_manager()
|
return StateManager.get_current_state_value(entity)
|
except Exception as e:
|
logger.warning(
|
f'Failed to get current state for {entity._meta.label_lower} {entity.pk}: {str(e)}',
|
extra={
|
'event': 'fsm.get_state_error',
|
'entity_type': entity._meta.label_lower,
|
'entity_id': entity.pk,
|
'organization_id': resolve_organization_id(entity, user),
|
'error': str(e),
|
},
|
)
|
return None
|
|
|
def infer_entity_state_from_data(entity) -> Optional[str]:
|
"""
|
Infer what the FSM state should be based on entity's current data.
|
|
This is used for "cold start" scenarios where entities exist in the database
|
but don't have FSM state records yet (e.g., after FSM deployment to production
|
with pre-existing data).
|
|
Args:
|
entity: The entity to infer state for (Task, Project, or Annotation)
|
|
Returns:
|
Inferred state value, or None if entity type not supported
|
|
Examples:
|
>>> task = Task.objects.get(id=123)
|
>>> task.is_labeled = True
|
>>> infer_entity_state_from_data(task)
|
'COMPLETED'
|
|
>>> project = Project.objects.get(id=456)
|
>>> infer_entity_state_from_data(project)
|
'CREATED'
|
"""
|
from fsm.state_choices import AnnotationStateChoices, ProjectStateChoices, TaskStateChoices
|
|
entity_type = entity._meta.model_name.lower()
|
|
if entity_type == 'task':
|
# Task state depends on whether it has been labeled
|
return TaskStateChoices.COMPLETED if entity.is_labeled else TaskStateChoices.CREATED
|
elif entity_type == 'project':
|
# Project state depends on task completion
|
# If no tasks exist, project is CREATED
|
# If any tasks are completed, project is at least IN_PROGRESS
|
# If all tasks are completed, project is COMPLETED
|
tasks = entity.tasks.all()
|
if not tasks.exists():
|
return ProjectStateChoices.CREATED
|
|
# Count labeled tasks to determine project state
|
total_tasks = tasks.count()
|
labeled_tasks = tasks.filter(is_labeled=True).count()
|
|
if labeled_tasks == 0:
|
return ProjectStateChoices.CREATED
|
elif labeled_tasks == total_tasks:
|
return ProjectStateChoices.COMPLETED
|
else:
|
return ProjectStateChoices.IN_PROGRESS
|
elif entity_type == 'annotation':
|
# Annotations are SUBMITTED when created
|
return AnnotationStateChoices.SUBMITTED
|
else:
|
logger.warning(
|
f'Cannot infer state for unknown entity type: {entity_type}',
|
extra={
|
'event': 'fsm.infer_state_unknown_type',
|
'entity_type': entity_type,
|
'entity_id': entity.pk,
|
},
|
)
|
return None
|
|
|
def get_or_initialize_state(entity, user=None, inferred_state=None, reason=None, context_data=None) -> Optional[str]:
|
"""
|
Get current state, or initialize it if it doesn't exist.
|
|
This function handles "cold start" scenarios where pre-existing entities
|
don't have FSM state records. It will:
|
1. Try to get the current state
|
2. If None, infer the state from entity data
|
3. Initialize the state with an appropriate transition
|
4. Return the state value (never returns None if initialization succeeds)
|
|
Args:
|
entity: The entity to get or initialize state for
|
user: User for FSM context (optional)
|
inferred_state: Pre-computed inferred state (optional, will compute if not provided)
|
reason: Custom reason for the state initialization (optional, overrides default)
|
context_data: Additional context data to store with state record (optional)
|
|
Returns:
|
Current or newly initialized state value, or None if FSM disabled or failed
|
|
Examples:
|
>>> task = Task.objects.get(id=123) # Pre-existing task without state
|
>>> state = get_or_initialize_state(task, user=request.user)
|
>>> # state is now 'COMPLETED' or 'CREATED' based on task.is_labeled
|
>>> # and a state record has been created
|
"""
|
if not is_fsm_enabled(user):
|
return None
|
|
# Skip if FSM is temporarily disabled (e.g., during cleanup or bulk operations)
|
if CurrentContext.is_fsm_disabled():
|
return inferred_state # Return inferred state without persisting
|
|
try:
|
from fsm.state_manager import get_state_manager
|
|
StateManager = get_state_manager()
|
|
# Try to get existing state
|
current_state = StateManager.get_current_state_value(entity)
|
|
if current_state is not None:
|
# State already exists, return it
|
return current_state
|
|
# No state exists - need to initialize it
|
if inferred_state is None:
|
inferred_state = infer_entity_state_from_data(entity)
|
|
if inferred_state is None:
|
logger.warning(
|
f'Cannot initialize state for {entity._meta.model_name} {entity.pk} - inference failed',
|
extra={
|
'event': 'fsm.initialize_state_failed',
|
'entity_type': entity._meta.model_name,
|
'entity_id': entity.pk,
|
},
|
)
|
return None
|
|
# Initialize state with appropriate transition
|
entity_type = entity._meta.model_name.lower()
|
transition_name = _get_initialization_transition_name(entity_type, inferred_state)
|
|
if transition_name:
|
logger.info(
|
f'Initializing FSM state for pre-existing {entity_type} {entity.pk}',
|
extra={
|
'event': 'fsm.cold_start_initialization',
|
'entity_type': entity_type,
|
'entity_id': entity.pk,
|
'inferred_state': inferred_state,
|
'transition_name': transition_name,
|
},
|
)
|
# Pass reason and context_data if provided (flow through to TransitionContext)
|
StateManager.execute_transition(
|
entity=entity,
|
transition_name=transition_name,
|
user=user,
|
reason=reason,
|
context_data=context_data or {},
|
)
|
return inferred_state
|
else:
|
logger.warning(
|
f'No initialization transition found for {entity_type} -> {inferred_state}',
|
extra={
|
'event': 'fsm.no_initialization_transition',
|
'entity_type': entity_type,
|
'entity_id': entity.pk,
|
'inferred_state': inferred_state,
|
},
|
)
|
return None
|
|
except Exception as e:
|
logger.error(
|
f'Failed to get or initialize state for {entity._meta.model_name} {entity.pk}: {str(e)}',
|
extra={
|
'event': 'fsm.get_or_initialize_error',
|
'entity_type': entity._meta.model_name,
|
'entity_id': entity.pk,
|
'error': str(e),
|
},
|
exc_info=True,
|
)
|
return None
|
|
|
def _get_initialization_transition_name(entity_type: str, target_state: str) -> Optional[str]:
|
"""
|
Get the appropriate transition name for initializing an entity to a target state.
|
|
Args:
|
entity_type: Type of entity ('task', 'project', 'annotation')
|
target_state: The target state to initialize to
|
|
Returns:
|
Transition name, or None if no appropriate transition exists
|
"""
|
from fsm.state_choices import AnnotationStateChoices, ProjectStateChoices, TaskStateChoices
|
|
if entity_type == 'task':
|
if target_state == TaskStateChoices.CREATED:
|
return 'task_created'
|
elif target_state == TaskStateChoices.COMPLETED:
|
return 'task_completed'
|
elif target_state == TaskStateChoices.IN_PROGRESS:
|
return 'task_in_progress'
|
elif entity_type == 'project':
|
if target_state == ProjectStateChoices.CREATED:
|
return 'project_created'
|
elif target_state == ProjectStateChoices.IN_PROGRESS:
|
return 'project_in_progress'
|
elif target_state == ProjectStateChoices.COMPLETED:
|
return 'project_completed'
|
elif entity_type == 'annotation':
|
if target_state == AnnotationStateChoices.SUBMITTED:
|
return 'annotation_submitted'
|
elif target_state == AnnotationStateChoices.COMPLETED:
|
return 'annotation_submitted' # Use submitted transition for initialization
|
|
return None
|