"""This file and its contents are licensed under the Apache License 2.0. Please see the included NOTICE for copyright information and LICENSE for a copy of the license.
|
"""
|
import logging
|
import os
|
from copy import deepcopy
|
|
import ujson as json
|
from core.utils.io import delete_dir_content, iter_files, json_load, remove_file_or_dir
|
|
from .base import BaseForm, BaseStorage, CloudStorage
|
|
logger = logging.getLogger(__name__)
|
|
|
class JSONStorage(BaseStorage):
|
|
description = 'JSON task file'
|
|
def __init__(self, **kwargs):
|
super(JSONStorage, self).__init__(**kwargs)
|
tasks = {}
|
if os.path.exists(self.path):
|
tasks = json_load(self.path, int_keys=True)
|
if len(tasks) == 0:
|
self.data = {}
|
elif isinstance(tasks, dict):
|
self.data = tasks
|
elif isinstance(self.data, list):
|
self.data = {int(task['id']): task for task in tasks}
|
self._save()
|
|
def _save(self):
|
with open(self.path, mode='w', encoding='utf8') as fout:
|
json.dump(self.data, fout, ensure_ascii=False)
|
|
@property
|
def readable_path(self):
|
return self.path
|
|
def get(self, id):
|
return self.data.get(int(id))
|
|
def set(self, id, value):
|
self.data[int(id)] = value
|
self._save()
|
|
def __contains__(self, id):
|
return id in self.data
|
|
def set_many(self, ids, values):
|
for id, value in zip(ids, values):
|
self.data[int(id)] = value
|
self._save()
|
|
def ids(self):
|
return self.data.keys()
|
|
def max_id(self):
|
return max(self.ids(), default=-1)
|
|
def items(self):
|
return self.data.items()
|
|
def remove(self, key):
|
self.data.pop(int(key), None)
|
self._save()
|
|
def remove_all(self, ids=None):
|
if ids is None:
|
self.data = {}
|
else:
|
[self.data.pop(i, None) for i in ids]
|
self._save()
|
|
def empty(self):
|
return len(self.data) == 0
|
|
def sync(self):
|
pass
|
|
|
def already_exists_error(what, path):
|
raise RuntimeError(
|
'{path} {what} already exists. Use "--force" option to recreate it.'.format(path=path, what=what)
|
)
|
|
|
class DirJSONsStorage(BaseStorage):
|
|
description = 'Directory with JSON task files'
|
|
def __init__(self, **kwargs):
|
super(DirJSONsStorage, self).__init__(**kwargs)
|
os.makedirs(self.path, exist_ok=True)
|
self.cache = {}
|
|
@property
|
def readable_path(self):
|
return self.path
|
|
def get(self, id):
|
if id in self.cache:
|
return self.cache[id]
|
else:
|
filename = os.path.join(self.path, str(id) + '.json')
|
if os.path.exists(filename):
|
data = json_load(filename)
|
self.cache[id] = data
|
return data
|
|
def __contains__(self, id):
|
return id in set(self.ids())
|
|
def set(self, id, value):
|
filename = os.path.join(self.path, str(id) + '.json')
|
with open(filename, 'w', encoding='utf8') as fout:
|
json.dump(value, fout, indent=2, sort_keys=True)
|
self.cache[id] = value
|
|
def set_many(self, keys, values):
|
self.cache.clear()
|
raise NotImplementedError
|
|
def ids(self):
|
for f in iter_files(self.path, '.json'):
|
yield int(os.path.splitext(os.path.basename(f))[0])
|
|
def max_id(self):
|
return max(self.ids(), default=-1)
|
|
def sync(self):
|
pass
|
|
def items(self):
|
for id in self.ids():
|
filename = os.path.join(self.path, str(id) + '.json')
|
yield id, self.cache[id] if id in self.cache else json_load(filename)
|
|
def remove(self, id):
|
filename = os.path.join(self.path, str(id) + '.json')
|
if os.path.exists(filename):
|
os.remove(filename)
|
self.cache.pop(id, None)
|
|
def remove_all(self, ids=None):
|
if ids is None:
|
self.cache.clear()
|
delete_dir_content(self.path)
|
else:
|
for i in ids:
|
self.cache.pop(i, None)
|
path = os.path.join(self.path, str(i) + '.json')
|
try:
|
remove_file_or_dir(path)
|
except OSError:
|
logger.warning('Storage file already removed: ' + path)
|
|
def empty(self):
|
return next(self.ids(), None) is None
|
|
|
class TasksJSONStorage(JSONStorage):
|
|
form = BaseForm
|
description = 'Local [loading tasks from "tasks.json" file]'
|
|
def __init__(self, path, project_path, **kwargs):
|
super(TasksJSONStorage, self).__init__(
|
project_path=project_path, path=os.path.join(project_path, 'tasks.json')
|
)
|
|
|
class ExternalTasksJSONStorage(CloudStorage):
|
|
form = BaseForm
|
description = 'Local [loading tasks from "tasks.json" file]'
|
|
def __init__(self, name, path, project_path, prefix=None, create_local_copy=False, regex='.*', **kwargs):
|
super(ExternalTasksJSONStorage, self).__init__(
|
name=name,
|
project_path=project_path,
|
path=os.path.join(project_path, 'tasks.json'),
|
use_blob_urls=False,
|
prefix=None,
|
regex=None,
|
create_local_copy=False,
|
sync_in_thread=False,
|
**kwargs,
|
)
|
# data is used as a local cache for tasks.json file
|
self.data = {}
|
|
def _save(self):
|
with open(self.path, mode='w', encoding='utf8') as fout:
|
json.dump(self.data, fout, ensure_ascii=False)
|
|
def _get_client(self):
|
pass
|
|
def validate_connection(self):
|
pass
|
|
@property
|
def url_prefix(self):
|
return ''
|
|
@property
|
def readable_path(self):
|
return self.path
|
|
def _get_value(self, key, inplace=False):
|
return self.data[int(key)] if inplace else deepcopy(self.data[int(key)])
|
|
def _set_value(self, key, value):
|
self.data[int(key)] = value
|
|
def set(self, id, value):
|
with self.thread_lock:
|
super(ExternalTasksJSONStorage, self).set(id, value)
|
self._save()
|
|
def set_many(self, ids, values):
|
with self.thread_lock:
|
for id, value in zip(ids, values):
|
super(ExternalTasksJSONStorage, self)._pre_set(id, value)
|
self._save_ids()
|
self._save()
|
|
def _extract_task_id(self, full_key):
|
return int(full_key.split(self.key_prefix, 1)[-1])
|
|
def iter_full_keys(self):
|
return (self.key_prefix + key for key in self._get_objects())
|
|
def _get_objects(self):
|
self.data = json_load(self.path, int_keys=True)
|
return (str(id) for id in self.data)
|
|
def _remove_id_from_keys_map(self, id):
|
full_key = self.key_prefix + str(id)
|
assert id in self._ids_keys_map, 'No such task id: ' + str(id)
|
assert self._ids_keys_map[id]['key'] == full_key, (self._ids_keys_map[id]['key'], full_key)
|
self._selected_ids.remove(id)
|
self._ids_keys_map.pop(id)
|
self._keys_ids_map.pop(full_key)
|
|
def remove(self, id):
|
with self.thread_lock:
|
id = int(id)
|
|
logger.debug('Remove id=' + str(id) + ' from ids.json')
|
self._remove_id_from_keys_map(id)
|
self._save_ids()
|
|
logger.debug('Remove id=' + str(id) + ' from tasks.json')
|
self.data.pop(id, None)
|
self._save()
|
|
def remove_all(self, ids=None):
|
with self.thread_lock:
|
remove_ids = self.data if ids is None else ids
|
|
logger.debug('Remove ' + str(len(remove_ids)) + ' records from ids.json')
|
for id in remove_ids:
|
self._remove_id_from_keys_map(id)
|
self._save_ids()
|
|
logger.debug('Remove all data from tasks.json')
|
# remove record from tasks.json
|
if ids is None:
|
self.data = {}
|
else:
|
for id in remove_ids:
|
self.data.pop(id, None)
|
self._save()
|
|
|
class AnnotationsDirStorage(DirJSONsStorage):
|
|
form = BaseForm
|
description = 'Local [annotations are in "annotations" directory]'
|
|
def __init__(self, name, path, project_path, **kwargs):
|
super(AnnotationsDirStorage, self).__init__(
|
name=name, project_path=project_path, path=os.path.join(project_path, 'annotations')
|
)
|