import shutil
from collections import defaultdict
from pathlib import Path
import cv2 as cv
import fiftyone as fo
import fiftyone.core.dataset as fod
from fiftyone import ViewField as F
from fiftyone.utils.labels import segmentations_to_detections
from prettytable import PrettyTable
from tqdm import tqdm
import hello.fiftyone.tarinfo as hot
import hello.io.utils as hou
def _map_detections(field_data, mapping):
detections = field_data.detections
new_detections = []
for _detection in detections:
label = _detection.label
if label in mapping:
_detection.label = mapping[label]
new_detections.append(_detection)
return fo.Detections(detections=new_detections)
def _map_segmentation(field_data, mapping):
mask = field_data.mask
new_mask = mask.copy()
for _old, _new in mapping.items():
if _old != _new:
new_mask[mask == _old] = _new
return fo.Segmentation(mask=new_mask)
[docs]
def map_labels(dataset, mapping, field_name="ground_truth"):
for sample in dataset:
field_data = sample[field_name]
if field_data:
if isinstance(field_data, fo.Detections):
field_data = _map_detections(field_data, mapping)
elif isinstance(field_data, fo.Segmentation):
field_data = _map_segmentation(field_data, mapping)
else:
raise NotImplementedError
sample[field_name] = field_data
sample.save()
return dataset
[docs]
def gen_detections_mapping(old_classes, new_classes):
"""Generate detections mapping.
Args:
old_classes (list): ``['c0', 'c1', 'c2', 'c3', 'c4', 'c5']``
new_classes (list): ``['c0', 'c1', 'c2', ['c3', 'c4', 'c5']]``
Returns:
a :class:`dict`
"""
mapping = {}
for names in new_classes:
names = [names] if isinstance(names, str) else names
assert isinstance(names, list)
label = names[0]
for name in names:
assert name not in mapping
mapping[name] = label
for name in old_classes:
assert name in mapping
return mapping
[docs]
def gen_segmentation_mapping(old_classes, new_classes):
"""Generate segmentation mapping.
Args:
old_classes (list): ``['c0', 'c1', 'c2', 'c3', 'c4', 'c5', 'be ignored']``
new_classes (list): ``['c0', 'c1', 'c2', ['c3', 'c4', 'c5'], 'be ignored']``
Returns:
a :class:`dict`
"""
old_classes, new_classes = old_classes[:-1], new_classes[:-1]
remap = {}
for i, names in enumerate(new_classes):
names = [names] if isinstance(names, str) else names
assert isinstance(names, list)
for name in names:
assert name not in remap
remap[name] = i
mapping = {}
for i, name in enumerate(old_classes):
assert isinstance(name, str)
mapping[i] = remap[name]
return mapping
[docs]
def update_dataset_default(dataset, classes, background=None, ignore_index=255):
new_classes = [c[0] if isinstance(c, list) else c for c in classes]
if background is not None:
new_classes = [c for c in new_classes if c != background]
new_mask_targets = {i: c for i, c in enumerate(new_classes[:-1])}
new_mask_targets[ignore_index] = new_classes[-1]
dataset.default_mask_targets = new_mask_targets
dataset.default_classes = new_classes
return dataset
[docs]
def remap_detections_dataset(dataset, new_classes=None, field_name="ground_truth", background=None, least_one=False):
"""Steps: map labels -> check dataset.classes -> filter valid samples
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
new_classes (None): refer to :func:`gen_label_mapping`
field_name (str, optional): defaults to "ground_truth"
background (str, optional): defaults to None
Returns:
a :class:`fiftyone.core.dataset.Dataset`
"""
dataset.save()
if new_classes is not None:
old_classes = dataset.default_classes
mapping = gen_detections_mapping(old_classes, new_classes)
dataset = map_labels(dataset, mapping, field_name=field_name)
dataset = update_dataset_default(dataset, new_classes, background=background)
dataset = dataset.filter_labels(field_name, F("label") != background, only_matches=least_one).clone()
return dataset
[docs]
def remap_segmentation_dataset(dataset, new_classes=None, field_name="ground_truth", ignore_index=255, least_one=False):
"""Steps: map labels -> check dataset.mask_targets -> filter valid samples
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
new_classes (None): refer to :func:`gen_mask_mapping`
field_name (str, optional): defaults to "ground_truth"
ignore_index (int, optional): defaults to 255
Returns:
a :class:`fiftyone.core.dataset.Dataset`
"""
dataset.save()
def _check_sample(field_data, index_stop, only_matches):
if not only_matches:
return True
if field_data:
mask = field_data.mask
return ((0 < mask) & (mask < index_stop)).sum() > 0
return False
if new_classes is not None:
old_classes = dataset.default_classes
mapping = gen_segmentation_mapping(old_classes, new_classes)
dataset = map_labels(dataset, mapping, field_name=field_name)
dataset = update_dataset_default(dataset, new_classes, ignore_index=ignore_index)
dataset = dataset.select([s.id for s in dataset if _check_sample(s[field_name], ignore_index, least_one)]).clone()
return dataset
[docs]
def merge_samples(datasets, **kwargs):
A = datasets[0]
A.save()
A = A.clone()
def _key_fcn(sample):
return Path(sample.filepath).name
for B in datasets[1:]:
B.save()
B = B.clone()
A.merge_samples(B, key_fcn=_key_fcn, **kwargs)
return A
[docs]
def count_values(dataset, field_or_expr, sort_by="label"):
# field_or_expr: "tags" or "ground_truth.detections.label"
count_label = dataset.count_values(field_or_expr)
count_label = [(k, v) for k, v in count_label.items()]
if sort_by is not None:
index = 0 if sort_by == "label" else 1
count_label = sorted(count_label, key=lambda x: x[index])
table_data = PrettyTable()
table_data.field_names = ["label", "count"]
table_data.align["label"] = "l"
table_data.align["count"] = "r"
table_data.add_rows(count_label)
print(table_data)
print(sum([x[1] for x in count_label]))
return count_label
[docs]
def tag_from(dataset, by_dir=None, by_json=None, by_dict=None, by_list=None, is_in=None):
data = None
if by_dir is not None:
data = {filepath.name: [filepath.parent.name] for filepath in Path(by_dir).glob("**/*.jpg")}
elif by_json is not None:
data = {Path(filename).name: tags for filename, tags in hou.load_json(by_json)["data"]}
elif by_dict is not None:
data = {Path(filename).name: tags for filename, tags in by_dict.items()}
elif by_list is not None:
data = {Path(filename).name: tags for filename, tags in by_list}
else:
print(f"[W] ``{by_dir=}`` & ``{by_json=}``, do nothing")
if data:
if is_in:
is_in = set(is_in)
for sample in dataset:
tags = data.get(Path(sample.filepath).name)
if tags:
if is_in:
tags = list(is_in & set(tags))
sample.tags = list(set(sample.tags + tags))
sample.save()
return dataset
[docs]
def tag_from_text(dataset, text_file, tag_map="synsets.txt", remove_prefix=False):
"""Tag dataset from a text file(cvat export: ImageNet 1.0).
Examples::
000471_0622023132018.jpg 0
000472_0622023132018.jpg 1
000473_0622023132019.jpg 1
000474_0622023132019.jpg 0 1
000475_0622023132019.jpg 0
000476_0622023132019.jpg 0
"""
if isinstance(tag_map, str):
if not Path(tag_map).is_file():
tag_map = Path(text_file).parent / tag_map
with open(tag_map, "r") as f:
names = [name.strip() for name in f.readlines() if name.strip()]
tag_map = {str(i): name for i, name in enumerate(names, 0)}
data = defaultdict(list)
with open(text_file, "r") as f:
for l in f.readlines():
if ".jpg" in l:
a, *b = l.strip().split(" ")
if b:
a, b = Path(a).stem, tag_map[b[-1]]
if remove_prefix:
a = a.split("_", maxsplit=1)[1]
data[b].append(a)
filepaths, ids = dataset.values(["filepath", "id"])
id_map = {Path(k).stem: v for k, v in zip(filepaths, ids)}
for tag, stems in data.items():
matched_ids = [id_map[stem] for stem in stems]
view = dataset.select(matched_ids)
view.tag_samples(tag)
[docs]
def change_tag(dataset, files, data_path="data", add=None, rm=None):
"""Change the tags of some samples in this dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
files (list): a list of archived dataset file
data_path (str, optional): defaults to "data"
add (str, optional): defaults to None
rm (str, optional): defaults to None
"""
stems = [Path(filename).stem for filename in hot.check_files(files, data_path)]
filepaths, ids = dataset.values(["filepath", "id"])
id_map = {Path(k).stem: v for k, v in zip(filepaths, ids)}
matched_ids = [id_map[stem] for stem in stems if stem in id_map]
view = dataset.select(matched_ids)
if rm:
view.untag_samples(rm)
if add:
view.tag_samples(add)
print(f"Change tags({add=}, {rm=}): {len(view)=}")
[docs]
def random_split(dataset, splits=None, seed=51):
"""Adds the split tags to all samples in this dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
splits (dict, optional): defaults to None
seed (int, optional): defaults to 51
Returns:
a :class:`DatasetView`
"""
view = dataset.sort_by("filepath")
view = view.shuffle(seed=seed)
view.untag_samples(["train", "val", "test"])
if splits is None:
splits = {"val": 0.1, "train": 0.9}
ids = view.values("id")
pos_val = splits.get("val", 0.1)
pos_train = splits.get("train", 0.9)
if isinstance(pos_val, float):
num_samples = len(ids)
pos_val = int(1 + pos_val * num_samples)
pos_train = int(1 + pos_train * num_samples)
ids = ids[:(pos_val+pos_train)]
val_ids = ids[:pos_val]
train_ids = ids[pos_val:]
view.select(val_ids).tag_samples("val")
view.select(train_ids).tag_samples("train")
view.exclude(val_ids + train_ids).tag_samples("test")
count_values(dataset, "tags", sort_by="count")
return dataset
[docs]
def split_dataset(dataset, splits=None, limit=3000, seed=51, field_name="ground_truth", from_field=None):
"""Adds the split tags to all samples in this dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
splits (dict, optional): defaults to None
limit (int, optional): defaults to 3000
seed (int, optional): defaults to 51
field_name (str, optional): defaults to "ground_truth"
from_field (str, optional): defaults to None
Returns:
a :class:`DatasetView`
"""
view = dataset.sort_by("filepath")
view = view.shuffle(seed=seed)
view.untag_samples(["train", "val", "test"])
if splits is None:
splits = {"val": 0.1, "train": 0.9}
if from_field is not None:
print("todo: segmentations_to_detections()")
segmentations_to_detections(view, from_field, field_name, mask_targets=view.default_mask_targets, mask_types="stuff")
val_ids, train_ids = [], []
for label, _ in count_values(view, f"{field_name}.detections.label", sort_by="count"):
_detections = F(f"{field_name}.detections").filter(F("label") == label)
subset = view.exclude(val_ids + train_ids).match(_detections.length() > 0)
ids = subset.take(limit).values("id")
pos_val = splits.get("val", 0.1)
pos_train = splits.get("train", 0.9)
if isinstance(pos_val, float):
num_samples = len(ids)
pos_val = int(1 + pos_val * num_samples)
pos_train = int(1 + pos_train * num_samples)
ids = ids[:(pos_val+pos_train)]
val_ids.extend(ids[:pos_val])
train_ids.extend(ids[pos_val:])
view.select(val_ids).tag_samples("val")
view.select(train_ids).tag_samples("train")
view.exclude(val_ids + train_ids).tag_samples("test")
count_values(dataset, "tags", sort_by="count")
return dataset
[docs]
def filter_segmentation_samples(out_dir, data_root, classes, mask_targets, threshold=0.05, splits=["train", "val"],
img_dir="data", ann_dir="labels", img_suffix=".jpg", seg_map_suffix=".png"):
"""Filter samples, based on area of interest ratio.
>>> <data_root>/
>>> ├── objectInfo150.txt
>>> ├── sceneCategories.txt
>>> ├── train
>>> │ ├── data
>>> │ └── labels
>>> └── val
>>> ├── data
>>> └── labels
Args:
out_dir (str): _description_
data_root (str): _description_
classes (list[str]): _description_
mask_targets (dict[int, str]): _description_
threshold (float, optional): _description_. Defaults to 0.05
splits (list, optional): _description_. Defaults to ["train", "val"]
img_dir (str, optional): _description_. Defaults to "data"
ann_dir (str, optional): _description_. Defaults to "labels"
img_suffix (str, optional): _description_. Defaults to ".jpg"
seg_map_suffix (str, optional): _description_. Defaults to ".png"
Returns:
a :class:`str`
"""
_mapping = {name: index for index, name in mask_targets.items()}
_labels = [_mapping[name] for name in classes]
out_dir = Path(out_dir)
shutil.rmtree(out_dir, ignore_errors=True)
for split in splits:
(out_dir / f"{split}/data").mkdir(parents=True, exist_ok=False)
(out_dir / f"{split}/labels").mkdir(parents=True, exist_ok=False)
data_root = Path(data_root)
img_files, seg_map_files = [], []
for split in splits:
info_py = data_root / f"{split}/info.py"
if info_py.is_file():
shutil.copyfile(info_py, out_dir / f"{split}/info.py")
img_files.extend(data_root.glob(f"{split}/{img_dir}/*{img_suffix}"))
seg_map_files.extend(data_root.glob(f"{split}/{ann_dir}/*{seg_map_suffix}"))
print(f"[INFO] add [{split}]: img={len(img_files)}, ann={len(seg_map_files)}")
img_files = {f.stem: f for f in sorted(img_files)}
seg_map_files = {f.stem: f for f in sorted(seg_map_files)}
print(f"[INFO] unique: img={len(img_files)}, ann={len(seg_map_files)}")
data = []
for stem, f in seg_map_files.items():
if stem in img_files:
mask = cv.imread(str(f), flags=0)
p = sum([(mask == l).mean() for l in _labels])
if p >= threshold:
data.extend((img_files[stem], f))
print(f"[INFO] samples: {len(data)//2}")
for f in tqdm(data):
tempfile = f.relative_to(data_root)
shutil.copyfile(f, out_dir / tempfile)
return out_dir
[docs]
def has_sample_field(dataset, field_name):
"""Determines whether the collection has a sample field with the given name.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
field_name: the field name
Returns:
True/False
"""
return field_name in dataset.get_field_schema()
[docs]
def add_sample_field(dataset, field_name, ftype):
"""Adds a new sample field or embedded field to the dataset, if necessary.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
field_name: the field name or `embedded.field.name`
ftype: the field type to create. Must be a subclass of :class:`fiftyone.core.fields.Field`
"""
dataset.add_sample_field(field_name, ftype)
[docs]
def clear_sample_field(dataset, field_name):
"""Clears the values of the field from all samples in the dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
field_name: the field name or `embedded.field.name`
"""
dataset.clear_sample_field(field_name)
[docs]
def clone_sample_field(dataset, field_name, new_field_name):
"""Clones the given sample field into a new field of the dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
field_name: the field name or `embedded.field.name`
new_field_name: the new field name or `embedded.field.name`
"""
dataset.clone_sample_field(field_name, new_field_name)
[docs]
def delete_sample_field(dataset, field_name, error_level=0):
"""Deletes the field from all samples in the dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
field_name: the field name or `embedded.field.name`
error_level (int, optional): the error level to use
"""
dataset.delete_sample_field(field_name, error_level=error_level)
[docs]
def rename_sample_field(dataset, field_name, new_field_name):
"""Renames the sample field to the given new name.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
field_name: the field name or `embedded.field.name`
new_field_name: the new field name or `embedded.field.name`
"""
dataset.rename_sample_field(field_name, new_field_name)
[docs]
def merge_labels(dataset, in_field, out_field):
"""Merges the labels from the given input field into the given output field of the collection.
If this collection is a dataset, the input field is deleted after the
merge.
If this collection is a view, the input field will still exist on the
underlying dataset but will only contain the labels not present in this
view.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
in_field (str): the name of the input label field
out_field (str): the name of the output label field, which will be created if necessary
"""
if not isinstance(dataset, fod.Dataset):
# The label IDs that we'll need to delete from `in_field`
_, id_path = dataset._get_label_field_path(in_field, "id")
del_ids = dataset.values(id_path, unwind=True)
dataset.merge_samples(
dataset,
key_field="id",
skip_existing=False,
insert_new=False,
fields={in_field: out_field},
merge_lists=True,
overwrite=True,
expand_schema=True,
include_info=False,
)
if isinstance(dataset, fod.Dataset):
dataset.delete_sample_field(in_field)
else:
dataset.delete_labels(ids=del_ids, fields=in_field)
[docs]
def merge_datasets(dataset, others, in_field=None, out_field=None, **kwargs):
"""Merges the given samples into this dataset.
Args:
dataset: a :class:`fiftyone.core.dataset.Dataset`
others: a list of :class:`fiftyone.core.dataset.Dataset`
in_field (str): the name of the input label field
out_field (str): the name of the output label field, which will be created if necessary
**kwargs: optional keyword arguments to pass to `merge_samples() <https://voxel51.com/docs/fiftyone/api/fiftyone.core.dataset.html#fiftyone.core.dataset.Dataset.merge_samples>`
"""
if in_field is not None and out_field is not None:
kwargs["fields"] = {in_field: out_field}
def _key_fcn(sample):
return Path(sample.filepath).name
params = {
"key_field": "filepath",
"key_fcn": _key_fcn,
"skip_existing": False,
"insert_new": True,
"fields": None,
"merge_lists": True,
"overwrite": True,
"expand_schema": True,
"include_info": False,
}
params.update(**kwargs)
for other in others:
dataset.merge_samples(other, **params)