import logging
|
from functools import wraps
|
|
import requests
|
from core.feature_flags import flag_set
|
from core.redis import start_job_async_or_sync
|
from core.utils.common import load_func
|
from django.conf import settings
|
from django.db.models import Q
|
from django.db.models.query import QuerySet
|
|
from .models import Webhook, WebhookAction
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
def get_active_webhooks(organization, project, action):
    """Select the active webhooks of ``organization`` that apply to ``action``.

    Webhooks bound to ``project`` and webhooks with no project at all are both
    matched, so organization-wide hooks are always included. When ``project``
    is None only the project-less (organization) hooks match.

    A webhook applies when it is flagged ``send_for_all_actions`` or when a
    ``WebhookAction`` row of the same organization links it to ``action``.

    :param organization: Organization owning the webhooks
    :param project: Project to scope the lookup to, or None
    :param action: Key into ``WebhookAction.ACTIONS``
    :return: Distinct queryset of matching ``Webhook`` rows
    :raises ValueError: if a project is given for an action whose metadata
        is marked ``organization-only``
    """
    meta = WebhookAction.ACTIONS[action]
    if project and meta.get('organization-only'):
        raise ValueError('There is no project webhooks for organization-only action')

    # Webhook ids explicitly subscribed to this action within the organization.
    subscribed_ids = WebhookAction.objects.filter(
        webhook__organization=organization, action=action
    ).values_list('webhook_id', flat=True)

    in_scope = Q(organization=organization) & (Q(project=project) | Q(project=None))
    handles_action = Q(send_for_all_actions=True) | Q(id__in=subscribed_ids)

    return Webhook.objects.filter(in_scope & Q(is_active=True) & handles_action).distinct()
|
|
|
def run_webhook_sync(webhook, action, payload=None):
    """Run one webhook for action.

    POSTs a JSON body to ``webhook.url`` using ``webhook.headers`` and the
    ``settings.WEBHOOK_TIMEOUT`` timeout. The body always contains
    ``{'action': action}``; the entries of ``payload`` are merged in only when
    the webhook has ``send_payload`` enabled and ``payload`` is non-empty.

    This function must not raise any exceptions: any
    ``requests.RequestException`` is logged and swallowed. Other exception
    types are not caught here.

    :param webhook: Webhook to call (reads ``id``, ``url``, ``headers``, ``send_payload``)
    :param action: Action name reported to the receiver
    :param payload: Optional mapping of extra data for the request body
    :return: The ``requests`` response, or None when the request failed
    """
    data = {'action': action}
    if webhook.send_payload and payload:
        data.update(payload)

    try:
        # Log through the module logger (not the root logger) so records carry
        # this module's name and follow its logging configuration.
        logger.debug('Run webhook %s for action %s', webhook.id, action)
        return requests.post(
            webhook.url,
            headers=webhook.headers,
            json=data,
            timeout=settings.WEBHOOK_TIMEOUT,
        )
    except requests.RequestException as exc:
        logger.error(exc, exc_info=True)
        return None
|
|
|
def emit_webhooks_sync(organization, project, action, payload):
    """Run all active webhooks for the action.

    When a project is given, ``payload`` is non-empty and at least one matching
    webhook has ``send_payload`` enabled, the serialized project is added to
    the outgoing payload under the ``'project'`` key. The serializer is
    resolved from ``settings.WEBHOOK_SERIALIZERS['project']``.

    The caller's ``payload`` mapping is never modified; the project data is
    added to a copy.

    :param organization: Organization owning the webhooks
    :param project: Project the event belongs to, or None
    :param action: Key into ``WebhookAction.ACTIONS``
    :param payload: Mapping of data to send with the action (may be empty/None)
    """
    webhooks = get_active_webhooks(organization, project, action)
    if project and payload and webhooks.filter(send_payload=True).exists():
        project_serializer = load_func(settings.WEBHOOK_SERIALIZERS['project'])
        # Build a new dict instead of assigning into ``payload`` so that the
        # dict owned by the caller is left untouched.
        payload = {**payload, 'project': project_serializer(instance=project).data}
    for wh in webhooks:
        run_webhook_sync(wh, action, payload)
|
|
|
def _process_webhook_batch(webhooks, project, action, batch, action_meta):
    """Process a single batch of instances for webhooks.

    Builds one payload for the batch and sends it to every webhook in
    ``webhooks``. The payload stays empty when ``batch`` is empty/None, when no
    webhook has ``send_payload`` enabled, or when the action has no serializer;
    the webhooks are still called in that case.

    Args:
        webhooks: Active webhooks to send
        project: Project instance (optional)
        action: Action name
        batch: Batch of instances to process (single instance, list or QuerySet)
        action_meta: Action metadata from WebhookAction.ACTIONS
    """
    payload = {}

    if batch and webhooks.filter(send_payload=True).exists():
        serializer_class = action_meta.get('serializer')
        if serializer_class:
            payload[action_meta['key']] = serializer_class(instance=batch, many=action_meta['many']).data
        if project and payload:
            payload['project'] = load_func(settings.WEBHOOK_SERIALIZERS['project'])(instance=project).data
        if payload and 'nested-fields' in action_meta:
            # get_nested_field only maps over plain lists; given a QuerySet it
            # would look the attribute up on the QuerySet object itself and
            # fail. Hand it a list of the batch's instances instead.
            nested_source = list(batch) if isinstance(batch, QuerySet) else batch
            for key, value in action_meta['nested-fields'].items():
                payload[key] = value['serializer'](
                    instance=get_nested_field(nested_source, value['field']), many=value['many']
                ).data

    for wh in webhooks:
        run_webhook_sync(wh, action, payload)
|
|
|
def emit_webhooks_for_instance_sync(organization, project, action, instance=None):
    """Run all active webhooks for the action using instances as payload.

    Be sure WebhookAction.ACTIONS contains all required fields.

    ``instance`` may be a single object, a list of objects, a QuerySet, or a
    list of integer ids. A list of ids is turned into a QuerySet of the
    action's model. Nothing is done when no active webhook matches.

    Lists and QuerySets are sent in chunks of ``settings.WEBHOOK_BATCH_SIZE``
    when the ``fflag_fix_back_plt_843_webhook_memory_improvement_12082025_short``
    flag is set; otherwise everything is sent in a single call. Note that on
    the batched path an empty collection produces no webhook calls at all.

    :param organization: Organization owning the webhooks
    :param project: Project the instances belong to, or None
    :param action: Key into ``WebhookAction.ACTIONS``
    :param instance: Object(s) to serialize into the payload
    """
    webhooks = get_active_webhooks(organization, project, action)
    if not webhooks.exists():
        return

    action_meta = WebhookAction.ACTIONS[action]

    # Convert list of IDs to queryset
    if instance and isinstance(instance, list) and isinstance(instance[0], int):
        instance = action_meta['model'].objects.filter(id__in=instance)

    # Batching only applies to collections, and only behind the feature flag.
    is_batch_collection = isinstance(instance, (list, QuerySet))
    use_batching = is_batch_collection and flag_set('fflag_fix_back_plt_843_webhook_memory_improvement_12082025_short')

    if not use_batching:
        # Original behavior - process all at once
        _process_webhook_batch(webhooks, project, action, instance, action_meta)
        return

    batch_size = settings.WEBHOOK_BATCH_SIZE

    if isinstance(instance, QuerySet):
        # Each batch is fetched with its own offset/limit query. Without an
        # explicit ordering the database may return rows in a different order
        # per query, so batches could overlap or skip rows. An already sliced
        # queryset cannot be re-ordered, so it is left as is.
        if not instance.ordered and not instance.query.is_sliced:
            instance = instance.order_by('pk')
        total_count = instance.count()
    else:
        total_count = len(instance)

    logger.debug('Processing webhook for %s instances in batches of %s', total_count, batch_size)

    # Slicing works the same way for lists and QuerySets.
    for batch_number, start in enumerate(range(0, total_count, batch_size), start=1):
        batch = instance[start : start + batch_size]
        # The batch size is derived arithmetically: calling batch.count() here
        # would run an extra COUNT query per batch just for a debug message.
        logger.debug(
            'Processing batch %s with %s instances',
            batch_number,
            min(batch_size, total_count - start),
        )
        _process_webhook_batch(webhooks, project, action, batch, action_meta)
|
|
|
def run_webhook(webhook, action, payload=None):
    """Dispatch a single webhook call for ``action``.

    This function must not raise any exceptions.

    With the ``fflag_fix_back_lsdv_4604_excess_sql_queries_in_api_short`` flag
    set, ``run_webhook_sync`` is handed to ``start_job_async_or_sync`` on the
    ``'high'`` queue (an RQ worker); otherwise it is called inline.

    :param webhook: Webhook to call
    :param action: Action name
    :param payload: Optional data to send with the action
    """
    if not flag_set('fflag_fix_back_lsdv_4604_excess_sql_queries_in_api_short'):
        run_webhook_sync(webhook, action, payload)
        return

    start_job_async_or_sync(run_webhook_sync, webhook, action, payload, queue_name='high')
|
|
|
def emit_webhooks_for_instance(organization, project, action, instance=None):
    """Emit the action's webhooks with ``instance`` as the payload source.

    Be sure WebhookAction.ACTIONS contains all required fields.

    With the ``fflag_fix_back_lsdv_4604_excess_sql_queries_in_api_short`` flag
    set, ``emit_webhooks_for_instance_sync`` is handed to
    ``start_job_async_or_sync`` (an RQ worker); otherwise it runs inline.

    :param organization: Organization owning the webhooks
    :param project: Project the instances belong to, or None
    :param action: Key into ``WebhookAction.ACTIONS``
    :param instance: Object(s) to serialize into the payload
    """
    call_args = (organization, project, action, instance)
    if not flag_set('fflag_fix_back_lsdv_4604_excess_sql_queries_in_api_short'):
        emit_webhooks_for_instance_sync(*call_args)
        return

    start_job_async_or_sync(emit_webhooks_for_instance_sync, *call_args)
|
|
|
def emit_webhooks(organization, project, action, payload):
    """Emit the action's webhooks with a ready-made ``payload``.

    With the ``fflag_fix_back_lsdv_4604_excess_sql_queries_in_api_short`` flag
    set, ``emit_webhooks_sync`` is handed to ``start_job_async_or_sync`` (an RQ
    worker); otherwise it runs inline.

    :param organization: Organization owning the webhooks
    :param project: Project the event belongs to, or None
    :param action: Key into ``WebhookAction.ACTIONS``
    :param payload: Data to send with the action
    """
    call_args = (organization, project, action, payload)
    if not flag_set('fflag_fix_back_lsdv_4604_excess_sql_queries_in_api_short'):
        emit_webhooks_sync(*call_args)
        return

    start_job_async_or_sync(emit_webhooks_sync, *call_args)
|
|
|
def api_webhook(action):
    """Decorator emit webhooks for APIView methods: post, put, patch.

    Used for simple Create/Update methods.
    The decorator expects authorized request and response with 'id' key in data.

    After the wrapped method returns, the object with that id is loaded from
    the action's model and passed to ``emit_webhooks_for_instance`` together
    with ``request.user.active_organization``. For actions declared with
    ``many`` the object is wrapped in a one-element list. When the action
    metadata defines ``project-field``, the project is resolved from the
    loaded object.

    Example:
        ```
        @api_webhook(WebhookAction.PROJECT_UPDATED)
        def put(self, request, *args, **kwargs):
            return super(ProjectAPI, self).put(request, *args, **kwargs)
        ```

    :param action: Key into ``WebhookAction.ACTIONS``
    :return: Decorator for an APIView method
    """

    def decorator(func):
        @wraps(func)
        def wrap(self, request, *args, **kwargs):
            response = func(self, request, *args, **kwargs)

            action_meta = WebhookAction.ACTIONS[action]
            many = action_meta['many']
            instance = action_meta['model'].objects.get(id=response.data.get('id'))

            # Resolve the project from the single object *before* wrapping it
            # in a list (same order as api_webhook_for_delete). Resolving it
            # from the list would yield a list of projects, while
            # get_active_webhooks and the project serializer are given it as
            # one project.
            project = None
            if 'project-field' in action_meta:
                project = get_nested_field(instance, action_meta['project-field'])

            if many:
                instance = [instance]

            emit_webhooks_for_instance(
                request.user.active_organization,
                project,
                action,
                instance,
            )
            return response

        return wrap

    return decorator
|
|
|
def api_webhook_for_delete(action):
    """Decorator that emits webhooks for an APIView ``delete`` method.

    The decorator expects an authorized request. It calls ``get_object()``
    before the wrapped method runs, remembers the object's primary key and
    (when the action metadata defines ``project-field``) its project, then
    calls the wrapped method and finally emits the webhooks with
    ``{'id': <pk>}`` as the instance (wrapped in a list for ``many`` actions).

    Example:
        ```
        @extend_schema(tags=['Annotations'])
        @api_webhook_for_delete(WebhookAction.ANNOTATIONS_DELETED)
        def delete(self, request, *args, **kwargs):
            return super(AnnotationAPI, self).delete(request, *args, **kwargs)
        ```

    :param action: Key into ``WebhookAction.ACTIONS``
    :return: Decorator for an APIView ``delete`` method
    """

    def decorator(func):
        @wraps(func)
        def wrap(self, request, *args, **kwargs):
            # Everything needed from the object is captured up front, before
            # the wrapped delete method is invoked.
            target = self.get_object()
            meta = WebhookAction.ACTIONS[action]
            as_list = meta['many']
            project = get_nested_field(target, meta['project-field']) if 'project-field' in meta else None

            stub = {'id': target.pk}
            deleted = [stub] if as_list else stub

            response = func(self, request, *args, **kwargs)

            emit_webhooks_for_instance(request.user.active_organization, project, action, deleted)
            return response

        return wrap

    return decorator
|
|
|
def get_nested_field(value, field):
    """
    Follow a ``__``-separated attribute path on an object or a list of objects

    The special path ``'__self__'`` returns ``value`` unchanged. Whenever the
    current value is a ``list``, the next attribute is read from every element
    and the result stays a list; otherwise it is read from the value itself.

    :param value: Single instance or list to look up field
    :param field: Field to lookup, e.g. ``'task__project'``
    :return: List or single instance of looked up field
    """
    if field == '__self__':
        return value

    current = value
    for attr_name in field.split('__'):
        if not isinstance(current, list):
            current = getattr(current, attr_name)
            continue
        current = [getattr(item, attr_name) for item in current]
    return current
|