Bin
2025-12-16 971a2a12c03b74dd2d7d668b9dbc599f5131bcaf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
"""
import logging
import os
import uuid
from collections import Counter
 
import ijson
import pandas as pd
 
try:
    import ujson as json
except:  # noqa: E722
    import json
 
from django.conf import settings
from django.db import models
from django.utils.functional import cached_property
from rest_framework.exceptions import ValidationError
 
logger = logging.getLogger(__name__)
 
 
def upload_name_generator(instance, filename):
    project = str(instance.project_id)
    project_dir = os.path.join(settings.MEDIA_ROOT, settings.UPLOAD_DIR, project)
    os.makedirs(project_dir, exist_ok=True)
    path = settings.UPLOAD_DIR + '/' + project + '/' + str(uuid.uuid4())[0:8] + '-' + filename
    return path
 
 
class FileUpload(models.Model):
    user = models.ForeignKey('users.User', related_name='file_uploads', on_delete=models.CASCADE)
    project = models.ForeignKey('projects.Project', related_name='file_uploads', on_delete=models.CASCADE)
    file = models.FileField(upload_to=upload_name_generator)
 
    def has_permission(self, user):
        user.project = self.project  # link for activity log
        return self.project.has_permission(user)
 
    @cached_property
    def filepath(self):
        return self.file.name
 
    @cached_property
    def file_name(self):
        return os.path.basename(self.file.name)
 
    @property
    def url(self):
        if settings.FORCE_SCRIPT_NAME and not (settings.HOSTNAME and settings.CLOUD_FILE_STORAGE_ENABLED):
            return settings.FORCE_SCRIPT_NAME + '/' + self.file.url.lstrip('/')
        else:
            return self.file.url
 
    @property
    def format(self):
        file_format = None
        try:
            file_format = os.path.splitext(self.filepath)[-1]
        except:  # noqa: E722
            pass
        finally:
            logger.debug('Get file format ' + str(file_format))
        return file_format
 
    @property
    def content(self):
        # cache file body
        if hasattr(self, '_file_body'):
            body = getattr(self, '_file_body')
        else:
            body = self.file.read().decode('utf-8')
            setattr(self, '_file_body', body)
        return body
 
    def _detect_csv_separator(self):
        """
        Detect the CSV separator by analyzing the first line of the file.
 
        This method implements a reliable heuristic:
        1. If semicolons are more frequent than commas in the first line, use semicolon
        2. Otherwise, default to comma
 
        Returns:
            str: The detected separator (',' or ';')
        """
        try:
            # Read the first line to analyze separators
            with self.file.open() as f:
                first_line = f.readline()
                if isinstance(first_line, bytes):
                    first_line = first_line.decode('utf-8')
 
                # Count potential separators
                comma_count = first_line.count(',')
                semicolon_count = first_line.count(';')
 
                # Use semicolon if it's clearly indicated by higher frequency
                if semicolon_count > comma_count:
                    logger.debug(
                        f'Detected semicolon separator (found {semicolon_count} semicolons vs {comma_count} commas)'
                    )
                    return ';'
                else:
                    logger.debug(
                        f'Using default comma separator (found {comma_count} commas vs {semicolon_count} semicolons)'
                    )
                    return ','
        except Exception as e:
            logger.warning(f'Failed to detect CSV separator, defaulting to comma: {e}')
            return ','
 
    def read_tasks_list_from_csv(self):
        """
        Read tasks from a CSV file with automatic separator detection.
 
        The separator is automatically detected by analyzing the first line:
        - If semicolons are clearly indicated (more frequent than commas), use semicolon
        - Otherwise, use the default comma separator
 
        Returns:
            list: List of tasks in the format [{'data': {...}}, ...]
        """
        logger.debug('Read tasks list from CSV file {}'.format(self.filepath))
        separator = self._detect_csv_separator()
        tasks = pd.read_csv(self.file.open(), sep=separator).fillna('').to_dict('records')
        tasks = [{'data': task} for task in tasks]
        return tasks
 
    def read_tasks_list_from_tsv(self):
        """
        Read tasks from a TSV (tab-separated values) file.
 
        Returns:
            list: List of tasks in the format [{'data': {...}}, ...]
        """
        logger.debug('Read tasks list from TSV file {}'.format(self.filepath))
        tasks = pd.read_csv(self.file.open(), sep='\t').fillna('').to_dict('records')
        tasks = [{'data': task} for task in tasks]
        return tasks
 
    def read_tasks_list_from_txt(self):
        logger.debug('Read tasks list from text file {}'.format(self.filepath))
        lines = self.content.splitlines()
        tasks = [{'data': {settings.DATA_UNDEFINED_NAME: line}} for line in lines]
        return tasks
 
    def read_tasks_list_from_json(self):
        logger.debug('Read tasks list from JSON file {}'.format(self.filepath))
 
        raw_data = self.content
        # Python 3.5 compatibility fix https://docs.python.org/3/whatsnew/3.6.html#json
        try:
            tasks = json.loads(raw_data)
        except TypeError:
            tasks = json.loads(raw_data.decode('utf8'))
        if isinstance(tasks, dict):
            tasks = [tasks]
        tasks_formatted = []
        for i, task in enumerate(tasks):
            if not task.get('data'):
                task = {'data': task}
            if not isinstance(task['data'], dict):
                raise ValidationError('Task item should be dict')
            tasks_formatted.append(task)
        return tasks_formatted
 
    def read_tasks_list_from_json_streaming(self, batch_size=100):
        logger.debug('Read tasks list from JSON file streaming {}'.format(self.filepath))
 
        try:
            with self.file.open('rb') as file_handle:
                # Peek a small prefix to detect top-level container ('[' array or '{' object)
                sniff = file_handle.read(4096) or b''
                # Find first non-whitespace byte
                first_byte = None
                for b in sniff:
                    if b not in (0x20, 0x09, 0x0A, 0x0D):  # space, tab, lf, cr
                        first_byte = b
                        break
 
                # Rewind after sniffing
                file_handle.seek(0)
 
                batch = []
 
                if first_byte == ord('['):
                    # Stream array items one-by-one
                    # use_float=True prevents Decimal objects which cause JSON serialization issues downstream
                    for task in ijson.items(file_handle, 'item', use_float=True):
                        formatted_task = self._format_task_for_json_streaming(task)
                        batch.append(formatted_task)
                        if len(batch) >= batch_size:
                            yield batch
                            batch = []
 
                elif first_byte == ord('{'):
                    # Single JSON object: parse once and yield a single-item batch
                    raw_data = file_handle.read()
                    try:
                        task_data = json.loads(raw_data)
                    except TypeError:
                        task_data = json.loads(raw_data.decode('utf8'))
                    formatted_task = self._format_task_for_json_streaming(task_data)
                    batch.append(formatted_task)
 
                else:
                    # Unknown/invalid JSON structure
                    raise ValidationError('Unsupported or invalid JSON structure')
 
                # Yield remaining tasks if any
                if batch:
                    yield batch
 
        except Exception as exc:
            raise ValidationError(f'Failed to parse JSON file {self.file_name}: {str(exc)}')
 
    def _format_task_for_json_streaming(self, task):
        """Format task data for JSON streaming consistency with read_tasks_list_from_json"""
        # Handle different task types as in the original read_tasks_list_from_json method
        if isinstance(task, dict):
            if not task.get('data'):
                task = {'data': task}
        else:
            # If task is not a dict (e.g., list), wrap it in {'data': task}
            task = {'data': task}
 
        if not isinstance(task['data'], dict):
            raise ValidationError('Task item should be dict')
        return task
 
    def read_task_from_hypertext_body(self):
        logger.debug('Read 1 task from hypertext file {}'.format(self.filepath))
        body = self.content
        tasks = [{'data': {settings.DATA_UNDEFINED_NAME: body}}]
        return tasks
 
    def read_task_from_uploaded_file(self):
        logger.debug('Read 1 task from uploaded file {}'.format(self.filepath))
        if settings.CLOUD_FILE_STORAGE_ENABLED:
            tasks = [{'data': {settings.DATA_UNDEFINED_NAME: self.filepath}}]
        else:
            tasks = [{'data': {settings.DATA_UNDEFINED_NAME: self.url}}]
        return tasks
 
    @property
    def format_could_be_tasks_list(self):
        return self.format in ('.csv', '.tsv', '.txt')
 
    def read_tasks(self, file_as_tasks_list=True):
        file_format = self.format
        try:
            # file as tasks list
            if file_format == '.csv' and file_as_tasks_list:
                tasks = self.read_tasks_list_from_csv()
            elif file_format == '.tsv' and file_as_tasks_list:
                tasks = self.read_tasks_list_from_tsv()
            elif file_format == '.txt' and file_as_tasks_list:
                tasks = self.read_tasks_list_from_txt()
            elif file_format == '.json':
                tasks = self.read_tasks_list_from_json()
 
            # otherwise - only one object tag should be presented in label config
            elif not self.project.one_object_in_label_config:
                raise ValidationError(
                    'Your label config has more than one data key and direct file upload supports only '
                    'one data key. To import data with multiple data keys, use a JSON or CSV file.'
                )
 
            # file as a single asset
            elif file_format in ('.html', '.htm', '.xml'):
                tasks = self.read_task_from_hypertext_body()
            else:
                tasks = self.read_task_from_uploaded_file()
 
        except Exception as exc:
            raise ValidationError('Failed to parse input file ' + self.file_name + ': ' + str(exc))
        return tasks
 
    def read_tasks_streaming(self, file_as_tasks_list=True, batch_size=100):
        """Streaming version of read_tasks that yields tasks in batches for memory efficiency"""
        file_format = self.format
 
        try:
            # For JSON files, use streaming JSON parser
            if file_format == '.json':
                for batch in self.read_tasks_list_from_json_streaming(batch_size):
                    yield batch
 
            # For other file types, use existing methods but yield in batches
            else:
                # Use existing non-streaming methods for non-JSON files
                if file_format == '.csv' and file_as_tasks_list:
                    tasks = self.read_tasks_list_from_csv()
                elif file_format == '.tsv' and file_as_tasks_list:
                    tasks = self.read_tasks_list_from_tsv()
                elif file_format == '.txt' and file_as_tasks_list:
                    tasks = self.read_tasks_list_from_txt()
                elif not self.project.one_object_in_label_config:
                    raise ValidationError(
                        'Your label config has more than one data key and direct file upload supports only '
                        'one data key. To import data with multiple data keys, use a JSON or CSV file.'
                    )
                elif file_format in ('.html', '.htm', '.xml'):
                    tasks = self.read_task_from_hypertext_body()
                else:
                    tasks = self.read_task_from_uploaded_file()
 
                # Yield tasks in batches
                for i in range(0, len(tasks), batch_size):
                    batch = tasks[i : i + batch_size]
                    yield batch
 
        except Exception as exc:
            raise ValidationError('Failed to parse input file ' + self.file_name + ': ' + str(exc))
 
    @classmethod
    def load_tasks_from_uploaded_files(
        cls, project, file_upload_ids=None, formats=None, files_as_tasks_list=True, trim_size=None
    ):
        tasks = []
        fileformats = []
        common_data_fields = set()
 
        # scan all files
        file_uploads = FileUpload.objects.filter(project=project)
        if file_upload_ids:
            file_uploads = file_uploads.filter(id__in=file_upload_ids)
        for file_upload in file_uploads:
            file_format = file_upload.format
            if formats and file_format not in formats:
                continue
            new_tasks = file_upload.read_tasks(files_as_tasks_list)
            for task in new_tasks:
                task['file_upload_id'] = file_upload.id
 
            new_data_fields = set(iter(new_tasks[0]['data'].keys())) if len(new_tasks) > 0 else set()
            if not common_data_fields:
                common_data_fields = new_data_fields
            elif not common_data_fields.intersection(new_data_fields):
                raise ValidationError(
                    _old_vs_new_data_keys_inconsistency_message(
                        new_data_fields, common_data_fields, file_upload.file.name
                    )
                )
            else:
                common_data_fields &= new_data_fields
 
            tasks += new_tasks
            fileformats.append(file_format)
 
            if trim_size is not None:
                if len(tasks) > trim_size:
                    break
 
        return tasks, dict(Counter(fileformats)), common_data_fields
 
    @classmethod
    def load_tasks_from_uploaded_files_streaming(
        cls, project, file_upload_ids=None, formats=None, files_as_tasks_list=True, batch_size=5000
    ):
        """Stream tasks from uploaded files in batches to reduce memory usage using true streaming for JSON files"""
        fileformats = []
        common_data_fields = set()
        accumulated_batch = []
        total_yielded = 0
 
        # scan all files
        file_uploads = FileUpload.objects.filter(project=project)
        if file_upload_ids:
            file_uploads = file_uploads.filter(id__in=file_upload_ids)
 
        for file_upload in file_uploads:
            file_format = file_upload.format
            if formats and file_format not in formats:
                continue
 
            fileformats.append(file_format)
 
            # Use streaming method for reading tasks
            for task_batch in file_upload.read_tasks_streaming(files_as_tasks_list, batch_size):
                # Add file_upload_id to each task in the batch
                for task in task_batch:
                    task['file_upload_id'] = file_upload.id
 
                # Validate data fields consistency for first batch from each file
                if task_batch:
                    new_data_fields = set(task_batch[0]['data'].keys())
                    if not common_data_fields:
                        common_data_fields = new_data_fields
                    elif not common_data_fields.intersection(new_data_fields):
                        raise ValidationError(
                            _old_vs_new_data_keys_inconsistency_message(
                                new_data_fields, common_data_fields, file_upload.file.name
                            )
                        )
                    else:
                        common_data_fields &= new_data_fields
 
                # Add tasks to accumulated batch
                accumulated_batch.extend(task_batch)
 
                # Yield accumulated batch when it reaches the target size
                while len(accumulated_batch) >= batch_size:
                    batch_to_yield = accumulated_batch[:batch_size]
                    accumulated_batch = accumulated_batch[batch_size:]
 
                    yield batch_to_yield, dict(Counter(fileformats)), common_data_fields
                    total_yielded += len(batch_to_yield)
 
        # Yield remaining tasks if any
        if accumulated_batch:
            yield accumulated_batch, dict(Counter(fileformats)), common_data_fields
            total_yielded += len(accumulated_batch)
 
        # If no tasks were yielded, return empty batch with metadata
        if total_yielded == 0:
            yield [], dict(Counter(fileformats)), common_data_fields
 
 
def _old_vs_new_data_keys_inconsistency_message(new_data_keys, old_data_keys, current_file):
    new_data_keys_list = ','.join(new_data_keys)
    old_data_keys_list = ','.join(old_data_keys)
    common_prefix = "You're trying to import inconsistent data:\n"
    if new_data_keys_list == old_data_keys_list:
        return ''
    elif new_data_keys_list == settings.DATA_UNDEFINED_NAME:
        return (
            common_prefix + 'uploading a single file {0} '
            'clashes with data key(s) found from other files:\n"{1}"'.format(current_file, old_data_keys_list)
        )
    elif old_data_keys_list == settings.DATA_UNDEFINED_NAME:
        return (
            common_prefix + 'uploading tabular data from {0} with data key(s) {1}, '
            'clashes with other raw binary files (images, audios, etc.)'.format(current_file, new_data_keys_list)
        )
    else:
        return (
            common_prefix + 'uploading tabular data from "{0}" with data key(s) "{1}", '
            'clashes with data key(s) found from other files:\n"{2}"'.format(
                current_file, new_data_keys_list, old_data_keys_list
            )
        )