"""
|
Integration tests for declarative transitions with real Django models.
|
These tests demonstrate how the transition system integrates with actual
|
Django models and the StateManager, providing realistic usage examples.
|
"""
|
|
from datetime import datetime, timezone
|
from typing import Any, Dict, Optional
|
from unittest.mock import Mock, patch
|
|
from core.current_request import CurrentContext
|
from django.contrib.auth import get_user_model
|
from django.test import TestCase
|
from fsm.registry import register_state_transition
|
from fsm.state_choices import AnnotationStateChoices, TaskStateChoices
|
from fsm.state_models import TaskState
|
from fsm.transitions import BaseTransition, TransitionContext, TransitionValidationError
|
from organizations.models import Organization
|
from projects.models import Project
|
from pydantic import Field
|
|
|
# Mock Django models for integration testing
|
class MockDjangoTask:
|
"""Mock Django Task model with realistic attributes"""
|
|
def __init__(self, pk=1, project_id=1, organization_id=1):
|
self.pk = pk
|
self.id = pk
|
self.project_id = project_id
|
self.organization_id = organization_id
|
self._meta = Mock()
|
self._meta.model_name = 'task'
|
self._meta.label_lower = 'tasks.task'
|
|
# Mock task attributes
|
self.data = {'text': 'Sample task data'}
|
self.created_at = datetime.now()
|
self.updated_at = datetime.now()
|
|
|
class MockDjangoAnnotation:
|
"""Mock Django Annotation model with realistic attributes"""
|
|
def __init__(self, pk=1, task_id=1, project_id=1, organization_id=1):
|
self.pk = pk
|
self.id = pk
|
self.task_id = task_id
|
self.project_id = project_id
|
self.organization_id = organization_id
|
self._meta = Mock()
|
self._meta.model_name = 'annotation'
|
self._meta.label_lower = 'tasks.annotation'
|
|
# Mock annotation attributes
|
self.result = [{'value': {'text': ['Sample annotation']}}]
|
self.completed_by_id = None
|
self.created_at = datetime.now()
|
self.updated_at = datetime.now()
|
|
|
User = get_user_model()
|
|
|
class DjangoModelIntegrationTests(TestCase):
|
"""
|
Integration tests demonstrating realistic usage with Django models.
|
These tests show how to implement transitions that work with actual
|
Django model patterns and the StateManager integration.
|
"""
|
|
def setUp(self):
|
self.task = MockDjangoTask()
|
self.annotation = MockDjangoAnnotation()
|
self.user = Mock()
|
self.user.id = 123
|
self.user.username = 'integration_test_user'
|
|
@patch('fsm.registry.get_state_model_for_entity')
|
@patch('fsm.state_manager.StateManager.get_current_state_object')
|
@patch('fsm.state_manager.StateManager.transition_state')
|
def test_task_workflow_integration(self, mock_transition_state, mock_get_state_obj, mock_get_state_model):
|
"""
|
INTEGRATION TEST: Complete task workflow using Django models
|
Demonstrates a realistic task lifecycle from creation through completion
|
using the declarative transition system with Django model integration.
|
"""
|
|
# Setup mocks to simulate Django model behavior
|
mock_get_state_model.return_value = TaskState
|
mock_get_state_obj.return_value = None # No existing state (initial transition)
|
mock_transition_state.return_value = True
|
|
# Define task workflow transitions
|
@register_state_transition('task', 'create_task')
|
class CreateTaskTransition(BaseTransition):
|
"""Initial task creation transition"""
|
|
created_by_id: int = Field(..., description='User creating the task')
|
initial_priority: str = Field('normal', description='Initial task priority')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
return TaskStateChoices.CREATED
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
# Validate initial creation
|
if not context.is_initial_transition:
|
raise TransitionValidationError('CreateTask can only be used for initial state')
|
return True
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
return {
|
'created_by_id': self.created_by_id,
|
'initial_priority': self.initial_priority,
|
'task_data': getattr(context.entity, 'data', {}),
|
'project_id': getattr(context.entity, 'project_id', None),
|
'creation_method': 'declarative_transition',
|
}
|
|
@register_state_transition('task', 'assign_and_start')
|
class AssignAndStartTaskTransition(BaseTransition):
|
"""Assign task to user and start work"""
|
|
assignee_id: int = Field(..., description='User assigned to task')
|
estimated_hours: float = Field(None, ge=0.1, description='Estimated work hours')
|
priority: str = Field('normal', description='Task priority')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
return TaskStateChoices.IN_PROGRESS
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
valid_from_states = [TaskStateChoices.CREATED]
|
if context.current_state not in valid_from_states:
|
raise TransitionValidationError(
|
f'Can only assign tasks from states: {valid_from_states}',
|
{'current_state': context.current_state, 'valid_states': valid_from_states},
|
)
|
|
# Business rule: Can't assign to the same user who created it
|
if hasattr(context, 'current_state_object') and context.current_state_object:
|
creator_id = context.current_state_object.context_data.get('created_by_id')
|
if creator_id == self.assignee_id:
|
raise TransitionValidationError(
|
'Cannot assign task to the same user who created it',
|
{'creator_id': creator_id, 'assignee_id': self.assignee_id},
|
)
|
|
return True
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
return {
|
'assignee_id': self.assignee_id,
|
'estimated_hours': self.estimated_hours,
|
'priority': self.priority,
|
'assigned_at': context.timestamp.isoformat(),
|
'assigned_by_id': context.current_user.id if context.current_user else None,
|
'work_started': True,
|
}
|
|
@register_state_transition('task', 'complete_with_quality')
|
class CompleteTaskWithQualityTransition(BaseTransition):
|
"""Complete task with quality metrics"""
|
|
quality_score: float = Field(..., ge=0.0, le=1.0, description='Quality score')
|
completion_notes: str = Field('', description='Completion notes')
|
actual_hours: float = Field(None, ge=0.0, description='Actual hours worked')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
return TaskStateChoices.COMPLETED
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
if context.current_state != TaskStateChoices.IN_PROGRESS:
|
raise TransitionValidationError(
|
'Can only complete tasks that are in progress', {'current_state': context.current_state}
|
)
|
|
# Quality check
|
if self.quality_score < 0.6:
|
raise TransitionValidationError(
|
f'Quality score too low: {self.quality_score}. Minimum required: 0.6'
|
)
|
|
return True
|
|
def post_transition_hook(self, context: TransitionContext, state_record) -> None:
|
"""Post-completion tasks like notifications"""
|
# Mock notification system
|
if hasattr(self, '_notifications'):
|
self._notifications.append(f'Task {context.entity.pk} completed with quality {self.quality_score}')
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
# Calculate metrics
|
start_data = context.current_state_object.context_data if context.current_state_object else {}
|
estimated_hours = start_data.get('estimated_hours')
|
|
return {
|
'quality_score': self.quality_score,
|
'completion_notes': self.completion_notes,
|
'actual_hours': self.actual_hours,
|
'estimated_hours': estimated_hours,
|
'completed_at': context.timestamp.isoformat(),
|
'completed_by_id': context.current_user.id if context.current_user else None,
|
'efficiency_ratio': (estimated_hours / self.actual_hours)
|
if (estimated_hours and self.actual_hours)
|
else None,
|
}
|
|
# Execute the complete workflow
|
|
# Step 1: Create task
|
create_transition = CreateTaskTransition(created_by_id=100, initial_priority='high')
|
|
# Test with StateManager integration
|
with patch('fsm.state_manager.StateManager.get_current_state_value') as mock_get_current:
|
mock_get_current.return_value = None # No current state
|
|
context = TransitionContext(
|
entity=self.task,
|
current_user=self.user,
|
current_state=None,
|
target_state=create_transition.get_target_state(),
|
)
|
|
# Validate and execute creation
|
assert create_transition.validate_transition(context) is True
|
creation_data = create_transition.transition(context)
|
|
assert creation_data['created_by_id'] == 100
|
assert creation_data['initial_priority'] == 'high'
|
assert creation_data['creation_method'] == 'declarative_transition'
|
|
# Step 2: Assign and start task
|
mock_current_state = Mock()
|
mock_current_state.context_data = creation_data
|
mock_get_state_obj.return_value = mock_current_state
|
|
assign_transition = AssignAndStartTaskTransition(
|
assignee_id=200, estimated_hours=4.5, priority='urgent' # Different from creator
|
)
|
|
context = TransitionContext(
|
entity=self.task,
|
current_user=self.user,
|
current_state=TaskStateChoices.CREATED,
|
current_state_object=mock_current_state,
|
target_state=assign_transition.get_target_state(),
|
)
|
|
assert assign_transition.validate_transition(context) is True
|
assignment_data = assign_transition.transition(context)
|
|
assert assignment_data['assignee_id'] == 200
|
assert assignment_data['estimated_hours'] == 4.5
|
assert assignment_data['work_started'] is True
|
|
# Step 3: Complete task
|
mock_current_state.context_data = assignment_data
|
|
complete_transition = CompleteTaskWithQualityTransition(
|
quality_score=0.85, completion_notes='Task completed successfully with minor revisions', actual_hours=5.2
|
)
|
complete_transition._notifications = [] # Mock notification system
|
|
context = TransitionContext(
|
entity=self.task,
|
current_user=self.user,
|
current_state=TaskStateChoices.IN_PROGRESS,
|
current_state_object=mock_current_state,
|
target_state=complete_transition.get_target_state(),
|
)
|
|
assert complete_transition.validate_transition(context) is True
|
completion_data = complete_transition.transition(context)
|
|
assert completion_data['quality_score'] == 0.85
|
assert completion_data['actual_hours'] == 5.2
|
assert abs(completion_data['efficiency_ratio'] - (4.5 / 5.2)) < 0.01
|
|
# Test post-hook
|
mock_state_record = Mock()
|
complete_transition.post_transition_hook(context, mock_state_record)
|
assert len(complete_transition._notifications) == 1
|
|
# Verify StateManager calls
|
assert mock_transition_state.call_count == 0 # Not called in our test setup
|
|
def test_annotation_review_workflow_integration(self):
|
"""
|
INTEGRATION TEST: Annotation review workflow
|
Demonstrates a realistic annotation review process using
|
enterprise-grade validation and approval logic.
|
"""
|
|
@register_state_transition('annotation', 'submit_for_review')
|
class SubmitAnnotationForReview(BaseTransition):
|
"""Submit annotation for quality review"""
|
|
annotator_confidence: float = Field(..., ge=0.0, le=1.0, description='Annotator confidence')
|
annotation_time_seconds: int = Field(..., ge=1, description='Time spent annotating')
|
review_requested: bool = Field(True, description='Whether review is requested')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
return AnnotationStateChoices.SUBMITTED
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
# Check annotation has content
|
if not hasattr(context.entity, 'result') or not context.entity.result:
|
raise TransitionValidationError('Cannot submit empty annotation')
|
|
# Business rule: Low confidence annotations must request review
|
if self.annotator_confidence < 0.7 and not self.review_requested:
|
raise TransitionValidationError(
|
'Low confidence annotations must request review',
|
{'confidence': self.annotator_confidence, 'threshold': 0.7},
|
)
|
|
return True
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
return {
|
'annotator_confidence': self.annotator_confidence,
|
'annotation_time_seconds': self.annotation_time_seconds,
|
'review_requested': self.review_requested,
|
'annotation_complexity': len(context.entity.result) if context.entity.result else 0,
|
'submitted_at': context.timestamp.isoformat(),
|
'submitted_by_id': context.current_user.id if context.current_user else None,
|
}
|
|
@register_state_transition('annotation', 'review_and_approve')
|
class ReviewAndApproveAnnotation(BaseTransition):
|
"""Review annotation and approve/reject"""
|
|
reviewer_decision: str = Field(..., description='approve, reject, or request_changes')
|
quality_score: float = Field(..., ge=0.0, le=1.0, description='Reviewer quality assessment')
|
review_comments: str = Field('', description='Review comments')
|
corrections_made: bool = Field(False, description='Whether reviewer made corrections')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
if self.reviewer_decision == 'approve':
|
return AnnotationStateChoices.COMPLETED
|
else:
|
return AnnotationStateChoices.SUBMITTED # Back to submitted for changes
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
if context.current_state != AnnotationStateChoices.SUBMITTED:
|
raise TransitionValidationError('Can only review submitted annotations')
|
|
valid_decisions = ['approve', 'reject', 'request_changes']
|
if self.reviewer_decision not in valid_decisions:
|
raise TransitionValidationError(
|
f'Invalid decision: {self.reviewer_decision}', {'valid_decisions': valid_decisions}
|
)
|
|
# Quality score validation based on decision
|
if self.reviewer_decision == 'approve' and self.quality_score < 0.6:
|
raise TransitionValidationError(
|
'Cannot approve annotation with low quality score',
|
{'quality_score': self.quality_score, 'decision': self.reviewer_decision},
|
)
|
|
return True
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
# Get submission data for metrics
|
submission_data = context.current_state_object.context_data if context.current_state_object else {}
|
|
return {
|
'reviewer_decision': self.reviewer_decision,
|
'quality_score': self.quality_score,
|
'review_comments': self.review_comments,
|
'corrections_made': self.corrections_made,
|
'reviewed_at': context.timestamp.isoformat(),
|
'reviewed_by_id': context.current_user.id if context.current_user else None,
|
'original_confidence': submission_data.get('annotator_confidence'),
|
'confidence_vs_quality_diff': abs(
|
submission_data.get('annotator_confidence', 0) - self.quality_score
|
),
|
}
|
|
# Execute annotation workflow
|
|
# Step 1: Submit annotation
|
submit_transition = SubmitAnnotationForReview(
|
annotator_confidence=0.9, annotation_time_seconds=300, review_requested=True # 5 minutes
|
)
|
|
context = TransitionContext(
|
entity=self.annotation,
|
current_user=self.user,
|
current_state=AnnotationStateChoices.SUBMITTED,
|
target_state=submit_transition.get_target_state(),
|
)
|
|
assert submit_transition.validate_transition(context) is True
|
submit_data = submit_transition.transition(context)
|
|
assert submit_data['annotator_confidence'] == 0.9
|
assert submit_data['annotation_time_seconds'] == 300
|
assert submit_data['review_requested'] is True
|
assert submit_data['annotation_complexity'] == 1 # Based on mock result
|
|
# Step 2: Review and approve
|
mock_submission_state = Mock()
|
mock_submission_state.context_data = submit_data
|
|
review_transition = ReviewAndApproveAnnotation(
|
reviewer_decision='approve',
|
quality_score=0.85,
|
review_comments='High quality annotation with good coverage',
|
corrections_made=False,
|
)
|
|
context = TransitionContext(
|
entity=self.annotation,
|
current_user=self.user,
|
current_state=AnnotationStateChoices.SUBMITTED,
|
current_state_object=mock_submission_state,
|
target_state=review_transition.get_target_state(),
|
)
|
|
assert review_transition.validate_transition(context) is True
|
assert review_transition.get_target_state() == AnnotationStateChoices.COMPLETED
|
|
review_data = review_transition.transition(context)
|
|
assert review_data['reviewer_decision'] == 'approve'
|
assert review_data['quality_score'] == 0.85
|
assert review_data['original_confidence'] == 0.9
|
assert abs(review_data['confidence_vs_quality_diff'] - 0.05) < 0.01
|
|
# Test rejection scenario
|
reject_transition = ReviewAndApproveAnnotation(
|
reviewer_decision='reject',
|
quality_score=0.3,
|
review_comments='Insufficient annotation quality',
|
corrections_made=False,
|
)
|
|
assert reject_transition.get_target_state() == AnnotationStateChoices.SUBMITTED
|
|
# Test validation failure
|
invalid_review = ReviewAndApproveAnnotation(
|
reviewer_decision='approve', # Trying to approve
|
quality_score=0.5, # But quality too low
|
review_comments='Test',
|
)
|
|
import pytest
|
|
with pytest.raises(TransitionValidationError) as cm:
|
invalid_review.validate_transition(context)
|
|
assert 'Cannot approve annotation with low quality score' in str(cm.value)
|
|
@patch('fsm.state_manager.StateManager.execute_transition')
|
def test_state_manager_bulk_update_integration(self, mock_execute):
|
"""
|
INTEGRATION TEST: StateManager bulk update with Django model integration
|
Shows how to use the StateManager to execute transitions with
|
real Django models and complex business logic.
|
"""
|
|
@register_state_transition('task', 'bulk_update_status')
|
class BulkUpdateTaskStatusTransition(BaseTransition):
|
"""Bulk update task status with metadata"""
|
|
new_status: str = Field(..., description='New status for tasks')
|
update_reason: str = Field(..., description='Reason for bulk update')
|
updated_by_system: bool = Field(False, description='Whether updated by automated system')
|
batch_id: str = Field(None, description='Batch operation ID')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
return self.new_status
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
valid_statuses = [TaskStateChoices.CREATED, TaskStateChoices.IN_PROGRESS, TaskStateChoices.COMPLETED]
|
if self.new_status not in valid_statuses:
|
raise TransitionValidationError(f'Invalid status: {self.new_status}')
|
|
# Can't bulk update to the same status
|
if context.current_state == self.new_status:
|
raise TransitionValidationError('Cannot update to the same status')
|
|
return True
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
return {
|
'new_status': self.new_status,
|
'update_reason': self.update_reason,
|
'updated_by_system': self.updated_by_system,
|
'batch_id': self.batch_id,
|
'bulk_update_timestamp': context.timestamp.isoformat(),
|
'previous_status': context.current_state,
|
}
|
|
# Mock successful execution
|
mock_state_record = Mock()
|
mock_state_record.id = 'mock-uuid'
|
mock_execute.return_value = mock_state_record
|
|
# Test StateManager.execute_transition
|
from fsm.state_manager import StateManager
|
|
result = StateManager.execute_transition(
|
entity=self.task,
|
transition_name='bulk_update_status',
|
transition_data={
|
'new_status': TaskStateChoices.IN_PROGRESS,
|
'update_reason': 'Project priority change',
|
'updated_by_system': True,
|
'batch_id': 'batch_2024_001',
|
},
|
user=self.user,
|
project_update=True,
|
notification_level='high',
|
)
|
|
# Verify the call
|
mock_execute.assert_called_once()
|
call_args, call_kwargs = mock_execute.call_args
|
|
# Check call parameters
|
assert call_kwargs['entity'] == self.task
|
assert call_kwargs['transition_name'] == 'bulk_update_status'
|
assert call_kwargs['user'] == self.user
|
|
# Check transition data
|
transition_data = call_kwargs['transition_data']
|
assert transition_data['new_status'] == TaskStateChoices.IN_PROGRESS
|
assert transition_data['update_reason'] == 'Project priority change'
|
assert transition_data['updated_by_system'] is True
|
assert transition_data['batch_id'] == 'batch_2024_001'
|
|
# Check context
|
assert call_kwargs['project_update'] is True
|
assert call_kwargs['notification_level'] == 'high'
|
|
# Check return value
|
assert result == mock_state_record
|
|
def test_error_handling_with_django_models(self):
|
"""
|
INTEGRATION TEST: Error handling with Django model validation
|
Tests comprehensive error handling scenarios that might occur
|
in real Django model integration.
|
"""
|
|
@register_state_transition('task', 'assign_with_constraints')
|
class AssignTaskWithConstraints(BaseTransition):
|
"""Task assignment with business constraints"""
|
|
assignee_id: int = Field(..., description='User to assign to')
|
max_concurrent_tasks: int = Field(5, description='Max concurrent tasks per user')
|
skill_requirements: list = Field(default_factory=list, description='Required skills')
|
|
def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
|
return TaskStateChoices.IN_PROGRESS
|
|
def validate_transition(self, context: TransitionContext) -> bool:
|
errors = []
|
|
# Mock database checks (in real scenario, these would be actual queries)
|
|
# 1. Check user exists and is active
|
if self.assignee_id <= 0:
|
errors.append('Invalid user ID')
|
|
# 2. Check user's current task load
|
if self.max_concurrent_tasks < 1:
|
errors.append('Max concurrent tasks must be at least 1')
|
|
# 3. Check skill requirements
|
if self.skill_requirements:
|
# Mock skill validation
|
available_skills = ['python', 'labeling', 'review']
|
missing_skills = [skill for skill in self.skill_requirements if skill not in available_skills]
|
if missing_skills:
|
errors.append(f'Missing required skills: {missing_skills}')
|
|
# 4. Check project-level constraints
|
if hasattr(context.entity, 'project_id'):
|
# Mock project validation
|
if context.entity.project_id <= 0:
|
errors.append('Invalid project configuration')
|
|
# 5. Check organization permissions
|
if hasattr(context.entity, 'organization_id'):
|
if not context.current_user:
|
errors.append('User authentication required for assignment')
|
|
if errors:
|
raise TransitionValidationError(
|
f"Assignment validation failed: {'; '.join(errors)}",
|
{
|
'validation_errors': errors,
|
'assignee_id': self.assignee_id,
|
'task_id': context.entity.pk,
|
'skill_requirements': self.skill_requirements,
|
},
|
)
|
|
return True
|
|
def transition(self, context: TransitionContext) -> Dict[str, Any]:
|
return {
|
'assignee_id': self.assignee_id,
|
'max_concurrent_tasks': self.max_concurrent_tasks,
|
'skill_requirements': self.skill_requirements,
|
'assignment_validated': True,
|
}
|
|
# Test successful validation
|
valid_transition = AssignTaskWithConstraints(
|
assignee_id=123, max_concurrent_tasks=3, skill_requirements=['python', 'labeling']
|
)
|
|
context = TransitionContext(
|
entity=self.task,
|
current_user=self.user,
|
current_state=TaskStateChoices.CREATED,
|
target_state=valid_transition.get_target_state(),
|
)
|
|
assert valid_transition.validate_transition(context) is True
|
|
# Test multiple validation errors
|
invalid_transition = AssignTaskWithConstraints(
|
assignee_id=-1, # Invalid user ID
|
max_concurrent_tasks=0, # Invalid max tasks
|
skill_requirements=['nonexistent_skill'], # Missing skill
|
)
|
|
import pytest
|
|
with pytest.raises(TransitionValidationError) as cm:
|
invalid_transition.validate_transition(context)
|
|
error = cm.value
|
error_msg = str(error)
|
|
# Check all validation errors are included
|
assert 'Invalid user ID' in error_msg
|
assert 'Max concurrent tasks must be at least 1' in error_msg
|
assert 'Missing required skills' in error_msg
|
|
# Check error context
|
assert 'validation_errors' in error.context
|
assert len(error.context['validation_errors']) == 3
|
assert error.context['assignee_id'] == -1
|
|
# Test authentication requirement
|
context_no_user = TransitionContext(
|
entity=self.task,
|
current_user=None, # No user
|
current_state=TaskStateChoices.CREATED,
|
target_state=valid_transition.get_target_state(),
|
)
|
|
import pytest
|
|
with pytest.raises(TransitionValidationError) as cm:
|
valid_transition.validate_transition(context_no_user)
|
|
assert 'User authentication required' in str(cm.value)
|
|
|
class TestBaseStatePropertiesCoverage(TestCase):
|
"""Test coverage for BaseState model properties and methods"""
|
|
def setUp(self):
|
"""Set up test fixtures"""
|
self.user = User.objects.create(email='test_coverage@example.com')
|
self.org = Organization.objects.create(title='Test Org Coverage', created_by=self.user)
|
|
# Set CurrentContext BEFORE creating entities that need FSM
|
CurrentContext.set_user(self.user)
|
CurrentContext.set_organization_id(self.org.id)
|
|
self.project = Project.objects.create(
|
title='Test Project Coverage', created_by=self.user, organization=self.org
|
)
|
|
def tearDown(self):
|
"""Clean up after tests"""
|
CurrentContext.clear()
|
|
def test_base_state_entity_property(self):
|
"""Test BaseState.entity property retrieves related entity"""
|
from fsm.state_models import ProjectState
|
|
# Get the auto-created state
|
state_record = ProjectState.objects.filter(project=self.project).first()
|
assert state_record is not None
|
|
# Test entity property
|
retrieved_entity = state_record.entity
|
assert retrieved_entity.id == self.project.id
|
|
def test_base_state_timestamp_from_uuid(self):
|
"""Test BaseState.timestamp_from_uuid property extracts timestamp from UUID7"""
|
from fsm.state_models import ProjectState
|
|
before = datetime.now(timezone.utc)
|
state_record = ProjectState.objects.filter(project=self.project).first()
|
datetime.now(timezone.utc)
|
|
# Test timestamp extraction
|
timestamp = state_record.timestamp_from_uuid
|
assert isinstance(timestamp, datetime)
|
# Timestamp should be within reasonable range
|
assert timestamp.year == before.year
|