"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license. """ import hashlib import logging import os import shutil from copy import deepcopy from datetime import datetime import ujson as json from core import version from core.feature_flags import flag_set from core.utils.common import load_func from core.utils.io import get_all_files_from_dir, get_temp_dir, path_to_open_binary_file from django.conf import settings from django.db import models from django.db.models.signals import post_save from django.dispatch import receiver from django.utils.translation import gettext_lazy as _ from label_studio_sdk.converter import Converter from tasks.models import Annotation logger = logging.getLogger(__name__) ExportMixin = load_func(settings.EXPORT_MIXIN) class Export(ExportMixin, models.Model): class Status(models.TextChoices): CREATED = 'created', _('Created') IN_PROGRESS = 'in_progress', _('In progress') FAILED = 'failed', _('Failed') COMPLETED = 'completed', _('Completed') title = models.CharField( _('title'), blank=True, default='', max_length=2048, ) created_at = models.DateTimeField( _('created at'), auto_now_add=True, help_text='Creation time', ) file = models.FileField( upload_to=settings.DELAYED_EXPORT_DIR, null=True, ) md5 = models.CharField( _('md5 of file'), max_length=128, default='', ) finished_at = models.DateTimeField( _('finished at'), help_text='Complete or fail time', null=True, default=None, ) status = models.CharField( _('Export status'), max_length=64, choices=Status.choices, default=Status.CREATED, ) counters = models.JSONField( _('Exporting meta data'), default=dict, ) project = models.ForeignKey( 'projects.Project', related_name='exports', on_delete=models.CASCADE, ) created_by = models.ForeignKey( settings.AUTH_USER_MODEL, related_name='+', on_delete=models.SET_NULL, null=True, verbose_name=_('created by'), ) @receiver(post_save, sender=Export) def set_export_default_name(sender, instance, created, **kwargs): if created and not instance.title: instance.title = instance.get_default_title() instance.save() class DataExport(object): # TODO: deprecated @staticmethod def save_export_files(project, now, get_args, data, md5, name): """Generate two files: meta info and result file and store them locally for logging""" filename_results = os.path.join(settings.EXPORT_DIR, name + '.json') filename_info = os.path.join(settings.EXPORT_DIR, name + '-info.json') annotation_number = Annotation.objects.filter(project=project).count() try: platform_version = version.get_git_version() except: # noqa: E722 platform_version = 'none' logger.error('Version is not detected in save_export_files()') info = { 'project': { 'title': project.title, 'id': project.id, 'created_at': project.created_at.strftime('%Y-%m-%dT%H:%M:%SZ'), 'created_by': project.created_by.email, 'task_number': project.tasks.count(), 'annotation_number': annotation_number, }, 'platform': {'version': platform_version}, 'download': { 'GET': dict(get_args), 'time': now.strftime('%Y-%m-%dT%H:%M:%SZ'), 'result_filename': filename_results, 'md5': md5, }, } with open(filename_results, 'w', encoding='utf-8') as f: f.write(data) with open(filename_info, 'w', encoding='utf-8') as f: json.dump(info, f, ensure_ascii=False) return filename_results @staticmethod def get_export_formats(project): converter = Converter(config=project.get_parsed_config(), project_dir=None) formats = [] supported_formats = set(converter.supported_formats) for format, format_info in converter.all_formats().items(): format_info = deepcopy(format_info) format_info['name'] = format.name if format.name not in supported_formats: format_info['disabled'] = True formats.append(format_info) return sorted(formats, key=lambda f: f.get('disabled', False)) @staticmethod def generate_export_file(project, tasks, output_format, download_resources, get_args, hostname=None): """Generate export file and return it as an open file object. Be sure to close the file after using it, to avoid wasting disk space. """ # prepare for saving now = datetime.now() data = json.dumps(tasks, ensure_ascii=False) md5 = hashlib.md5(json.dumps(data).encode('utf-8')).hexdigest() # nosec name = 'project-' + str(project.id) + '-at-' + now.strftime('%Y-%m-%d-%H-%M') + f'-{md5[0:8]}' input_json = DataExport.save_export_files(project, now, get_args, data, md5, name) upload_dir = os.path.join(settings.MEDIA_ROOT, settings.UPLOAD_DIR) # 调试日志 logger.error(f'[Generate Export Debug] ========== Generate Export Started ==========') logger.error(f'[Generate Export Debug] Project ID: {project.id}') logger.error(f'[Generate Export Debug] Output format: {output_format}') logger.error(f'[Generate Export Debug] download_resources: {download_resources}') logger.error(f'[Generate Export Debug] hostname: {hostname}') logger.error(f'[Generate Export Debug] upload_dir: {upload_dir}') logger.error(f'[Generate Export Debug] upload_dir exists: {os.path.exists(upload_dir)}') if os.path.exists(upload_dir): try: logger.error(f'[Generate Export Debug] upload_dir contents: {os.listdir(upload_dir)}') except Exception as e: logger.error(f'[Generate Export Debug] Error listing upload_dir: {e}') logger.error(f'[Generate Export Debug] Task count: {len(tasks)}') if tasks: first_task = tasks[0] logger.error(f'[Generate Export Debug] First task data: {first_task.get("data", {})}') logger.error(f'[Generate Export Debug] settings.MEDIA_ROOT: {settings.MEDIA_ROOT}') logger.error(f'[Generate Export Debug] settings.UPLOAD_DIR: {settings.UPLOAD_DIR}') converter = Converter( config=project.get_parsed_config(), project_dir=None, upload_dir=upload_dir, download_resources=download_resources, access_token=project.organization.created_by.auth_token.key, hostname=hostname, ) logger.error(f'[Generate Export Debug] Starting converter.convert...') logger.error(f'[Generate Export Debug] input_json: {input_json}') with get_temp_dir() as tmp_dir: logger.error(f'[Generate Export Debug] tmp_dir: {tmp_dir}') converter.convert(input_json, tmp_dir, output_format, is_dir=False) logger.error(f'[Generate Export Debug] Converter.convert completed') files = get_all_files_from_dir(tmp_dir) logger.error(f'[Generate Export Debug] Output files: {files}') logger.error(f'[Generate Export Debug] tmp_dir listing: {os.listdir(tmp_dir)}') # 检查 images 目录 images_dir = os.path.join(tmp_dir, 'images') if os.path.exists(images_dir): image_files = os.listdir(images_dir) logger.error(f'[Generate Export Debug] images/ directory exists with {len(image_files)} files') logger.error(f'[Generate Export Debug] Image files: {image_files}') else: logger.error(f'[Generate Export Debug] images/ directory does NOT exist') # if only one file is exported - no need to create archive if len(os.listdir(tmp_dir)) == 1: output_file = files[0] ext = os.path.splitext(output_file)[-1] content_type = f'application/{ext}' out = path_to_open_binary_file(output_file) filename = name + os.path.splitext(output_file)[-1] logger.error(f'[Generate Export Debug] Single file export: {filename}') logger.error(f'[Generate Export Debug] ========== Generate Export Completed ==========') return out, content_type, filename # otherwise pack output directory into archive logger.error(f'[Generate Export Debug] Creating zip archive...') shutil.make_archive(tmp_dir, 'zip', tmp_dir) out = path_to_open_binary_file(os.path.abspath(tmp_dir + '.zip')) content_type = 'application/zip' filename = name + '.zip' logger.error(f'[Generate Export Debug] Zip created: {filename}') logger.error(f'[Generate Export Debug] ========== Generate Export Completed ==========') return out, content_type, filename class ConvertedFormat(models.Model): class Status(models.TextChoices): CREATED = 'created', _('Created') IN_PROGRESS = 'in_progress', _('In progress') FAILED = 'failed', _('Failed') COMPLETED = 'completed', _('Completed') project = models.ForeignKey( 'projects.Project', null=True, related_name='export_conversions', on_delete=models.CASCADE, ) organization = models.ForeignKey( 'organizations.Organization', null=True, on_delete=models.CASCADE, related_name='export_conversions', ) export = models.ForeignKey( Export, related_name='converted_formats', on_delete=models.CASCADE, help_text='Export snapshot for this converted file', ) file = models.FileField( upload_to=settings.DELAYED_EXPORT_DIR, null=True, ) status = models.CharField( max_length=64, choices=Status.choices, default=Status.CREATED, ) traceback = models.TextField(null=True, blank=True, help_text='Traceback report in case of errors') export_type = models.CharField(max_length=64) created_at = models.DateTimeField( _('created at'), null=True, auto_now_add=True, help_text='Creation time', ) updated_at = models.DateTimeField( _('updated at'), null=True, auto_now_add=True, help_text='Updated time', ) finished_at = models.DateTimeField( _('finished at'), help_text='Complete or fail time', null=True, default=None, ) created_by = models.ForeignKey( settings.AUTH_USER_MODEL, related_name='+', on_delete=models.SET_NULL, null=True, verbose_name=_('created by'), ) def delete(self, *args, **kwargs): if flag_set('ff_back_dev_4664_remove_storage_file_on_export_delete_29032023_short'): if self.file: self.file.delete() super().delete(*args, **kwargs)