"""
|
Redis-based user activity tracking system and background tasks for synchronization.
|
|
This module provides functionality to cache user last_activity timestamps in Redis
|
with batch synchronization to the database to reduce database load.
|
"""
|
|
import logging
|
from datetime import datetime
|
from typing import List, Optional, Set
|
|
from core.redis import _redis, redis_connected, start_job_async_or_sync
|
from django.conf import settings
|
from django.contrib.auth import get_user_model
|
from django.db import transaction
|
from django.utils import timezone as django_timezone
|
from django_rq import get_connection
|
|
logger = logging.getLogger(__name__)
|
|
|
# Redis keys
|
USER_ACTIVITY_KEY_PREFIX = getattr(settings, 'USER_ACTIVITY_REDIS_KEY_PREFIX', 'user_activity')
|
USER_ACTIVITY_COUNTER_KEY = f'{USER_ACTIVITY_KEY_PREFIX}_counter'
|
USER_ACTIVITY_BATCH_KEY = f'{USER_ACTIVITY_KEY_PREFIX}_batch'
|
|
# Configuration
|
BATCH_SIZE = getattr(settings, 'USER_ACTIVITY_BATCH_SIZE', 100)
|
SYNC_THRESHOLD = getattr(settings, 'USER_ACTIVITY_SYNC_THRESHOLD', 50)
|
REDIS_TTL = getattr(settings, 'USER_ACTIVITY_REDIS_TTL', 86400) # 24 hours
|
|
|
def _get_user_activity_key(user_id: int) -> str:
|
"""Get Redis key for user activity."""
|
return f'{USER_ACTIVITY_KEY_PREFIX}:{user_id}'
|
|
|
def set_user_last_activity(user_id: int, timestamp: Optional[datetime] = None) -> bool:
|
"""
|
Set user last activity timestamp in Redis.
|
|
Args:
|
user_id: User ID
|
timestamp: Activity timestamp (defaults to current time)
|
|
Returns:
|
True if successfully set, False otherwise
|
"""
|
if not redis_connected():
|
return False
|
|
if timestamp is None:
|
timestamp = django_timezone.now()
|
|
try:
|
# Store timestamp as ISO string
|
timestamp_str = timestamp.isoformat()
|
redis_key = _get_user_activity_key(user_id)
|
|
# Get Redis connection
|
redis_client = get_connection()
|
|
# Set user activity with TTL
|
redis_client.setex(redis_key, REDIS_TTL, timestamp_str)
|
|
# Add user to batch set for later synchronization (atomic operation)
|
redis_client.sadd(USER_ACTIVITY_BATCH_KEY, user_id)
|
redis_client.expire(USER_ACTIVITY_BATCH_KEY, REDIS_TTL)
|
|
# Increment counter
|
current_count = increment_activity_counter()
|
|
logger.debug('Updated activity for user %s, counter at %s', user_id, current_count)
|
|
return True
|
|
except Exception as e:
|
logger.error('Failed to set user activity for user %s: %s', user_id, e)
|
return False
|
|
|
def get_user_last_activity(user_id: int) -> Optional[datetime]:
|
"""
|
Get user last activity timestamp from Redis with database fallback.
|
|
Args:
|
user_id: User ID
|
|
Returns:
|
Last activity timestamp or None if not found
|
"""
|
if _redis is None:
|
return
|
|
try:
|
redis_key = _get_user_activity_key(user_id)
|
redis_client = get_connection()
|
timestamp_str = redis_client.get(redis_key)
|
|
if timestamp_str:
|
# Decode bytes to string if needed
|
if isinstance(timestamp_str, bytes):
|
timestamp_str = timestamp_str.decode('utf-8')
|
# Parse ISO string back to datetime
|
return datetime.fromisoformat(timestamp_str)
|
|
except Exception as e:
|
logger.error('Failed to get user activity for user %s: %s', user_id, e)
|
|
|
def increment_activity_counter() -> int:
|
"""
|
Increment activity counter and return current count.
|
|
Returns:
|
Current counter value
|
"""
|
if not redis_connected():
|
return 0
|
|
try:
|
redis_client = get_connection()
|
# Increment counter (creates key with value 1 if it doesn't exist)
|
current_count = redis_client.incr(USER_ACTIVITY_COUNTER_KEY)
|
# Set expiration to match other keys
|
redis_client.expire(USER_ACTIVITY_COUNTER_KEY, REDIS_TTL)
|
logger.debug('Activity counter incremented to %s', current_count)
|
return current_count
|
|
except Exception as e:
|
logger.error('Failed to increment activity counter: %s', e)
|
return 0
|
|
|
def get_activity_counter() -> int:
|
"""
|
Get current activity counter value.
|
|
Returns:
|
Current counter value
|
"""
|
if not redis_connected():
|
return 0
|
|
try:
|
redis_client = get_connection()
|
count = redis_client.get(USER_ACTIVITY_COUNTER_KEY)
|
return int(count) if count is not None else 0
|
|
except Exception as e:
|
logger.error('Failed to get activity counter: %s', e)
|
return 0
|
|
|
def reset_activity_counter() -> bool:
|
"""
|
Reset activity counter to 0.
|
|
Returns:
|
True if successfully reset, False otherwise
|
"""
|
if not redis_connected():
|
return False
|
|
try:
|
redis_client = get_connection()
|
redis_client.set(USER_ACTIVITY_COUNTER_KEY, 0)
|
redis_client.expire(USER_ACTIVITY_COUNTER_KEY, REDIS_TTL)
|
logger.debug('Activity counter reset to 0')
|
return True
|
|
except Exception as e:
|
logger.error('Failed to reset activity counter: %s', e)
|
return False
|
|
|
def get_batch_user_ids() -> Set[int]:
|
"""
|
Get all user IDs from batch set.
|
|
Returns:
|
Set of user IDs to be synchronized
|
"""
|
if not redis_connected():
|
return set()
|
|
try:
|
redis_client = get_connection()
|
user_ids = redis_client.smembers(USER_ACTIVITY_BATCH_KEY)
|
return {int(uid.decode()) for uid in user_ids if uid}
|
|
except Exception as e:
|
logger.error('Failed to get batch user IDs: %s', e)
|
return set()
|
|
|
def clear_batch_user_ids(user_ids: Optional[Set[int]] = None) -> bool:
|
"""
|
Clear user IDs from batch set.
|
|
Args:
|
user_ids: Specific user IDs to remove (if None, clears all)
|
|
Returns:
|
True if successfully cleared, False otherwise
|
"""
|
if not redis_connected():
|
return False
|
|
try:
|
redis_client = get_connection()
|
|
if user_ids:
|
# Remove specific user IDs (atomic operation)
|
if user_ids:
|
redis_client.srem(USER_ACTIVITY_BATCH_KEY, *user_ids)
|
else:
|
# Clear entire set
|
redis_client.delete(USER_ACTIVITY_BATCH_KEY)
|
|
logger.debug('Cleared batch user IDs: %s', user_ids or 'all')
|
return True
|
|
except Exception as e:
|
logger.error('Failed to clear batch user IDs: %s', e)
|
return False
|
|
|
def should_sync_activities() -> bool:
|
"""
|
Check if activities should be synchronized to database.
|
|
Returns:
|
True if sync threshold is reached, False otherwise
|
"""
|
current_count = get_activity_counter()
|
should_sync = current_count >= SYNC_THRESHOLD
|
|
if should_sync:
|
logger.info('Sync threshold reached: %s >= %s', current_count, SYNC_THRESHOLD)
|
|
return should_sync
|
|
|
def get_user_activities_for_sync(user_ids: Set[int]) -> List[dict]:
|
"""
|
Get user activities from Redis for database synchronization.
|
|
Args:
|
user_ids: Set of user IDs to get activities for
|
|
Returns:
|
List of dictionaries with user_id and last_activity
|
"""
|
if not redis_connected():
|
return []
|
|
activities = []
|
|
for user_id in user_ids:
|
try:
|
timestamp = get_user_last_activity(user_id)
|
if timestamp:
|
activities.append({'user_id': user_id, 'last_activity': timestamp})
|
except Exception as e:
|
logger.error('Failed to get activity for user %s during sync: %s', user_id, e)
|
continue
|
|
return activities
|
|
|
def cleanup_redis_activity_data(user_ids: Set[int]) -> bool:
|
"""
|
Clean up Redis activity data for given user IDs.
|
|
Args:
|
user_ids: Set of user IDs to clean up
|
|
Returns:
|
True if successfully cleaned, False otherwise
|
"""
|
if not redis_connected():
|
return False
|
|
try:
|
redis_client = get_connection()
|
|
# Delete individual user activity keys
|
keys_to_delete = [_get_user_activity_key(user_id) for user_id in user_ids]
|
if keys_to_delete:
|
redis_client.delete(*keys_to_delete)
|
|
# Remove from batch set
|
clear_batch_user_ids(user_ids)
|
|
logger.debug('Cleaned up Redis data for %s users', len(user_ids))
|
return True
|
|
except Exception as e:
|
logger.error('Failed to cleanup Redis activity data: %s', e)
|
return False
|
|
|
def sync_user_activities_to_db(max_users: int = None) -> dict:
|
"""
|
Synchronize user activities from Redis to database.
|
|
Args:
|
max_users: Maximum number of users to process (defaults to BATCH_SIZE)
|
|
Returns:
|
Dictionary with sync results
|
"""
|
if max_users is None:
|
max_users = BATCH_SIZE
|
|
logger.info('Starting user activity sync to database')
|
|
try:
|
# Get user IDs to sync
|
user_ids = get_batch_user_ids()
|
|
if not user_ids:
|
logger.info('No user activities to sync')
|
return {'success': True, 'processed': 0, 'errors': 0, 'message': 'No activities to sync'}
|
|
# Limit batch size
|
if len(user_ids) > max_users:
|
user_ids = set(list(user_ids)[:max_users])
|
logger.info('Limited batch to %s users', max_users)
|
|
logger.info('Syncing activities for %s users', len(user_ids))
|
|
# Get activity data from Redis
|
activities = get_user_activities_for_sync(user_ids)
|
|
if not activities:
|
logger.warning('No activity data found for %s users', len(user_ids))
|
# Still clean up the batch set
|
cleanup_redis_activity_data(user_ids)
|
return {'success': True, 'processed': 0, 'errors': len(user_ids), 'message': 'No activity data found'}
|
|
# Sync to database
|
sync_result = _bulk_update_user_activities(activities)
|
|
if sync_result['success']:
|
# Clean up Redis data only if database sync was successful
|
cleanup_redis_activity_data(user_ids)
|
|
# Reset counter only if we processed all pending activities
|
# or if remaining users are below the threshold
|
remaining_users = get_batch_user_ids()
|
if not remaining_users or len(remaining_users) < SYNC_THRESHOLD:
|
reset_activity_counter()
|
logger.info('Reset activity counter after successful sync (remaining users: %s)', len(remaining_users))
|
|
logger.info('Activity sync completed: %s', sync_result)
|
return sync_result
|
|
except Exception as e:
|
logger.error('Failed to sync user activities: %s', e, exc_info=True)
|
return {'success': False, 'processed': 0, 'errors': 1, 'message': f'Sync failed: {str(e)}'}
|
|
|
def _bulk_update_user_activities(activities: List[dict]) -> dict:
|
"""
|
Bulk update user activities in database.
|
|
Args:
|
activities: List of activity dictionaries
|
|
Returns:
|
Dictionary with update results
|
"""
|
if not activities:
|
return {'success': True, 'processed': 0, 'errors': 0}
|
|
processed = 0
|
errors = 0
|
|
try:
|
with transaction.atomic():
|
# Get existing users
|
User = get_user_model()
|
user_ids = [activity['user_id'] for activity in activities]
|
existing_users = User.objects.filter(id__in=user_ids).only('id', 'last_activity')
|
user_dict = {user.id: user for user in existing_users}
|
|
# Update activities
|
users_to_update = []
|
|
for activity in activities:
|
user_id = activity['user_id']
|
new_activity = activity['last_activity']
|
|
user = user_dict.get(user_id)
|
if user:
|
# Only update if the new activity is more recent
|
if user.last_activity is None or new_activity > user.last_activity:
|
user.last_activity = new_activity
|
users_to_update.append(user)
|
processed += 1
|
else:
|
logger.debug(
|
'Skipping outdated activity for user %s: %s <= %s',
|
user_id,
|
new_activity,
|
user.last_activity,
|
)
|
processed += 1
|
else:
|
logger.warning('User %s not found in database', user_id)
|
errors += 1
|
|
# Bulk update
|
if users_to_update:
|
User.objects.bulk_update(users_to_update, ['last_activity'], batch_size=100)
|
logger.info('Bulk updated %s users', len(users_to_update))
|
|
return {'success': True, 'processed': processed, 'errors': errors, 'updated': len(users_to_update)}
|
|
except Exception as e:
|
logger.error('Failed to bulk update user activities: %s', e, exc_info=True)
|
return {
|
'success': False,
|
'processed': processed,
|
'errors': errors + 1,
|
'message': f'Bulk update failed: {str(e)}',
|
}
|
|
|
def schedule_activity_sync(force: bool = False) -> bool:
|
"""
|
Schedule user activity synchronization if needed.
|
|
Args:
|
force: Force sync even if threshold not reached
|
|
Returns:
|
True if sync was scheduled, False otherwise
|
"""
|
if not force and not should_sync_activities():
|
logger.debug('Sync threshold not reached, skipping')
|
return False
|
|
try:
|
# Schedule the sync job
|
start_job_async_or_sync(sync_user_activities_to_db, queue_name='low', redis=True)
|
reset_activity_counter() # Reset counter after scheduling
|
|
logger.info('Scheduled user activity sync job')
|
return True
|
|
except Exception as e:
|
logger.error('Failed to schedule activity sync: %s', e)
|
return False
|