"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
|
"""
|
import logging
|
import sys
|
from datetime import timedelta
|
from functools import partial
|
from typing import Any
|
|
import django_rq
|
import redis
|
from core.current_request import CurrentContext
|
from django.conf import settings
|
from django_rq import get_connection
|
from rq.command import send_stop_job_command
|
from rq.exceptions import InvalidJobOperation
|
from rq.registry import StartedJobRegistry
|
|
logger = logging.getLogger(__name__)
|
|
|
def _truncate_args_for_logging(args, kwargs, max_length=30):
|
try:
|
|
def _truncate_scalar(value):
|
v_repr = repr(value)
|
return v_repr[:max_length] + ('...' if len(v_repr) > max_length else '')
|
|
def _truncate_top_level(value):
|
# If dict at the top level, expand only one level of keys
|
if isinstance(value, dict):
|
parts = []
|
for dk, dv in value.items():
|
# Do NOT recurse: treat nested dicts as scalars
|
parts.append(f'{repr(dk)}: {_truncate_scalar(dv)}')
|
return '{' + ', '.join(parts) + '}'
|
return _truncate_scalar(value)
|
|
truncated_args = [_truncate_top_level(arg) for arg in args]
|
|
truncated_kwargs = {k: _truncate_top_level(v) for k, v in kwargs.items() if k != 'on_failure'}
|
|
result = []
|
if truncated_args:
|
result.append(f'args: {truncated_args}')
|
if truncated_kwargs:
|
result.append(f'kwargs: {truncated_kwargs}')
|
|
return ', '.join(result) if result else 'no arguments'
|
except Exception:
|
return 'failed to format arguments'
|
|
|
try:
|
_redis = get_connection()
|
_redis.ping()
|
logger.debug('=> Redis is connected successfully.')
|
except: # noqa: E722
|
logger.debug('=> Redis is not connected.')
|
_redis = None
|
|
|
def redis_healthcheck():
|
if not _redis:
|
return False
|
try:
|
_redis.ping()
|
except redis.exceptions.ConnectionError as exc:
|
logger.error(f'Redis healthcheck failed with ConnectionError: {exc}', exc_info=True)
|
return False
|
except redis.exceptions.TimeoutError as exc:
|
logger.error(f'Redis healthcheck failed with TimeoutError: {exc}', exc_info=True)
|
return False
|
except redis.exceptions.RedisError as exc:
|
logger.error(f'Redis healthcheck failed: {exc}', exc_info=True)
|
return False
|
else:
|
logger.debug('Redis client is alive!')
|
return True
|
|
|
def redis_connected():
|
if settings.REDIS_ENABLED:
|
return redis_healthcheck()
|
return False
|
|
|
def _is_serializable(value: Any) -> bool:
|
"""Check if a value can be serialized for job context."""
|
return isinstance(value, (str, int, float, bool, list, dict, type(None)))
|
|
|
def _capture_context() -> dict:
|
"""
|
Capture the current context for passing to a job.
|
Returns a dictionary of context data that can be serialized.
|
"""
|
context_data = {}
|
|
# Get user information
|
if user := CurrentContext.get_user():
|
context_data['user_id'] = user.id
|
|
# Get organization if set separately
|
if org_id := CurrentContext.get_organization_id():
|
context_data['organization_id'] = org_id
|
|
# If organization_id is not set, try to get it from the user, this ensures that we have an organization_id for the job
|
# And it prefers the original requesting user's organization_id over the current active organization_id of the user which could change during async jobs
|
if not org_id and user and hasattr(user, 'active_organization_id') and user.active_organization_id:
|
context_data['organization_id'] = user.active_organization_id
|
|
# Get any custom context values (exclude non-serializable objects)
|
job_data = CurrentContext.get_job_data()
|
for key, value in job_data.items():
|
if key not in ['user', 'request'] and _is_serializable(value):
|
context_data[key] = value
|
|
return context_data
|
|
|
def redis_get(key):
|
if not redis_healthcheck():
|
return
|
return _redis.get(key)
|
|
|
def redis_hget(key1, key2):
|
if not redis_healthcheck():
|
return
|
return _redis.hget(key1, key2)
|
|
|
def redis_set(key, value, ttl=None):
|
if not redis_healthcheck():
|
return
|
return _redis.set(key, value, ex=ttl)
|
|
|
def redis_hset(key1, key2, value):
|
if not redis_healthcheck():
|
return
|
return _redis.hset(key1, key2, value)
|
|
|
def redis_delete(key):
|
if not redis_healthcheck():
|
return
|
return _redis.delete(key)
|
|
|
def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
|
"""
|
Start job async with redis or sync if redis is not connected.
|
Automatically preserves context for async jobs and clears it after completion.
|
|
:param job: Job function
|
:param args: Function arguments
|
:param in_seconds: Job will be delayed for in_seconds
|
:param retry: RQ Retry object or int (max retries). Only used in async mode.
|
:param kwargs: Function keywords arguments
|
:return: Job or function result
|
"""
|
from rq import Retry
|
|
redis = redis_connected() and kwargs.get('redis', True)
|
queue_name = kwargs.get('queue_name', 'default')
|
|
if 'queue_name' in kwargs:
|
del kwargs['queue_name']
|
if 'redis' in kwargs:
|
del kwargs['redis']
|
|
job_timeout = None
|
if 'job_timeout' in kwargs:
|
job_timeout = kwargs['job_timeout']
|
del kwargs['job_timeout']
|
|
retry = None
|
if 'retry' in kwargs:
|
retry = kwargs['retry']
|
del kwargs['retry']
|
if isinstance(retry, int):
|
retry = Retry(max=retry)
|
|
if redis:
|
# Async execution with Redis - wrap job for context management
|
try:
|
context_data = _capture_context()
|
|
if context_data:
|
meta = kwargs.get('meta', {})
|
# Store context data in job meta for worker access
|
meta.update(context_data)
|
kwargs['meta'] = meta
|
except Exception:
|
logger.info(f'Failed to capture context for job {job.__name__} on queue {queue_name}')
|
|
try:
|
args_info = _truncate_args_for_logging(args, kwargs)
|
logger.info(f'Start async job {job.__name__} on queue {queue_name} with {args_info}.')
|
except Exception:
|
logger.info(f'Start async job {job.__name__} on queue {queue_name}.')
|
queue = django_rq.get_queue(queue_name)
|
enqueue_method = queue.enqueue
|
if in_seconds > 0:
|
enqueue_method = partial(queue.enqueue_in, timedelta(seconds=in_seconds))
|
|
job = enqueue_method(
|
job,
|
*args,
|
**kwargs,
|
job_timeout=job_timeout,
|
failure_ttl=settings.RQ_FAILED_JOB_TTL,
|
retry=retry,
|
)
|
return job
|
else:
|
on_failure = kwargs.pop('on_failure', None)
|
|
try:
|
result = job(*args, **kwargs)
|
return result
|
except Exception:
|
exc_info = sys.exc_info()
|
if on_failure:
|
on_failure(job, *exc_info)
|
raise
|
|
|
def is_job_in_queue(queue, func_name, meta):
|
"""
|
Checks if func_name with kwargs[meta] is in queue (doesn't check workers)
|
:param queue: queue object
|
:param func_name: function name
|
:param meta: job meta information
|
:return: True if job in queue
|
"""
|
# get all jobs from Queue
|
jobs = get_jobs_by_meta(queue, func_name, meta)
|
# check if there is job with meta in list
|
return any(jobs)
|
|
|
def is_job_on_worker(job_id, queue_name):
|
"""
|
Checks if job id is on workers
|
:param job_id: Job ID
|
:param queue_name: Queue name
|
:return: True if job on worker
|
"""
|
if not job_id:
|
return False
|
registry = StartedJobRegistry(queue_name, connection=_redis)
|
member = job_id.encode() if isinstance(job_id, str) else job_id
|
# Use Redis ZSET membership check (ZSCORE) instead of registry.get_job_ids(),
|
# because the latter calls registry.cleanup(), which installs SIGALRM timers and
|
# crashes when executed outside the interpreter's main thread (e.g., inside WSGI).
|
# ZSCORE simply looks up the score of the member in the sorted set: if it returns
|
# None, the member/job ID is not present; otherwise it is currently marked as running.
|
return registry.connection.zscore(registry.key, member) is not None
|
|
|
def delete_job_by_id(queue, id):
|
"""
|
Delete job by id from queue
|
@param queue: Queue on redis to delete from
|
@param id: Job id
|
"""
|
job = queue.fetch_job(id)
|
if job is not None:
|
# stop job if it is in master redis node (in the queue)
|
logger.info(f'Stopping job {id} from queue {queue.name}.')
|
try:
|
job.cancel()
|
job.delete()
|
logger.debug(f'Fetched job {id} and stopped.')
|
except InvalidJobOperation:
|
logger.debug(f'Job {id} was already cancelled.')
|
else:
|
# try to stop job on worker (job started)
|
logger.info(f'Stopping job {id} on worker from queue {queue.name}.')
|
try:
|
send_stop_job_command(_redis, id)
|
logger.debug(f'Send stop job {id} to redis worker.')
|
except Exception as e:
|
logger.debug(f'Redis job {id} was not found: {str(e)}')
|
|
|
def get_jobs_by_meta(queue, func_name, meta):
|
"""
|
Get jobs from queue by func_name and meta data
|
:param queue: Queue on redis to check in
|
:param func_name: Started function name
|
:param meta: meta dict
|
:return: Job list
|
"""
|
# get all jobs from Queue
|
jobs = (job for job in queue.get_jobs() if job.func.__name__ == func_name)
|
# return only with same meta data
|
return [job for job in jobs if hasattr(job, 'meta') and job.meta == meta]
|