"""
API usage examples and documentation tests for the declarative transition system.

These tests serve as both validation and comprehensive documentation,
showing how to integrate the transition system with APIs, handle JSON
serialization, generate schemas, and implement real-world patterns.
"""

import json
from copy import deepcopy
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from unittest.mock import Mock

import pytest
from django.test import TestCase
from fsm.registry import register_state_transition, transition_registry
from fsm.transition_utils import (
    get_transition_schema,
)
from fsm.transitions import (
    BaseTransition,
    TransitionContext,
    TransitionValidationError,
)
from pydantic import Field, validator


class APIIntegrationExampleTests(TestCase):
    """
    API integration examples demonstrating real-world usage patterns.

    These tests show how to integrate the transition system with REST APIs,
    handle JSON data, validate requests, and format responses.

    Every test registers its own transition classes via
    ``register_state_transition``; ``setUp``/``tearDown`` snapshot and restore
    ``transition_registry._transitions`` so registrations do not leak between
    tests.
    """

    def setUp(self):
        """Build the mock entity/user and start each test with an empty registry."""
        super().setUp()

        self.mock_entity = Mock()
        self.mock_entity.pk = 1
        self.mock_entity._meta.model_name = 'task'
        self.mock_entity.organization_id = 100

        self.mock_user = Mock()
        self.mock_user.id = 42
        self.mock_user.username = 'api_user'

        # Save registry state and clear for this test
        self._original_transitions = deepcopy(transition_registry._transitions)
        transition_registry._transitions.clear()

    def tearDown(self):
        """Put back the registry snapshot taken in ``setUp``."""
        # Restore original transition registry to prevent test leakage
        transition_registry._transitions = self._original_transitions
        super().tearDown()

    def test_rest_api_task_assignment_example(self):
        """
        API EXAMPLE: REST endpoint for task assignment

        Shows how to implement a REST API endpoint that uses declarative
        transitions with proper validation and error handling.
        """

        @register_state_transition('task', 'api_assign_task')
        class APITaskAssignmentTransition(BaseTransition):
            """Task assignment via API with comprehensive validation"""

            assignee_id: int = Field(..., description='ID of user to assign task to')
            priority: str = Field('normal', description='Task priority level')
            deadline: Optional[datetime] = Field(None, description='Assignment deadline')
            assignment_notes: str = Field('', description='Notes about the assignment')
            notify_assignee: bool = Field(True, description='Whether to notify the assignee')

            @validator('priority')
            def validate_priority(cls, v):
                valid_priorities = ['low', 'normal', 'high', 'urgent']
                if v not in valid_priorities:
                    raise ValueError(f'Priority must be one of: {valid_priorities}')
                return v

            @validator('deadline')
            def validate_deadline(cls, v):
                # Take "now" in the deadline's own timezone (None -> naive local time).
                # Comparing an aware deadline with a naive datetime.now() raises
                # TypeError instead of performing the intended check.
                if v and v <= datetime.now(v.tzinfo):
                    raise ValueError('Deadline must be in the future')
                return v

            def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
                return 'ASSIGNED'

            def validate_transition(self, context: TransitionContext) -> bool:
                # Business logic validation
                if context.current_state not in ['CREATED', 'UNASSIGNED']:
                    raise TransitionValidationError(
                        f'Cannot assign task in state: {context.current_state}',
                        {'valid_states': ['CREATED', 'UNASSIGNED']},
                    )

                # Mock user existence check
                if self.assignee_id <= 0:
                    raise TransitionValidationError('Invalid assignee ID', {'assignee_id': self.assignee_id})

                return True

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                return {
                    'assignee_id': self.assignee_id,
                    'priority': self.priority,
                    'deadline': self.deadline.isoformat() if self.deadline else None,
                    'assignment_notes': self.assignment_notes,
                    'notify_assignee': self.notify_assignee,
                    'assigned_by_id': context.current_user.id if context.current_user else None,
                    'assigned_at': context.timestamp.isoformat(),
                    'api_version': 'v1',
                }

        def assign_task_endpoint(request_data, current_state='CREATED'):
            """Simulate the REST endpoint: parse, validate, execute, format the response.

            Only the calls that can raise live inside the ``try`` so that the
            test's own assertions are never swallowed by the error handlers.
            """
            try:
                # Step 1: Create transition from API data
                transition = APITaskAssignmentTransition(**request_data)

                # Step 2: Execute transition
                context = TransitionContext(
                    entity=self.mock_entity,
                    current_user=self.mock_user,
                    current_state=current_state,
                    target_state=transition.get_target_state(),
                    request_data=request_data,
                )

                # Validate (raises TransitionValidationError on business-rule failure)
                transition.validate_transition(context)

                # Execute
                result_data = transition.transition(context)
            except ValueError as e:
                # Handle Pydantic validation errors
                return {
                    'success': False,
                    'error': 'Validation Error',
                    'message': str(e),
                    'timestamp': datetime.now().isoformat(),
                }
            except TransitionValidationError as e:
                # Handle business logic validation errors
                return {
                    'success': False,
                    'error': 'Business Rule Violation',
                    'message': str(e),
                    'context': e.context,
                    'timestamp': datetime.now().isoformat(),
                }

            # Step 3: Format API response
            return {
                'success': True,
                'message': 'Task assigned successfully',
                'data': {
                    'task_id': self.mock_entity.pk,
                    'new_state': transition.get_target_state(),
                    'assignment_details': result_data,
                },
                'timestamp': datetime.now().isoformat(),
            }

        # Simulate API request data (JSON from client)
        api_request_data = {
            'assignee_id': 123,
            'priority': 'high',
            'deadline': (datetime.now() + timedelta(days=7)).isoformat(),
            'assignment_notes': 'Critical task requiring immediate attention',
            'notify_assignee': True,
        }

        # Validate API response for the happy path; a validation failure here must
        # fail the test rather than being silently converted into an error payload.
        api_response = assign_task_endpoint(api_request_data)
        assert api_response['success'], api_response
        assert api_response['data']['new_state'] == 'ASSIGNED'
        assert api_response['data']['assignment_details']['assignee_id'] == 123
        assert api_response['data']['assignment_details']['priority'] == 'high'

        # Business-rule failure: the task is in a state that cannot be assigned
        api_response = assign_task_endpoint(api_request_data, current_state='COMPLETED')
        assert not api_response['success']
        assert api_response['error'] == 'Business Rule Violation'
        assert api_response['context']['valid_states'] == ['CREATED', 'UNASSIGNED']

        # Test error handling with invalid data
        invalid_request = {
            'assignee_id': -1,  # Invalid ID
            'priority': 'invalid_priority',  # Invalid priority
            'deadline': '2020-01-01T00:00:00',  # Past deadline
        }

        api_response = assign_task_endpoint(invalid_request)
        assert not api_response['success']
        assert api_response['error'] == 'Validation Error'

        with pytest.raises(ValueError):
            APITaskAssignmentTransition(**invalid_request)

    def test_json_schema_generation_for_api_docs(self):
        """
        API DOCUMENTATION: JSON Schema generation

        Shows how to generate OpenAPI/JSON schemas for API documentation
        from Pydantic transition models.
        """

        @register_state_transition('annotation', 'api_submit_annotation')
        class APIAnnotationSubmissionTransition(BaseTransition):
            """Submit annotation via API with rich metadata"""

            confidence_score: float = Field(
                ..., ge=0.0, le=1.0, description="Annotator's confidence in the annotation (0.0-1.0)"
            )
            annotation_quality: str = Field(
                'good', description='Subjective quality assessment', pattern='^(excellent|good|fair|poor)$'
            )
            time_spent_seconds: int = Field(..., ge=1, description='Time spent on annotation in seconds')
            difficulty_level: str = Field('medium', description='Perceived difficulty of the annotation task')
            review_requested: bool = Field(False, description='Whether the annotator requests manual review')
            tags: List[str] = Field(default_factory=list, description='Optional tags for categorization')
            metadata: Dict[str, Any] = Field(
                default_factory=dict, description='Additional metadata about the annotation process'
            )

            def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
                return 'SUBMITTED'

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                return {
                    'confidence_score': self.confidence_score,
                    'annotation_quality': self.annotation_quality,
                    'time_spent_seconds': self.time_spent_seconds,
                    'difficulty_level': self.difficulty_level,
                    'review_requested': self.review_requested,
                    'tags': self.tags,
                    'metadata': self.metadata,
                    'submitted_at': context.timestamp.isoformat(),
                }

        # Generate JSON schema
        schema = get_transition_schema(APIAnnotationSubmissionTransition)

        # Validate schema structure
        assert 'properties' in schema
        assert 'required' in schema

        # Check specific field schemas
        properties = schema['properties']

        # confidence_score should have min/max constraints
        confidence_schema = properties['confidence_score']
        assert confidence_schema['type'] == 'number'
        assert confidence_schema['minimum'] == 0.0
        assert confidence_schema['maximum'] == 1.0
        assert "Annotator's confidence" in confidence_schema['description']

        # annotation_quality should have pattern constraint
        quality_schema = properties['annotation_quality']
        assert quality_schema['type'] == 'string'
        assert 'pattern' in quality_schema

        # time_spent_seconds should have minimum constraint
        time_schema = properties['time_spent_seconds']
        assert time_schema['type'] == 'integer'
        assert time_schema['minimum'] == 1

        # tags should be array type
        tags_schema = properties['tags']
        assert tags_schema['type'] == 'array'
        assert tags_schema['items']['type'] == 'string'

        # metadata should be object type
        metadata_schema = properties['metadata']
        assert metadata_schema['type'] == 'object'

        # Required fields
        required_fields = schema['required']
        assert 'confidence_score' in required_fields
        assert 'time_spent_seconds' in required_fields
        assert 'tags' not in required_fields  # Optional field

        # Test schema-driven validation
        valid_data = {
            'confidence_score': 0.85,
            'annotation_quality': 'good',
            'time_spent_seconds': 120,
            'difficulty_level': 'hard',
            'review_requested': True,
            'tags': ['important', 'complex'],
            'metadata': {'tool_version': '1.2.3', 'browser': 'chrome'},
        }

        transition = APIAnnotationSubmissionTransition(**valid_data)
        assert transition.confidence_score == 0.85
        assert len(transition.tags) == 2

        # Print schema for documentation (would be used in API docs)
        schema_json = json.dumps(schema, indent=2)
        assert isinstance(schema_json, str)
        assert 'confidence_score' in schema_json

    def test_bulk_operations_api_pattern(self):
        """
        API EXAMPLE: Bulk operations with transitions

        Shows how to handle bulk operations where multiple entities
        need to be transitioned with the same or different parameters.
        """

        @register_state_transition('task', 'bulk_status_update')
        class BulkStatusUpdateTransition(BaseTransition):
            """Bulk status update for multiple tasks"""

            new_status: str = Field(..., description='New status for all tasks')
            update_reason: str = Field(..., description='Reason for bulk update')
            batch_id: str = Field(..., description='Unique identifier for this batch')
            force_update: bool = Field(False, description='Force update even if invalid states')

            def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
                return self.new_status

            def validate_transition(self, context: TransitionContext) -> bool:
                valid_statuses = ['CREATED', 'IN_PROGRESS', 'COMPLETED', 'CANCELLED']
                if self.new_status not in valid_statuses:
                    raise TransitionValidationError(f'Invalid status: {self.new_status}')

                # Skip state validation if force update
                if not self.force_update:
                    if context.current_state == self.new_status:
                        raise TransitionValidationError('Cannot update to same status')

                return True

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                return {
                    'new_status': self.new_status,
                    'update_reason': self.update_reason,
                    'batch_id': self.batch_id,
                    'force_update': self.force_update,
                    'updated_at': context.timestamp.isoformat(),
                    'entity_id': context.entity.pk,
                }

        # Simulate bulk API request
        bulk_request = {
            'task_ids': [1, 2, 3, 4, 5],
            'transition_data': {
                'new_status': 'IN_PROGRESS',
                'update_reason': 'Project phase change',
                'batch_id': 'batch_2024_001',
                'force_update': False,
            },
        }

        # Mock different current states for testing, keyed by task id.
        # Task 3 is already IN_PROGRESS, so it is the one update expected to fail.
        current_state_by_task_id = {
            1: 'CREATED',
            2: 'CREATED',
            3: 'IN_PROGRESS',
            4: 'CREATED',
            5: 'COMPLETED',
        }

        # Process bulk request
        batch_results = []
        failed_updates = []

        for task_id in bulk_request['task_ids']:
            # Create mock entity for each task
            mock_task = Mock()
            mock_task.pk = task_id
            mock_task._meta.model_name = 'task'

            try:
                # Create transition
                transition = BulkStatusUpdateTransition(**bulk_request['transition_data'])

                context = TransitionContext(
                    entity=mock_task,
                    current_user=self.mock_user,
                    current_state=current_state_by_task_id[task_id],
                    target_state=transition.get_target_state(),
                )

                # Validate and execute
                if transition.validate_transition(context):
                    result = transition.transition(context)
                    batch_results.append({'task_id': task_id, 'success': True, 'result': result})

            except TransitionValidationError as e:
                failed_updates.append(
                    {'task_id': task_id, 'success': False, 'error': str(e), 'context': getattr(e, 'context', {})}
                )

        # API response for bulk operation
        api_response = {
            'batch_id': bulk_request['transition_data']['batch_id'],
            'total_requested': len(bulk_request['task_ids']),
            'successful_updates': len(batch_results),
            'failed_updates': len(failed_updates),
            'results': batch_results,
            'failures': failed_updates,
            'timestamp': datetime.now().isoformat(),
        }

        # Validate bulk results
        assert api_response['total_requested'] == 5

        # Every requested task ends up in exactly one of the two result lists
        total_processed = api_response['successful_updates'] + api_response['failed_updates']
        assert total_processed == 5

        # Only the task that is already IN_PROGRESS is rejected
        assert api_response['successful_updates'] == 4
        assert [failure['task_id'] for failure in failed_updates] == [3]

        # Check individual results
        for result in batch_results:
            assert result['success']
            assert result['result']['new_status'] == 'IN_PROGRESS'
            assert result['result']['batch_id'] == 'batch_2024_001'

    def test_webhook_integration_pattern(self):
        """
        API EXAMPLE: Webhook integration with transitions

        Shows how to integrate transitions with webhook systems
        for external notifications and integrations.
        """

        @register_state_transition('task', 'webhook_completion')
        class WebhookTaskCompletionTransition(BaseTransition):
            """Task completion with webhook notifications"""

            completion_quality: float = Field(..., ge=0.0, le=1.0)
            completion_notes: str = Field('', description='Completion notes')
            webhook_urls: List[str] = Field(default_factory=list, description='Webhook URLs to notify')
            notification_data: Dict[str, Any] = Field(default_factory=dict, description='Data to send in webhooks')
            webhook_responses: List[Dict[str, Any]] = Field(
                default_factory=list, description='Webhook response tracking'
            )

            def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
                return 'COMPLETED'

            def validate_transition(self, context: TransitionContext) -> bool:
                if context.current_state != 'IN_PROGRESS':
                    raise TransitionValidationError('Can only complete in-progress tasks')
                return True

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                return {
                    'completion_quality': self.completion_quality,
                    'completion_notes': self.completion_notes,
                    'webhook_urls': self.webhook_urls,
                    'notification_data': self.notification_data,
                    'completed_at': context.timestamp.isoformat(),
                    'completed_by_id': context.current_user.id if context.current_user else None,
                }

            def post_transition_hook(self, context: TransitionContext, state_record) -> None:
                """Send webhook notifications after successful transition"""
                if self.webhook_urls:
                    webhook_payload = {
                        'event': 'task.completed',
                        'task_id': context.entity.pk,
                        'state_record_id': getattr(state_record, 'id', 'mock-id'),
                        'completion_data': {
                            'quality': self.completion_quality,
                            'notes': self.completion_notes,
                            'completed_by': context.current_user.id if context.current_user else None,
                            'completed_at': context.timestamp.isoformat(),
                        },
                        'custom_data': self.notification_data,
                        'timestamp': datetime.now().isoformat(),
                    }

                    # Mock webhook sending (in real implementation, use async requests)
                    for url in self.webhook_urls:
                        webhook_response = {
                            'url': url,
                            'payload': webhook_payload,
                            'status': 'sent',
                            'timestamp': datetime.now().isoformat(),
                        }
                        self.webhook_responses.append(webhook_response)

        # Test webhook transition
        transition = WebhookTaskCompletionTransition(
            completion_quality=0.95,
            completion_notes='Task completed with excellent quality',
            webhook_urls=[
                'https://api.example.com/webhooks/task-completed',
                'https://notifications.example.com/task-events',
            ],
            notification_data={'project_id': 123, 'priority': 'high', 'client_id': 'client_456'},
        )

        context = TransitionContext(
            entity=self.mock_entity,
            current_user=self.mock_user,
            current_state='IN_PROGRESS',
            target_state=transition.get_target_state(),
        )

        # Validate and execute
        assert transition.validate_transition(context)
        transition.transition(context)

        # Simulate state record creation
        mock_state_record = Mock()
        mock_state_record.id = 'state-uuid-123'

        # Execute post-hook (webhook sending)
        transition.post_transition_hook(context, mock_state_record)

        # Validate webhook responses
        assert len(transition.webhook_responses) == 2

        for response in transition.webhook_responses:
            assert 'url' in response
            assert 'payload' in response
            assert response['status'] == 'sent'

            # Validate webhook payload structure
            payload = response['payload']
            assert payload['event'] == 'task.completed'
            assert payload['task_id'] == self.mock_entity.pk
            assert payload['completion_data']['quality'] == 0.95
            assert payload['custom_data']['project_id'] == 123

    def test_api_error_handling_patterns(self):
        """
        API EXAMPLE: Comprehensive error handling patterns

        Shows how to implement robust error handling for API endpoints
        using the transition system with proper HTTP status codes and messages.
        """

        @register_state_transition('task', 'api_critical_update')
        class APICriticalUpdateTransition(BaseTransition):
            """Critical update with extensive validation"""

            update_type: str = Field(..., description='Type of critical update')
            severity_level: int = Field(..., ge=1, le=5, description='Severity level 1-5')
            authorization_token: str = Field(..., description='Authorization token for critical updates')
            backup_required: bool = Field(True, description='Whether backup is required before update')

            def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
                return 'CRITICALLY_UPDATED'

            def validate_transition(self, context: TransitionContext) -> bool:
                # Collect every violation so the API can report them all at once
                errors = []

                # Authorization check
                if len(self.authorization_token) < 10:
                    errors.append('Invalid authorization token')

                # Severity validation
                if self.severity_level >= 4 and not context.current_user:
                    errors.append('High severity updates require authenticated user')

                # Update type validation
                valid_types = ['security_patch', 'critical_fix', 'emergency_update']
                if self.update_type not in valid_types:
                    errors.append(f'Invalid update type. Must be one of: {valid_types}')

                # State validation
                if context.current_state in ['COMPLETED', 'ARCHIVED']:
                    errors.append(f'Cannot perform critical updates on {context.current_state.lower()} tasks')

                # Backup requirement
                if self.backup_required and self.severity_level >= 3:
                    # Mock backup check
                    backup_exists = True  # In real implementation, check backup system
                    if not backup_exists:
                        errors.append('Backup required but not available')

                if errors:
                    raise TransitionValidationError(
                        'Critical update validation failed',
                        {
                            'validation_errors': errors,
                            'error_count': len(errors),
                            'severity_level': self.severity_level,
                            'update_type': self.update_type,
                        },
                    )

                return True

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                return {
                    'update_type': self.update_type,
                    'severity_level': self.severity_level,
                    'backup_required': self.backup_required,
                    'authorized_by': context.current_user.id if context.current_user else None,
                    'updated_at': context.timestamp.isoformat(),
                    'critical_update_id': f'crit_{int(context.timestamp.timestamp())}',
                }

        # Test various error scenarios and API responses

        # 1. Test successful request
        valid_request = {
            'update_type': 'security_patch',
            'severity_level': 3,
            'authorization_token': 'valid_token_12345',
            'backup_required': True,
        }

        def simulate_api_endpoint(request_data, current_state='IN_PROGRESS'):
            """Simulate API endpoint with proper error handling"""
            try:
                # Parse and validate request
                transition = APICriticalUpdateTransition(**request_data)

                # Create context
                context = TransitionContext(
                    entity=self.mock_entity,
                    current_user=self.mock_user,
                    current_state=current_state,
                    target_state=transition.get_target_state(),
                )

                # Validate business logic
                transition.validate_transition(context)

                # Execute transition
                result = transition.transition(context)

                return {
                    'status_code': 200,
                    'success': True,
                    'data': {
                        'task_id': self.mock_entity.pk,
                        'new_state': transition.get_target_state(),
                        'update_details': result,
                    },
                }

            except ValueError as e:
                # Pydantic validation error (400 Bad Request)
                return {
                    'status_code': 400,
                    'success': False,
                    'error': 'Bad Request',
                    'message': 'Invalid request data',
                    'details': str(e),
                }

            except TransitionValidationError as e:
                # Business logic validation error (422 Unprocessable Entity)
                return {
                    'status_code': 422,
                    'success': False,
                    'error': 'Validation Failed',
                    'message': str(e),
                    'validation_errors': e.context.get('validation_errors', []),
                    'context': e.context,
                }

            except Exception:
                # Unexpected error (500 Internal Server Error).
                # The response carries a generic string only, so internal exception
                # text is never exposed to the API client.
                return {
                    'status_code': 500,
                    'success': False,
                    'error': 'Internal Server Error',
                    'message': 'An unexpected error occurred',
                    'details': 'Server error',
                }

        # Test successful request
        response = simulate_api_endpoint(valid_request)
        assert response['status_code'] == 200
        assert response['success']
        assert 'update_details' in response['data']

        # Test Pydantic validation error (invalid severity level)
        invalid_request = {
            'update_type': 'security_patch',
            'severity_level': 10,  # Invalid: > 5
            'authorization_token': 'valid_token_12345',
        }

        response = simulate_api_endpoint(invalid_request)
        assert response['status_code'] == 400
        assert not response['success']
        assert response['error'] == 'Bad Request'

        # Test business logic validation error
        business_logic_error_request = {
            'update_type': 'invalid_type',  # Invalid update type
            'severity_level': 5,
            'authorization_token': 'short',  # Too short
            'backup_required': True,
        }

        response = simulate_api_endpoint(business_logic_error_request)
        assert response['status_code'] == 422
        assert not response['success']
        assert response['error'] == 'Validation Failed'
        assert 'validation_errors' in response
        assert len(response['validation_errors']) > 0

        # Test state validation error
        response = simulate_api_endpoint(valid_request, current_state='COMPLETED')
        assert response['status_code'] == 422
        # The error message is in validation_errors list, not the main message
        validation_errors = response.get('validation_errors', [])
        assert any('completed tasks' in error for error in validation_errors)

    def test_api_versioning_and_backward_compatibility(self):
        """
        API EXAMPLE: API versioning with backward compatibility

        Shows how to handle API versioning using transition inheritance
        and maintain backward compatibility.
        """

        # Version 1 API
        @register_state_transition('task', 'update_task_v1')
        class UpdateTaskV1Transition(BaseTransition):
            """Version 1 task update API"""

            status: str = Field(..., description='New task status')
            notes: str = Field('', description='Update notes')

            def get_target_state(self, context: Optional[TransitionContext] = None) -> str:
                return self.status

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                return {
                    'status': self.status,
                    'notes': self.notes,
                    'api_version': 'v1',
                    'updated_at': context.timestamp.isoformat(),
                }

        # Version 2 API with additional features
        @register_state_transition('task', 'update_task_v2')
        class UpdateTaskV2Transition(UpdateTaskV1Transition):
            """Version 2 task update API with enhanced features"""

            priority: Optional[str] = Field(None, description='Task priority')
            tags: List[str] = Field(default_factory=list, description='Task tags')
            estimated_hours: Optional[float] = Field(None, ge=0, description='Estimated hours')
            metadata: Dict[str, Any] = Field(default_factory=dict, description='Additional metadata')

            def transition(self, context: TransitionContext) -> Dict[str, Any]:
                # Call parent method for base functionality
                base_data = super().transition(context)

                # Add V2 specific data ('api_version' here overrides the V1 value)
                v2_data = {
                    'priority': self.priority,
                    'tags': self.tags,
                    'estimated_hours': self.estimated_hours,
                    'metadata': self.metadata,
                    'api_version': 'v2',
                }

                return {**base_data, **v2_data}

        # Test V1 API (backward compatibility)
        v1_request = {'status': 'IN_PROGRESS', 'notes': 'Started working on task'}

        v1_transition = UpdateTaskV1Transition(**v1_request)
        context = TransitionContext(
            entity=self.mock_entity, current_state='CREATED', target_state=v1_transition.get_target_state()
        )

        v1_result = v1_transition.transition(context)
        assert v1_result['api_version'] == 'v1'
        assert v1_result['status'] == 'IN_PROGRESS'
        assert 'priority' not in v1_result  # V1 doesn't have priority

        # Test V2 API with enhanced features
        v2_request = {
            'status': 'IN_PROGRESS',
            'notes': 'Started working on task with enhanced tracking',
            'priority': 'high',
            'tags': ['urgent', 'client-facing'],
            'estimated_hours': 4.5,
            'metadata': {'client_id': 123, 'project_phase': 'development'},
        }

        v2_transition = UpdateTaskV2Transition(**v2_request)
        v2_result = v2_transition.transition(context)

        assert v2_result['api_version'] == 'v2'
        assert v2_result['status'] == 'IN_PROGRESS'  # Inherited from V1
        assert v2_result['priority'] == 'high'  # V2 feature
        assert len(v2_result['tags']) == 2  # V2 feature
        assert v2_result['estimated_hours'] == 4.5  # V2 feature
        assert 'client_id' in v2_result['metadata']  # V2 feature

        # Test V2 API with minimal data (backward compatible)
        v2_minimal_request = {'status': 'COMPLETED', 'notes': 'Task finished'}

        v2_minimal_transition = UpdateTaskV2Transition(**v2_minimal_request)
        v2_minimal_result = v2_minimal_transition.transition(context)

        assert v2_minimal_result['api_version'] == 'v2'
        assert v2_minimal_result['status'] == 'COMPLETED'
        assert v2_minimal_result['priority'] is None  # Optional field
        assert v2_minimal_result['tags'] == []  # Default value
        assert v2_minimal_result['estimated_hours'] is None  # Optional field
        assert v2_minimal_result['metadata'] == {}  # Default value