| | |
| | | import io |
| | | import json |
| | | import logging |
| | | import os |
| | | import pathlib |
| | | import shutil |
| | | from datetime import datetime |
| | |
| | | out_dir = pathlib.Path(tmp_dir) / OUT |
| | | out_dir.mkdir(mode=0o700, parents=True, exist_ok=True) |
| | | |
| | | upload_dir = os.path.join(settings.MEDIA_ROOT, settings.UPLOAD_DIR) |
| | | |
| | | # Debug logging: print the key parameters |
| | | logger.info(f'[COCO Export Debug] ========== Export Started ==========') |
| | | logger.info(f'[COCO Export Debug] Project ID: {self.project.id}') |
| | | logger.info(f'[COCO Export Debug] Export format: {to_format}') |
| | | logger.info(f'[COCO Export Debug] download_resources: {download_resources}') |
| | | logger.info(f'[COCO Export Debug] hostname: {hostname}') |
| | | logger.info(f'[COCO Export Debug] upload_dir: {upload_dir}') |
| | | logger.info(f'[COCO Export Debug] upload_dir exists: {os.path.exists(upload_dir)}') |
| | | if os.path.exists(upload_dir): |
| | | logger.info(f'[COCO Export Debug] upload_dir contents: {os.listdir(upload_dir)}') |
| | | logger.info(f'[COCO Export Debug] tmp_dir: {tmp_dir}') |
| | | logger.info(f'[COCO Export Debug] out_dir: {out_dir}') |
| | | logger.info(f'[COCO Export Debug] settings.MEDIA_ROOT: {settings.MEDIA_ROOT}') |
| | | logger.info(f'[COCO Export Debug] settings.UPLOAD_DIR: {settings.UPLOAD_DIR}') |
| | | |
| | | converter = Converter( |
| | | config=self.project.get_parsed_config(), |
| | | project_dir=None, |
| | | upload_dir=out_dir, |
| | | upload_dir=upload_dir, |
| | | download_resources=download_resources, |
| | | # for downloading resource we need access to the API |
| | | access_token=self.project.organization.created_by.auth_token.key, |
| | |
| | | input_name = pathlib.Path(self.file.name).name |
| | | input_file_path = pathlib.Path(tmp_dir) / input_name |
| | | |
| | | logger.info(f'[COCO Export Debug] input_name: {input_name}') |
| | | logger.info(f'[COCO Export Debug] input_file_path: {input_file_path}') |
| | | |
| | | with open(input_file_path, 'wb') as file_: |
| | | file_.write(self.file.open().read()) |
| | | |
| | | logger.info(f'[COCO Export Debug] Starting converter.convert...') |
| | | converter.convert(input_file_path, out_dir, to_format, is_dir=False) |
| | | logger.info(f'[COCO Export Debug] Converter.convert completed') |
| | | |
| | | files = get_all_files_from_dir(out_dir) |
| | | dirs = get_all_dirs_from_dir(out_dir) |
| | | |
| | | logger.info(f'[COCO Export Debug] Output files: {files}') |
| | | logger.info(f'[COCO Export Debug] Output dirs: {dirs}') |
| | | |
| | | # Check whether an images directory exists |
| | | images_dir = out_dir / 'images' |
| | | if images_dir.exists(): |
| | | image_files = list(images_dir.glob('*')) |
| | | logger.info(f'[COCO Export Debug] images/ directory exists with {len(image_files)} files') |
| | | logger.info(f'[COCO Export Debug] Image files: {[f.name for f in image_files]}') |
| | | else: |
| | | logger.warning(f'[COCO Export Debug] images/ directory does NOT exist') |
| | | |
| | | if len(files) == 0 and len(dirs) == 0: |
| | | logger.error(f'[COCO Export Debug] No files or dirs in output') |
| | | return None |
| | | elif len(files) == 1 and len(dirs) == 0: |
| | | output_file = files[0] |
| | | filename = pathlib.Path(input_name).stem + pathlib.Path(output_file).suffix |
| | | logger.info(f'[COCO Export Debug] Single file output: {output_file}') |
| | | else: |
| | | logger.info(f'[COCO Export Debug] Creating zip archive...') |
| | | shutil.make_archive(out_dir, 'zip', out_dir) |
| | | output_file = pathlib.Path(tmp_dir) / (str(out_dir.stem) + '.zip') |
| | | filename = pathlib.Path(input_name).stem + '.zip' |
| | | logger.info(f'[COCO Export Debug] Zip archive created: {output_file}') |
| | | |
| | | logger.info(f'[COCO Export Debug] ========== Export Completed ==========') |
| | | |
| | | # TODO(jo): can we avoid the `f.read()` here? |
| | | with open(output_file, mode='rb') as f: |
| | | return File( |