Bin
2025-12-17 dcf780a91c16b6be28635b6e2e0e702060ee19f2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
"""
FSM State Models for Label Studio.
 
This module contains the state model definitions (BaseState and concrete state models).
These are separated from models.py to avoid registration issues in LSE where
extended state models need to be registered instead of the base OSS models.
 
When importing FsmHistoryStateModel, these state models won't be automatically
imported and registered, allowing LSE to register its own extended versions.
"""
 
import logging
from datetime import datetime
from typing import Any, Dict, Optional
 
from django.conf import settings
from django.db import models
from django.db.models import QuerySet, UUIDField
from fsm.registry import register_state_model
from fsm.state_choices import (
    AnnotationStateChoices,
    ProjectStateChoices,
    TaskStateChoices,
)
from fsm.utils import UUID7Field, generate_uuid7, timestamp_from_uuid7
 
logger = logging.getLogger(__name__)
 
 
class BaseState(models.Model):
    """
    Abstract base class for all state models using UUID7 for optimal time-series performance.

    This is the core of the FSM system, providing:
    - UUID7 primary key with natural time ordering
    - Standard state transition metadata
    - Audit trail information
    - Context data storage
    - Performance-optimized helper methods

    Benefits of this architecture:
    - INSERT-only operations for maximum concurrency
    - Natural time ordering eliminates need for created_at indexes
    - Global uniqueness enables distributed system support
    - Time-based partitioning for large amounts of state records with consistent performance
    - Complete audit trail by design

    Naming convention relied on by the helpers below: a concrete subclass
    named ``<Entity>State`` is expected to declare a foreign key called
    ``<entity>`` (lower-cased), e.g. ``TaskState.task``. All lookups are
    built from that derived field name (see ``_get_entity_field_name``).
    """

    # UUID7 Primary Key - provides natural time ordering and global uniqueness
    id = UUIDField(
        primary_key=True,
        default=generate_uuid7,
        editable=False,
        help_text='UUID7 provides natural time ordering and global uniqueness',
    )

    # Optional organization field - can be overridden or left null
    # Applications can add their own organization/tenant fields as needed
    organization_id = models.PositiveIntegerField(
        null=True,
        blank=True,
        db_index=True,
        help_text='Organization ID that owns this state record (for multi-tenant applications)',
    )

    # Core State Fields
    state = models.CharField(max_length=50, db_index=True, help_text='Current state of the entity')
    previous_state = models.CharField(
        max_length=50, null=True, blank=True, help_text='Previous state before this transition'
    )

    # Transition Metadata
    transition_name = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        help_text='Name of the transition method that triggered this state change',
    )
    triggered_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        null=True,
        help_text='User who triggered this state transition',
    )

    # Context & Audit
    context_data = models.JSONField(
        default=dict, help_text='Additional context data for this transition (e.g., validation results, external IDs)'
    )
    reason = models.TextField(blank=True, help_text='Human-readable reason for this state transition')

    # Timestamp (redundant with UUID7 but useful for human readability)
    created_at = models.DateTimeField(
        auto_now_add=True,
        db_index=False,  # UUID7 provides natural ordering, no index needed
        help_text='Human-readable timestamp for debugging (UUID7 id contains precise timestamp)',
    )

    class Meta:
        abstract = True
        # UUID7 provides natural ordering, reducing index requirements
        ordering = ['-id']  # Most recent first
        get_latest_by = 'id'

    def __str__(self):
        """Render as ``'<Entity> <entity id>: <previous_state> → <state>'``."""
        entity_name = self._get_entity_name()
        # Falls back to 'unknown' when the subclass has no '<entity>_id' attribute.
        entity_id = getattr(self, f'{entity_name}_id', 'unknown')
        return f'{entity_name.title()} {entity_id}: {self.previous_state} → {self.state}'

    @property
    def entity(self):
        """Get the related entity object (the attribute named after the entity)."""
        return getattr(self, self._get_entity_name())

    @property
    def timestamp_from_uuid(self) -> datetime:
        """Extract timestamp from UUID7 ID"""
        return timestamp_from_uuid7(self.id)

    @property
    def is_terminal_state(self) -> bool:
        """
        Check if this is a terminal state (no outgoing transitions).

        Override in subclasses with specific terminal states.
        The base implementation always returns False.
        """
        return False

    def _get_entity_name(self) -> str:
        """
        Extract entity name from model name (e.g., TaskState → task).

        Delegates to ``_get_entity_field_name`` so the instance-level name
        (used by ``entity`` and ``__str__``) can never drift from the field
        name used by the class-level query helpers.
        """
        return type(self)._get_entity_field_name()

    @classmethod
    def _entity_queryset(cls, entity) -> QuerySet['BaseState']:
        """
        Build the queryset of state records that belong to ``entity``.

        No explicit ordering is applied here; callers add the ``order_by``
        they need.
        """
        return cls.objects.filter(**{cls._get_entity_field_name(): entity})

    @classmethod
    def get_current_state(cls, entity) -> Optional['BaseState']:
        """
        Get current state using UUID7 natural ordering.

        Uses UUID7's natural time ordering to efficiently find the latest state
        without requiring created_at indexes or complex queries.

        Returns:
            The state record with the highest id for ``entity``, or None when
            the entity has no state records.
        """
        return cls._entity_queryset(entity).order_by('-id').first()

    @classmethod
    def get_current_state_value(cls, entity) -> Optional[str]:
        """
        Get current state value as string using UUID7 natural ordering.

        Uses UUID7's natural time ordering to efficiently find the latest state
        without requiring created_at indexes or complex queries.

        Returns:
            The ``state`` of the record with the highest id for ``entity``, or
            None when the entity has no state records.
        """
        # Select only the `state` column; the rest of the row (including the
        # JSON context) is not needed to answer this question.
        return cls._entity_queryset(entity).order_by('-id').values_list('state', flat=True).first()

    @classmethod
    def get_state_history(cls, entity) -> QuerySet['BaseState']:
        """Get complete state history for an entity, ordered by descending id."""
        return cls._entity_queryset(entity).order_by('-id')

    @classmethod
    def get_states_in_range(cls, entity, start_time: datetime, end_time: datetime) -> QuerySet['BaseState']:
        """
        Efficient time-range queries using UUID7.

        Uses UUID7's embedded timestamp for direct time-based filtering
        without requiring timestamp indexes.

        The range filtering itself is delegated to
        ``UUID7Field.filter_by_time_range``; bound inclusivity is defined
        there. The result is ordered by ascending id.
        """
        in_range = UUID7Field.filter_by_time_range(cls._entity_queryset(entity), start_time, end_time)
        return in_range.order_by('id')

    @classmethod
    def get_states_since(cls, entity, since: datetime) -> QuerySet['BaseState']:
        """
        Get all states since a specific timestamp.

        Filtering is delegated to ``UUID7Field.filter_since_time``; the result
        is ordered by ascending id.
        """
        return UUID7Field.filter_since_time(cls._entity_queryset(entity), since).order_by('id')

    @classmethod
    def get_denormalized_fields(cls, entity) -> Dict[str, Any]:
        """
        Get denormalized fields to include in the state record.

        Override this method in subclasses to provide denormalized data
        that should be stored with each state transition for performance
        optimization and auditing purposes.

        Args:
            entity: The entity instance being transitioned

        Returns:
            Dictionary of field names to values that should be stored
            in the state record. The base implementation returns an
            empty dict.

        Example:
            @classmethod
            def get_denormalized_fields(cls, entity):
                return {
                    'project_id': entity.project_id,
                    'organization_id': entity.project.organization_id,
                    'task_type': entity.task_type,
                    'priority': entity.priority
                }
        """
        return {}

    @classmethod
    def get_entity_model(cls) -> models.Model:
        """
        Get the entity model for the state model.

        Returns the ``related_model`` of the entity foreign key, i.e. the
        model class the foreign key points at (not an instance).
        """
        return cls._meta.get_field(cls._get_entity_field_name()).related_model

    @classmethod
    def _get_entity_field_name(cls) -> str:
        """
        Get the foreign key field name for the entity.

        Derived from the class name: a trailing ``State`` is stripped and the
        remainder lower-cased (``TaskState`` → ``task``). Classes whose name
        does not end in ``State`` fall back to ``'entity'``.
        """
        model_name = cls.__name__
        if model_name.endswith('State'):
            return model_name[: -len('State')].lower()
        return 'entity'
 
 
# =============================================================================
# Core State Models for Label Studio OSS
# =============================================================================
# Note: These are registered here for OSS. LSE will register its own extended
# versions in lse_fsm/models.py instead of importing these.
 
 
@register_state_model('task')
class TaskState(BaseState):
    """
    State-history model for tasks in Label Studio OSS.

    Every row is one recorded state of a ``tasks.Task``:
    - Simple 3-state workflow (CREATED → IN_PROGRESS → COMPLETED)
    - High-performance queries with UUID7 ordering

    Registered under the key ``'task'`` via ``register_state_model``.
    """

    # The task this state record belongs to
    task = models.ForeignKey(
        'tasks.Task',
        on_delete=models.CASCADE,
        related_name='fsm_states',
    )

    # Narrows the inherited `state` column to the task-specific choices
    state = models.CharField(
        choices=TaskStateChoices.choices,
        max_length=50,
        db_index=True,
    )

    # Plain integer copy of the task's project id (no foreign key)
    project_id = models.PositiveIntegerField(
        help_text='From task.project_id - denormalized for performance',
        db_index=True,
    )

    class Meta:
        # NOTE(review): this Meta does not extend BaseState.Meta, so base
        # options such as get_latest_by are presumably not inherited —
        # confirm that is intended.
        app_label = 'fsm'
        ordering = ['-id']
        # INSERT-only approach, so no constraints are declared
        indexes = [
            # Latest-state lookup: the current state is the row with the
            # highest UUID7 id; DESC order matches ORDER BY id DESC queries
            models.Index(name='task_current_state_idx', fields=['task_id', '-id']),
            # Reporting / filtering by project or organization
            models.Index(name='task_project_state_idx', fields=['project_id', 'state', '-id']),
            models.Index(name='task_org_reporting_idx', fields=['organization_id', 'state', '-id']),
            # Ascending history scans
            models.Index(name='task_history_idx', fields=['task_id', 'id']),
        ]

    @classmethod
    def get_denormalized_fields(cls, entity):
        """Return the denormalized values stored on a new TaskState row (the task's project id)."""
        return dict(project_id=entity.project_id)

    @property
    def is_terminal_state(self) -> bool:
        """True when this record's state equals ``TaskStateChoices.COMPLETED``."""
        current = self.state
        return current == TaskStateChoices.COMPLETED
 
 
@register_state_model('annotation')
class AnnotationState(BaseState):
    """
    State-history model for annotations in Label Studio OSS.

    Every row is one recorded state of a ``tasks.Annotation``:
    - Simple 3-state workflow (DRAFT → SUBMITTED → COMPLETED)

    Registered under the key ``'annotation'`` via ``register_state_model``.
    """

    # The annotation this state record belongs to
    annotation = models.ForeignKey(
        'tasks.Annotation',
        related_name='fsm_states',
        on_delete=models.CASCADE,
    )

    # Narrows the inherited `state` column to the annotation-specific choices
    state = models.CharField(
        choices=AnnotationStateChoices.choices,
        max_length=50,
        db_index=True,
    )

    # Plain integer copies of related ids, kept on the row so common
    # queries can avoid JOINs
    task_id = models.PositiveIntegerField(
        help_text='From annotation.task_id - denormalized for performance',
        db_index=True,
    )
    project_id = models.PositiveIntegerField(
        help_text='From annotation.task.project_id - denormalized for performance',
        db_index=True,
    )
    completed_by_id = models.PositiveIntegerField(
        help_text='From annotation.completed_by_id - denormalized for performance',
        null=True,
        db_index=True,
    )

    class Meta:
        # NOTE(review): this Meta does not extend BaseState.Meta, so base
        # options such as get_latest_by are presumably not inherited —
        # confirm that is intended.
        app_label = 'fsm'
        ordering = ['-id']
        indexes = [
            # Latest-state lookup per annotation
            models.Index(name='anno_current_state_idx', fields=['annotation_id', '-id']),
            # Filtering and reporting by task, user and project
            models.Index(name='anno_task_state_idx', fields=['task_id', 'state', '-id']),
            models.Index(name='anno_user_report_idx', fields=['completed_by_id', 'state', '-id']),
            models.Index(name='anno_project_report_idx', fields=['project_id', 'state', '-id']),
        ]

    @classmethod
    def get_denormalized_fields(cls, entity):
        """
        Return the denormalized values stored on a new AnnotationState row.

        Reads the task id and project id through ``entity.task`` and the
        annotator id from ``entity.completed_by_id`` (any falsy value is
        stored as None).
        """
        related_task = entity.task
        return {
            'task_id': related_task.id,
            'project_id': related_task.project_id,
            'completed_by_id': entity.completed_by_id or None,
        }

    @property
    def is_terminal_state(self) -> bool:
        """True when this record's state equals ``AnnotationStateChoices.COMPLETED``."""
        current = self.state
        return current == AnnotationStateChoices.COMPLETED
 
@register_state_model('project')
class ProjectState(BaseState):
    """
    State-history model for projects in Label Studio OSS.

    Every row is one recorded state of a ``projects.Project``:
    - Simple 3-state workflow (CREATED → IN_PROGRESS → COMPLETED)
    - Project lifecycle tracking

    Registered under the key ``'project'`` via ``register_state_model``.
    """

    # The project this state record belongs to
    project = models.ForeignKey(
        'projects.Project',
        related_name='fsm_states',
        on_delete=models.CASCADE,
    )

    # Narrows the inherited `state` column to the project-specific choices
    state = models.CharField(
        choices=ProjectStateChoices.choices,
        max_length=50,
        db_index=True,
    )

    # Plain integer copy of the project creator's id (no foreign key)
    created_by_id = models.PositiveIntegerField(
        help_text='From project.created_by_id - denormalized for performance',
        null=True,
        db_index=True,
    )

    class Meta:
        # NOTE(review): this Meta does not extend BaseState.Meta, so base
        # options such as get_latest_by are presumably not inherited —
        # confirm that is intended.
        app_label = 'fsm'
        ordering = ['-id']
        indexes = [
            # Latest-state lookup per project
            models.Index(name='project_current_state_idx', fields=['project_id', '-id']),
            # Filtering and reporting by organization
            models.Index(name='project_org_state_idx', fields=['organization_id', 'state', '-id']),
            models.Index(name='project_org_reporting_idx', fields=['organization_id', '-id']),
        ]

    @classmethod
    def get_denormalized_fields(cls, entity):
        """
        Return the denormalized values stored on a new ProjectState row.

        Only the creator id is copied; any falsy value is stored as None.
        """
        return {'created_by_id': entity.created_by_id or None}

    @property
    def is_terminal_state(self) -> bool:
        """True when this record's state equals ``ProjectStateChoices.COMPLETED``."""
        current = self.state
        return current == ProjectStateChoices.COMPLETED