"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. """ import logging import os import uuid from collections import Counter import ijson import pandas as pd try: import ujson as json except: # noqa: E722 import json from django.conf import settings from django.db import models from django.utils.functional import cached_property from rest_framework.exceptions import ValidationError logger = logging.getLogger(__name__) def upload_name_generator(instance, filename): project = str(instance.project_id) project_dir = os.path.join(settings.MEDIA_ROOT, settings.UPLOAD_DIR, project) os.makedirs(project_dir, exist_ok=True) path = settings.UPLOAD_DIR + '/' + project + '/' + str(uuid.uuid4())[0:8] + '-' + filename return path class FileUpload(models.Model): user = models.ForeignKey('users.User', related_name='file_uploads', on_delete=models.CASCADE) project = models.ForeignKey('projects.Project', related_name='file_uploads', on_delete=models.CASCADE) file = models.FileField(upload_to=upload_name_generator) def has_permission(self, user): user.project = self.project # link for activity log return self.project.has_permission(user) @cached_property def filepath(self): return self.file.name @cached_property def file_name(self): return os.path.basename(self.file.name) @property def url(self): if settings.FORCE_SCRIPT_NAME and not (settings.HOSTNAME and settings.CLOUD_FILE_STORAGE_ENABLED): return settings.FORCE_SCRIPT_NAME + '/' + self.file.url.lstrip('/') else: return self.file.url @property def format(self): file_format = None try: file_format = os.path.splitext(self.filepath)[-1] except: # noqa: E722 pass finally: logger.debug('Get file format ' + str(file_format)) return file_format @property def content(self): # cache file body if hasattr(self, '_file_body'): body = getattr(self, '_file_body') else: body = self.file.read().decode('utf-8') setattr(self, '_file_body', body) return body def _detect_csv_separator(self): """ Detect the CSV separator by analyzing the first line of the file. This method implements a reliable heuristic: 1. If semicolons are more frequent than commas in the first line, use semicolon 2. Otherwise, default to comma Returns: str: The detected separator (',' or ';') """ try: # Read the first line to analyze separators with self.file.open() as f: first_line = f.readline() if isinstance(first_line, bytes): first_line = first_line.decode('utf-8') # Count potential separators comma_count = first_line.count(',') semicolon_count = first_line.count(';') # Use semicolon if it's clearly indicated by higher frequency if semicolon_count > comma_count: logger.debug( f'Detected semicolon separator (found {semicolon_count} semicolons vs {comma_count} commas)' ) return ';' else: logger.debug( f'Using default comma separator (found {comma_count} commas vs {semicolon_count} semicolons)' ) return ',' except Exception as e: logger.warning(f'Failed to detect CSV separator, defaulting to comma: {e}') return ',' def read_tasks_list_from_csv(self): """ Read tasks from a CSV file with automatic separator detection. The separator is automatically detected by analyzing the first line: - If semicolons are clearly indicated (more frequent than commas), use semicolon - Otherwise, use the default comma separator Returns: list: List of tasks in the format [{'data': {...}}, ...] 
""" logger.debug('Read tasks list from CSV file {}'.format(self.filepath)) separator = self._detect_csv_separator() tasks = pd.read_csv(self.file.open(), sep=separator).fillna('').to_dict('records') tasks = [{'data': task} for task in tasks] return tasks def read_tasks_list_from_tsv(self): """ Read tasks from a TSV (tab-separated values) file. Returns: list: List of tasks in the format [{'data': {...}}, ...] """ logger.debug('Read tasks list from TSV file {}'.format(self.filepath)) tasks = pd.read_csv(self.file.open(), sep='\t').fillna('').to_dict('records') tasks = [{'data': task} for task in tasks] return tasks def read_tasks_list_from_txt(self): logger.debug('Read tasks list from text file {}'.format(self.filepath)) lines = self.content.splitlines() tasks = [{'data': {settings.DATA_UNDEFINED_NAME: line}} for line in lines] return tasks def read_tasks_list_from_json(self): logger.debug('Read tasks list from JSON file {}'.format(self.filepath)) raw_data = self.content # Python 3.5 compatibility fix https://docs.python.org/3/whatsnew/3.6.html#json try: tasks = json.loads(raw_data) except TypeError: tasks = json.loads(raw_data.decode('utf8')) if isinstance(tasks, dict): tasks = [tasks] tasks_formatted = [] for i, task in enumerate(tasks): if not task.get('data'): task = {'data': task} if not isinstance(task['data'], dict): raise ValidationError('Task item should be dict') tasks_formatted.append(task) return tasks_formatted def read_tasks_list_from_json_streaming(self, batch_size=100): logger.debug('Read tasks list from JSON file streaming {}'.format(self.filepath)) try: with self.file.open('rb') as file_handle: # Peek a small prefix to detect top-level container ('[' array or '{' object) sniff = file_handle.read(4096) or b'' # Find first non-whitespace byte first_byte = None for b in sniff: if b not in (0x20, 0x09, 0x0A, 0x0D): # space, tab, lf, cr first_byte = b break # Rewind after sniffing file_handle.seek(0) batch = [] if first_byte == ord('['): # Stream array items one-by-one # use_float=True prevents Decimal objects which cause JSON serialization issues downstream for task in ijson.items(file_handle, 'item', use_float=True): formatted_task = self._format_task_for_json_streaming(task) batch.append(formatted_task) if len(batch) >= batch_size: yield batch batch = [] elif first_byte == ord('{'): # Single JSON object: parse once and yield a single-item batch raw_data = file_handle.read() try: task_data = json.loads(raw_data) except TypeError: task_data = json.loads(raw_data.decode('utf8')) formatted_task = self._format_task_for_json_streaming(task_data) batch.append(formatted_task) else: # Unknown/invalid JSON structure raise ValidationError('Unsupported or invalid JSON structure') # Yield remaining tasks if any if batch: yield batch except Exception as exc: raise ValidationError(f'Failed to parse JSON file {self.file_name}: {str(exc)}') def _format_task_for_json_streaming(self, task): """Format task data for JSON streaming consistency with read_tasks_list_from_json""" # Handle different task types as in the original read_tasks_list_from_json method if isinstance(task, dict): if not task.get('data'): task = {'data': task} else: # If task is not a dict (e.g., list), wrap it in {'data': task} task = {'data': task} if not isinstance(task['data'], dict): raise ValidationError('Task item should be dict') return task def read_task_from_hypertext_body(self): logger.debug('Read 1 task from hypertext file {}'.format(self.filepath)) body = self.content tasks = [{'data': 
{settings.DATA_UNDEFINED_NAME: body}}] return tasks def read_task_from_uploaded_file(self): logger.debug('Read 1 task from uploaded file {}'.format(self.filepath)) if settings.CLOUD_FILE_STORAGE_ENABLED: tasks = [{'data': {settings.DATA_UNDEFINED_NAME: self.filepath}}] else: tasks = [{'data': {settings.DATA_UNDEFINED_NAME: self.url}}] return tasks @property def format_could_be_tasks_list(self): return self.format in ('.csv', '.tsv', '.txt') def read_tasks(self, file_as_tasks_list=True): file_format = self.format try: # file as tasks list if file_format == '.csv' and file_as_tasks_list: tasks = self.read_tasks_list_from_csv() elif file_format == '.tsv' and file_as_tasks_list: tasks = self.read_tasks_list_from_tsv() elif file_format == '.txt' and file_as_tasks_list: tasks = self.read_tasks_list_from_txt() elif file_format == '.json': tasks = self.read_tasks_list_from_json() # otherwise - only one object tag should be presented in label config elif not self.project.one_object_in_label_config: raise ValidationError( 'Your label config has more than one data key and direct file upload supports only ' 'one data key. To import data with multiple data keys, use a JSON or CSV file.' ) # file as a single asset elif file_format in ('.html', '.htm', '.xml'): tasks = self.read_task_from_hypertext_body() else: tasks = self.read_task_from_uploaded_file() except Exception as exc: raise ValidationError('Failed to parse input file ' + self.file_name + ': ' + str(exc)) return tasks def read_tasks_streaming(self, file_as_tasks_list=True, batch_size=100): """Streaming version of read_tasks that yields tasks in batches for memory efficiency""" file_format = self.format try: # For JSON files, use streaming JSON parser if file_format == '.json': for batch in self.read_tasks_list_from_json_streaming(batch_size): yield batch # For other file types, use existing methods but yield in batches else: # Use existing non-streaming methods for non-JSON files if file_format == '.csv' and file_as_tasks_list: tasks = self.read_tasks_list_from_csv() elif file_format == '.tsv' and file_as_tasks_list: tasks = self.read_tasks_list_from_tsv() elif file_format == '.txt' and file_as_tasks_list: tasks = self.read_tasks_list_from_txt() elif not self.project.one_object_in_label_config: raise ValidationError( 'Your label config has more than one data key and direct file upload supports only ' 'one data key. To import data with multiple data keys, use a JSON or CSV file.' 
) elif file_format in ('.html', '.htm', '.xml'): tasks = self.read_task_from_hypertext_body() else: tasks = self.read_task_from_uploaded_file() # Yield tasks in batches for i in range(0, len(tasks), batch_size): batch = tasks[i : i + batch_size] yield batch except Exception as exc: raise ValidationError('Failed to parse input file ' + self.file_name + ': ' + str(exc)) @classmethod def load_tasks_from_uploaded_files( cls, project, file_upload_ids=None, formats=None, files_as_tasks_list=True, trim_size=None ): tasks = [] fileformats = [] common_data_fields = set() # scan all files file_uploads = FileUpload.objects.filter(project=project) if file_upload_ids: file_uploads = file_uploads.filter(id__in=file_upload_ids) for file_upload in file_uploads: file_format = file_upload.format if formats and file_format not in formats: continue new_tasks = file_upload.read_tasks(files_as_tasks_list) for task in new_tasks: task['file_upload_id'] = file_upload.id new_data_fields = set(iter(new_tasks[0]['data'].keys())) if len(new_tasks) > 0 else set() if not common_data_fields: common_data_fields = new_data_fields elif not common_data_fields.intersection(new_data_fields): raise ValidationError( _old_vs_new_data_keys_inconsistency_message( new_data_fields, common_data_fields, file_upload.file.name ) ) else: common_data_fields &= new_data_fields tasks += new_tasks fileformats.append(file_format) if trim_size is not None: if len(tasks) > trim_size: break return tasks, dict(Counter(fileformats)), common_data_fields @classmethod def load_tasks_from_uploaded_files_streaming( cls, project, file_upload_ids=None, formats=None, files_as_tasks_list=True, batch_size=5000 ): """Stream tasks from uploaded files in batches to reduce memory usage using true streaming for JSON files""" fileformats = [] common_data_fields = set() accumulated_batch = [] total_yielded = 0 # scan all files file_uploads = FileUpload.objects.filter(project=project) if file_upload_ids: file_uploads = file_uploads.filter(id__in=file_upload_ids) for file_upload in file_uploads: file_format = file_upload.format if formats and file_format not in formats: continue fileformats.append(file_format) # Use streaming method for reading tasks for task_batch in file_upload.read_tasks_streaming(files_as_tasks_list, batch_size): # Add file_upload_id to each task in the batch for task in task_batch: task['file_upload_id'] = file_upload.id # Validate data fields consistency for first batch from each file if task_batch: new_data_fields = set(task_batch[0]['data'].keys()) if not common_data_fields: common_data_fields = new_data_fields elif not common_data_fields.intersection(new_data_fields): raise ValidationError( _old_vs_new_data_keys_inconsistency_message( new_data_fields, common_data_fields, file_upload.file.name ) ) else: common_data_fields &= new_data_fields # Add tasks to accumulated batch accumulated_batch.extend(task_batch) # Yield accumulated batch when it reaches the target size while len(accumulated_batch) >= batch_size: batch_to_yield = accumulated_batch[:batch_size] accumulated_batch = accumulated_batch[batch_size:] yield batch_to_yield, dict(Counter(fileformats)), common_data_fields total_yielded += len(batch_to_yield) # Yield remaining tasks if any if accumulated_batch: yield accumulated_batch, dict(Counter(fileformats)), common_data_fields total_yielded += len(accumulated_batch) # If no tasks were yielded, return empty batch with metadata if total_yielded == 0: yield [], dict(Counter(fileformats)), common_data_fields def 
_old_vs_new_data_keys_inconsistency_message(new_data_keys, old_data_keys, current_file): new_data_keys_list = ','.join(new_data_keys) old_data_keys_list = ','.join(old_data_keys) common_prefix = "You're trying to import inconsistent data:\n" if new_data_keys_list == old_data_keys_list: return '' elif new_data_keys_list == settings.DATA_UNDEFINED_NAME: return ( common_prefix + 'uploading a single file {0} ' 'clashes with data key(s) found from other files:\n"{1}"'.format(current_file, old_data_keys_list) ) elif old_data_keys_list == settings.DATA_UNDEFINED_NAME: return ( common_prefix + 'uploading tabular data from {0} with data key(s) {1}, ' 'clashes with other raw binary files (images, audios, etc.)'.format(current_file, new_data_keys_list) ) else: return ( common_prefix + 'uploading tabular data from "{0}" with data key(s) "{1}", ' 'clashes with data key(s) found from other files:\n"{2}"'.format( current_file, new_data_keys_list, old_data_keys_list ) )
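

# Usage sketch (illustrative only, not part of this module's API): one hedged example of
# consuming the streaming loader above. The names `project`, `file_upload_ids` and
# `handle_batch` are assumptions supplied by a hypothetical caller, not defined here.
#
#     for batch, formats, common_fields in FileUpload.load_tasks_from_uploaded_files_streaming(
#         project, file_upload_ids=file_upload_ids, batch_size=1000
#     ):
#         # `batch` is a list of {'data': {...}, 'file_upload_id': ...} dicts,
#         # `formats` maps file extensions to counts for the files processed so far,
#         # `common_fields` is the intersection of data keys across those files.
#         handle_batch(batch)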