Bin
2025-12-17 bc6aa38242b0a7dea4b18bc90e2d78740436a58b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
from core.permissions import all_permissions
from core.utils.common import load_func
from django.conf import settings
from rest_flex_fields import FlexFieldsModelSerializer
from rest_framework import serializers
from users.models import User
 
 
class BaseUserSerializer(FlexFieldsModelSerializer):
    # short form for user presentation
    initials = serializers.SerializerMethodField(default='?', read_only=True)
    avatar = serializers.SerializerMethodField(read_only=True)
    active_organization_meta = serializers.SerializerMethodField(read_only=True)
    last_activity = serializers.DateTimeField(read_only=True, source='last_activity_cached')
 
    def get_avatar(self, instance):
        return instance.avatar_url
 
    def get_initials(self, instance):
        return instance.get_initials(self._is_deleted(instance))
 
    def get_active_organization_meta(self, instance):
        organization = instance.active_organization
        if organization is None:
            return {'title': '', 'email': ''}
 
        title = organization.title
        email = ''
 
        if organization.created_by is not None and organization.created_by.email is not None:
            email = organization.created_by.email
 
        return {'title': title, 'email': email}
 
    def _is_deleted(self, instance):
        if 'user' in self.context:
            org_id = self.context['user'].active_organization_id
        elif 'request' in self.context:
            org_id = self.context['request'].user.active_organization_id
        else:
            org_id = None
 
        if not org_id:
            return False
 
        # Will use prefetched objects if available
        organization_members = instance.om_through.all()
        organization_member_for_user = next(
            (
                organization_member
                for organization_member in organization_members
                if organization_member.organization_id == org_id
            ),
            None,
        )
        if not organization_member_for_user:
            return True
        return bool(organization_member_for_user.deleted_at)
 
    def to_representation(self, instance):
        """Returns user with cache, this helps to avoid multiple s3/gcs links resolving for avatars"""
 
        uid = instance.id
        key = 'user_cache'
 
        if key not in self.context:
            self.context[key] = {}
        if uid not in self.context[key]:
            self.context[key][uid] = super().to_representation(instance)
 
        if self._is_deleted(instance):
            for field in ['username', 'first_name', 'last_name', 'email']:
                self.context[key][uid][field] = 'User' if field == 'last_name' else 'Deleted'
 
        return self.context[key][uid]
 
    class Meta:
        model = User
        fields = (
            'id',
            'first_name',
            'last_name',
            'username',
            'email',
            'last_activity',
            'custom_hotkeys',
            'avatar',
            'initials',
            'phone',
            'active_organization',
            'active_organization_meta',
            'allow_newsletters',
            'date_joined',
        )
 
 
class BaseUserSerializerUpdate(BaseUserSerializer):
    class Meta(BaseUserSerializer.Meta):
        read_only_fields = ('email',)
 
 
class BaseWhoAmIUserSerializer(BaseUserSerializer):
    permissions = serializers.SerializerMethodField()
 
    class Meta(BaseUserSerializer.Meta):
        fields = BaseUserSerializer.Meta.fields + ('permissions',)
 
    def get_permissions(self, user) -> list[str]:
        return [perm for _, perm in all_permissions]
 
 
class UserSimpleSerializer(BaseUserSerializer):
    class Meta:
        model = User
        fields = ('id', 'first_name', 'last_name', 'email', 'avatar')
 
 
class HotkeysSerializer(serializers.Serializer):
    custom_hotkeys = serializers.DictField(required=True)
 
    # Security: Define dangerous key combinations that should be blocked
    DANGEROUS_KEY_COMBINATIONS = [
        'ctrl+alt+delete',
        'cmd+alt+escape',
        'alt+f4',
        'ctrl+alt+esc',
        'cmd+option+esc',
        'ctrl+shift+esc',
        'cmd+shift+q',
        'alt+tab',
        'cmd+tab',
        'ctrl+alt+t',
        'cmd+space',  # Common system shortcuts
    ]
 
    # Limit maximum number of custom hotkeys to prevent abuse
    MAX_HOTKEYS = 200
 
    def validate_custom_hotkeys(self, custom_hotkeys):
        """
        Validates the hotkey format and enforces security constraints.
        Expected format: {"section:action": {"key": "key_combination", "active": boolean}}
        The "active" field is optional and defaults to true.
        """
        if not isinstance(custom_hotkeys, dict):
            raise serializers.ValidationError('custom_hotkeys 必须是一个字典')
 
        # Security: Limit the number of hotkeys
        if len(custom_hotkeys) > self.MAX_HOTKEYS:
            raise serializers.ValidationError(f'自定义快捷键不能超过 {self.MAX_HOTKEYS} 个')
 
        for action_key, hotkey_data in custom_hotkeys.items():
            # Validate action key format (section:action)
            if not isinstance(action_key, str) or not action_key:
                raise serializers.ValidationError(f"Action key '{action_key}' 必须是非空字符串")
 
            # Security: Limit action key length
            if len(action_key) > 100:
                raise serializers.ValidationError(f"Action key '{action_key}' 太长 (最大 100 字符)")
 
            # Check if the action key follows the section:action format
            if ':' not in action_key:
                raise serializers.ValidationError(f"Action key '{action_key}' must be in 'section:action' format")
 
            section, action = action_key.split(':', 1)
 
            # Validate section and action parts
            if not section.strip() or not action.strip():
                raise serializers.ValidationError(
                    f"Action key '{action_key}' must have non-empty section and action parts"
                )
 
            # Validate hotkey data format
            if not isinstance(hotkey_data, dict):
                raise serializers.ValidationError(f"Hotkey data for '{action_key}' must be a dictionary")
 
            # Check for key in hotkey data
            if 'key' not in hotkey_data:
                raise serializers.ValidationError(f"Missing 'key' in hotkey data for '{action_key}'")
 
            key_combo = hotkey_data['key']
 
            # Get active status, default to True if not specified
            active = hotkey_data.get('active', True)
 
            # Validate key combination
            if not isinstance(key_combo, str) or not key_combo:
                raise serializers.ValidationError(f"Key combination for '{action_key}' must be a non-empty string")
 
            # Security: Limit key combination length
            if len(key_combo) > 50:
                raise serializers.ValidationError(
                    f"Key combination for '{action_key}' is too long (max 50 characters)"
                )
 
            # Security: Check for dangerous key combinations
            normalized_key = key_combo.lower().strip()
            if normalized_key in self.DANGEROUS_KEY_COMBINATIONS:
                raise serializers.ValidationError(f"Key combination '{key_combo}' is not allowed for security reasons")
 
            # Validate active flag if provided
            if 'active' in hotkey_data and not isinstance(active, bool):
                raise serializers.ValidationError(f"Active flag for '{action_key}' must be a boolean")
 
            # Security: Validate key combination format (basic check)
            self._validate_key_format(key_combo, action_key)
 
        return custom_hotkeys
 
    def _validate_key_format(self, key_combo, action_key):
        """
        Basic validation of key combination format for security.
        Prevents injection of malicious characters.
        """
        # Allow only alphanumeric, common modifier keys, and basic symbols
        import re
 
        # Allow letters, numbers, common modifiers, and basic symbols
        allowed_pattern = re.compile(r'^[a-zA-Z0-9\+\-\s\[\]\\;\'\".,/`~!@#$%^&*()_={}|:<>?]+$')
 
        if not allowed_pattern.match(key_combo):
            raise serializers.ValidationError(
                f"Key combination '{key_combo}' for '{action_key}' contains invalid characters"
            )
 
        # Validate modifier key format (basic check)
        parts = [part.strip() for part in key_combo.split('+')]
        valid_modifiers = ['ctrl', 'cmd', 'command', 'alt', 'option', 'shift', 'meta']
 
        for part in parts[:-1]:  # All parts except the last should be modifiers or valid keys
            if part.lower() not in valid_modifiers and len(part) > 20:
                raise serializers.ValidationError(f"Invalid modifier or key '{part}' in key combination '{key_combo}'")
 
 
UserSerializer = load_func(settings.USER_SERIALIZER)
WhoAmIUserSerializer = load_func(settings.WHOAMI_USER_SERIALIZER)
UserSerializerUpdate = load_func(settings.USER_SERIALIZER_UPDATE)