import json
import re
import shutil
from pathlib import Path
from string import Template
import fiftyone as fo
import fiftyone.core.labels as fol
import fiftyone.core.utils as fou
import fiftyone.utils.coco as fouc
import fiftyone.utils.iou as foui
import fiftyone.utils.yolo as fouy
from fiftyone.utils.labels import segmentations_to_detections
import hello.fiftyone.core as hoc
import hello.fiftyone.utils as hou
# Template for the ``info.py`` text that ``export_dataset`` writes next to each
# exported split. ``$name`` placeholders are filled with
# ``Template.safe_substitute``, so unknown placeholders are left untouched.
tmpl_info = Template(
    "info = {\n"
    "'dataset_name': '$dataset_name',\n"
    "'dataset_type': '$dataset_type',\n"
    "'version': '$version',\n"
    "'classes': $classes,\n"
    "'mask_targets': $mask_targets,\n"
    "'num_samples': $num_samples,\n"
    "'tail': $tail,\n"
    "}\n"
)
[docs]
def add_classification_labels(dataset, label_field, labels_path):
    """Adds classification labels from a JSON file to the dataset.

    The file must contain a ``"classes"`` entry and a ``"labels"`` mapping.
    Each key of ``"labels"`` is matched against the filename stem of the
    dataset's samples. Each value is either a dict of keyword arguments for
    :class:`fiftyone.core.labels.Classification` (single-label) or a list of
    such dicts (multi-label); the two kinds must not be mixed in one file.
    Labels whose key matches no sample are reported and ignored.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field (str): the label field in which to store the labels
        labels_path (str): path to the ``.json`` labels file

    Raises:
        AssertionError: if the path is not a ``.json`` file, a required key
            is missing, or single- and multi-label entries are mixed
    """
    # https://voxel51.com/docs/fiftyone/user_guide/export_datasets.html#fiftyoneimageclassificationdataset-export
    assert Path(labels_path).suffix == ".json"

    # JSON text is UTF-8 by specification; decoding with the platform's
    # locale encoding breaks on non-ASCII labels under a non-UTF-8 locale.
    with open(labels_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    assert "classes" in data and "labels" in data

    db = {}
    label_type = None  # fixed by the first entry, enforced for the rest
    for stem, value in data["labels"].items():
        kind = "classifications" if isinstance(value, list) else "classification"
        if label_type is None:
            label_type = kind
        assert label_type == kind

        if kind == "classifications":
            classifications = [fol.Classification(**item) for item in value]
            db[stem] = fol.Classifications(classifications=classifications)
        else:
            db[stem] = fol.Classification(**value)

    filepaths, ids = dataset.values(["filepath", "id"])
    id_map = {Path(k).stem: v for k, v in zip(filepaths, ids)}

    stems_adds = set(db.keys())
    stems_base = set(id_map.keys())

    bad_stems = stems_adds - stems_base
    if bad_stems:
        print(f"<{labels_path}>\n Ignoring {len(bad_stems)} nonexistent images (eg {list(bad_stems)[:3]})")

    # Sort so the selected ids and the label list line up one-to-one
    stems = sorted(stems_adds & stems_base)
    matched_ids = [id_map[stem] for stem in stems]
    view = dataset.select(matched_ids, ordered=True)

    labels = [db[stem] for stem in stems]
    view.set_values(label_field, labels)
    print(f"update {len(labels)=}")
[docs]
def add_coco_labels(dataset, label_field, labels_path, label_type="detections"):
    """Adds labels from a COCO-format JSON file to the dataset.

    Samples are matched to COCO images by filename stem. The matching COCO
    image id is written to a ``coco_id`` sample field (``-1`` when a sample
    has no entry in the file), and that field is then handed to
    ``fiftyone.utils.coco.add_coco_labels`` together with the annotations.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field (str): the label field in which to store the labels
        labels_path (str): path to the COCO ``.json`` file
        label_type ("detections"): supported values are
            ``("detections", "segmentations", "keypoints")``

    Raises:
        AssertionError: if ``label_type`` is unsupported, the path is not a
            ``.json`` file, or a required top-level COCO key is missing
    """
    # https://voxel51.com/docs/fiftyone/api/fiftyone.utils.coco.html#fiftyone.utils.coco.add_coco_labels
    assert label_type in {"detections", "segmentations", "keypoints"}
    assert Path(labels_path).suffix == ".json"

    # JSON text is UTF-8 by specification; decoding with the platform's
    # locale encoding breaks on non-ASCII names under a non-UTF-8 locale.
    with open(labels_path, "r", encoding="utf-8") as f:
        coco = json.load(f)

    assert "categories" in coco and "images" in coco and "annotations" in coco

    classes = [cat["name"] for cat in coco["categories"]]

    # filename stem -> COCO image id
    db = {Path(img["file_name"]).stem: img["id"] for img in coco["images"]}
    coco_ids = [db.get(Path(filepath).stem, -1) for filepath in dataset.values("filepath")]

    coco_id_field = "coco_id"
    dataset.set_values(coco_id_field, coco_ids)

    fouc.add_coco_labels(
        dataset,
        label_field,
        coco["annotations"],
        classes,
        label_type=label_type,
        coco_id_field=coco_id_field,
    )
[docs]
def add_yolo_labels(dataset, label_field, labels_path, classes):
    """Adds YOLO labels to the dataset via ``fiftyone.utils.yolo.add_yolo_labels``.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field (str): the label field in which to store the labels
        labels_path (str): the labels location, forwarded unchanged
        classes (list): the list of class label strings

    Raises:
        AssertionError: if ``classes`` is not a list
    """
    # Reference:
    # https://voxel51.com/docs/fiftyone/api/fiftyone.utils.yolo.html#fiftyone.utils.yolo.add_yolo_labels
    assert isinstance(classes, list)
    fouy.add_yolo_labels(dataset, label_field, labels_path, classes)
[docs]
def add_detection_labels(dataset, label_field, labels_path, classes=None, mode="text", remove_prefix=False):
    """Adds detection labels to the dataset.

    Predictions are loaded with ``hello.fiftyone.utils.load_predictions`` and
    matched to samples by filename stem. Detections whose label is not in the
    dataset's ``default_classes`` are dropped, and predictions for images that
    are not in the dataset are reported and ignored.

    .. note::

        if ``mode=text``, a text row corresponds to a sample prediction result.
        row format: ``filepath,height,width,x1,y1,x2,y2,s,l,x1,y1,x2,y2,s,l``.

        if ``mode=yolo``, a txt file corresponds to a sample prediction result.
        row format: ``target,xc,yc,w,h,s``.

        if ``mode=coco``, a standard COCO format json file.
        from https://cocodataset.org/#format-data.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field (str): the label field in which to store the labels
        labels_path (str): the location the labels are loaded from
        classes (list): the list of class label strings, or ``None``
        mode (str): supported values are ``("text", "yolo", "coco")``
        remove_prefix (False): forwarded unchanged to ``load_predictions``
    """
    assert mode in {"text", "yolo", "coco"}

    known_classes = dataset.default_classes
    assert classes is None or isinstance(classes, list)
    allowed = set(known_classes)

    paths, sample_ids = dataset.values(["filepath", "id"])
    stem_to_id = {Path(path).stem: sample_id for path, sample_id in zip(paths, sample_ids)}

    predictions = hou.load_predictions(labels_path, classes=classes, mode=mode, remove_prefix=remove_prefix)

    predicted = set(predictions.keys())
    present = set(stem_to_id.keys())

    unknown = predicted - present
    if unknown:
        print(f"<{labels_path}>\n Ignoring {len(unknown)} nonexistent images (eg {list(unknown)[:3]})")

    # Sorted stems keep the selected samples and the label list aligned
    stems = sorted(predicted & present)
    view = dataset.select([stem_to_id[stem] for stem in stems], ordered=True)

    labels = [
        fol.Detections(
            detections=[
                fol.Detection(**item)
                for item in predictions[stem]
                if item["label"] in allowed
            ]
        )
        for stem in stems
    ]

    view.set_values(label_field, labels)
    print(f"update {len(labels)=}")
[docs]
def add_segmentation_labels(dataset, label_field, labels_path, mask_targets="auto", mode="png"):
    """Adds segmentation labels to the dataset.

    Masks are loaded with ``hello.fiftyone.utils.load_segmentation_masks``
    after building a remap from the given ``mask_targets`` to the dataset's
    ``default_mask_targets``, then matched to samples by filename stem.
    Masks for images that are not in the dataset are reported and ignored.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field (str): the label field in which to store the labels
        labels_path (str): the location the labels are loaded from
        mask_targets (dict): a dict mapping pixel values to semantic label
            strings. With the default ``"auto"``, the mapping is read from
            the ``mask_targets`` entry of the ``info.py`` file located next
            to ``labels_path``
        mode (str): supported values are ``("png", "coco")``

    Raises:
        AssertionError: if ``mode`` is unsupported or ``mask_targets`` does
            not resolve to a dict
    """
    assert mode in {"png", "coco"}

    dataset_mask_targets = dataset.default_mask_targets

    if mask_targets == "auto":
        info_py = Path(labels_path).with_name("info.py")
        with open(info_py, "r") as f:
            codestr = f.read()

        # Split only on the FIRST ``info =``: a later value that happens to
        # contain such text (e.g. a dataset name) must not cut the literal.
        literal = re.split(r"info\s*=\s*", codestr, maxsplit=1)[1]

        # NOTE(security): ``eval`` executes arbitrary code found in info.py.
        # Only point this function at export directories you trust.
        info = eval(literal)
        mask_targets = info["mask_targets"]

    assert isinstance(mask_targets, dict)

    remap = hou.gen_mask_remap(dataset_mask_targets, mask_targets)

    filepaths, ids = dataset.values(["filepath", "id"])
    id_map = {Path(k).stem: v for k, v in zip(filepaths, ids)}

    db = hou.load_segmentation_masks(labels_path, remap, mode)

    stems_adds = set(db.keys())
    stems_base = set(id_map.keys())

    bad_stems = stems_adds - stems_base
    if bad_stems:
        print(f"<{labels_path}>\n Ignoring {len(bad_stems)} nonexistent images (eg {list(bad_stems)[:3]})")

    # Sorted stems keep the selected samples and the label list aligned
    stems = sorted(stems_adds & stems_base)
    matched_ids = [id_map[stem] for stem in stems]
    view = dataset.select(matched_ids, ordered=True)

    labels = [fol.Segmentation(mask=db[stem]) for stem in stems]

    view.set_values(label_field, labels)
    print(f"update {len(labels)=}")
[docs]
def add_images_dir(dataset, images_dir, tags=None, recursive=True):
    """Adds the ``.jpg`` images found in a directory to the dataset.

    Images whose filename stem already exists in the dataset are reported and
    skipped. After adding, the dataset's ``metadata`` field is populated.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        images_dir (str): a directory of images
        tags (None): an optional tag or iterable of tags to attach to each sample
        recursive (True): whether to recursively traverse subdirectories
    """
    # https://voxel51.com/docs/fiftyone/api/fiftyone.core.dataset.html#fiftyone.core.dataset.Dataset.add_images_dir
    pattern = "**/*.jpg" if recursive else "*.jpg"
    found = [str(entry) for entry in Path(images_dir).glob(pattern)]

    existing_stems = set([Path(known).stem for known in dataset.values("filepath")])
    found_stems = set([Path(candidate).stem for candidate in found])

    clashes = existing_stems & found_stems
    if clashes:
        print(f"<{images_dir}>\n Ignoring {len(clashes)} existing images (eg {list(clashes)[:3]})")

    fresh = [candidate for candidate in found if Path(candidate).stem not in clashes]
    fresh.sort()

    dataset.add_images(fresh, tags=tags)

    # Populate the `metadata` field
    dataset.compute_metadata()
[docs]
def delete_duplicate_images(dataset):
    """Deletes samples whose image file duplicates an earlier sample's file.

    Files are compared by the hash returned from
    ``fiftyone.core.utils.compute_filehash``; the first sample seen with a
    given hash is kept and the later ones are deleted.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
    """
    filepaths, ids = dataset.values(["filepath", "id"])

    seen_hashes = set()
    unique_ids = []
    for path, sample_id in zip(filepaths, ids):
        digest = fou.compute_filehash(path)
        if digest in seen_hashes:
            continue
        seen_hashes.add(digest)
        unique_ids.append(sample_id)

    dup_ids = set(ids) - set(unique_ids)
    if not dup_ids:
        return

    print(f"Delete {len(dup_ids)} duplicate images (eg {list(dup_ids)[:3]})")
    dataset.delete_samples(dup_ids)
[docs]
def delete_duplicate_labels(dataset, label_field, iou_thresh=0.999, method="simple", iscrowd=None, classwise=True):
    """Deletes duplicate labels in the given field of the dataset.

    Duplicates are the label ids reported by
    ``fiftyone.utils.iou.find_duplicates``, i.e. labels with an IoU greater
    than the chosen threshold with another label in the field.

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field: a label field of type :class:`fiftyone.core.labels.Detections` or :class:`fiftyone.core.labels.Polylines`
        iou_thresh (0.999): the IoU threshold to use to determine whether labels are duplicates
        method ("simple"): supported values are ``("simple", "greedy")``
        iscrowd (None): an optional name of a boolean attribute
        classwise (True): different label values as always non-overlapping
    """
    dup_ids = foui.find_duplicates(
        dataset,
        label_field,
        iou_thresh=iou_thresh,
        method=method,
        iscrowd=iscrowd,
        classwise=classwise,
    )
    if not dup_ids:
        return

    print(f"Delete {len(dup_ids)} duplicate labels (eg {list(dup_ids)[:3]})")
    dataset.delete_labels(ids=dup_ids, fields=label_field)
[docs]
def add_dataset_dir(dataset_dir, data_path=None, labels_path=None, label_field=None, tags=None):
    """Placeholder for adding a dataset directory; not implemented yet.

    Raises:
        NotImplementedError: always
    """
    # See https://voxel51.com/docs/fiftyone/api/fiftyone.core.dataset.html#fiftyone.core.dataset.Dataset.add_dir
    raise NotImplementedError()
[docs]
def add_dataset(dataset, skip_existing=True, insert_new=True, fields=None, expand_schema=True):
    """Placeholder for merging another dataset; not implemented yet.

    Raises:
        NotImplementedError: always
    """
    # See https://voxel51.com/docs/fiftyone/api/fiftyone.core.dataset.html#fiftyone.core.dataset.Dataset.merge_samples
    raise NotImplementedError()
[docs]
def create_dataset(dataset_name, dataset_type, version="001", classes=None, mask_targets=None, force=False):
    """Create an empty, persistent :class:`fiftyone.core.dataset.Dataset` with the name.

    ``classes`` and ``mask_targets`` become the dataset's ``default_classes``
    and ``default_mask_targets``; the remaining metadata is stored in
    ``dataset.info``.

    Args:
        dataset_name (str): a name for the dataset
        dataset_type (str): supported values are ``("detection", "segmentation", "unknown")``
        version (str, optional): a version string stored in ``dataset.info``
        classes (list, optional): defaults to an empty list
        mask_targets (dict, optional): defaults to an empty dict
        force (bool, optional): whether to delete an existing dataset of the
            same name instead of failing

    Returns:
        a :class:`fiftyone.core.dataset.Dataset`

    Raises:
        AssertionError: if ``dataset_type`` is unsupported, or the name is
            already taken and ``force`` is false
    """
    assert dataset_type in {"detection", "segmentation", "unknown"}

    # Fresh containers per call: a literal ``[]``/``{}`` default would be a
    # single object shared by every dataset created with the defaults.
    classes = [] if classes is None else classes
    mask_targets = {} if mask_targets is None else mask_targets

    if fo.dataset_exists(dataset_name):
        assert force, "the dataset name already exists!"
        fo.delete_dataset(dataset_name, verbose=True)

    dataset = fo.Dataset()
    dataset.name = dataset_name
    dataset.persistent = True

    dataset.default_classes = classes
    dataset.default_mask_targets = mask_targets
    dataset.info = {
        "dataset_name": dataset_name,
        "dataset_type": dataset_type,
        "version": version,
        "num_samples": [],
        "tail": {},
    }
    dataset.save()

    return dataset
[docs]
def load_images_dir(dataset_dir, dataset_name, dataset_type, version="001", classes=None, mask_targets=None):
    """Create a persistent :class:`fiftyone.core.dataset.Dataset` from the given directory of images.

    ``classes`` and ``mask_targets`` become the dataset's ``default_classes``
    and ``default_mask_targets``; the remaining metadata is stored in
    ``dataset.info``.

    Args:
        dataset_dir (str): a directory of images
        dataset_name (str): a name for the dataset
        dataset_type (str): supported values are ``("detection", "segmentation", "unknown")``
        version (str, optional): a version string stored in ``dataset.info``
        classes (list, optional): defaults to an empty list
        mask_targets (dict, optional): defaults to an empty dict

    Returns:
        a :class:`fiftyone.core.dataset.Dataset`

    Raises:
        AssertionError: if ``dataset_type`` is unsupported
    """
    assert dataset_type in {"detection", "segmentation", "unknown"}

    # Fresh containers per call: a literal ``[]``/``{}`` default would be a
    # single object shared by every dataset created with the defaults.
    classes = [] if classes is None else classes
    mask_targets = {} if mask_targets is None else mask_targets

    dataset = fo.Dataset.from_images_dir(dataset_dir)
    dataset.name = dataset_name
    dataset.persistent = True

    dataset.default_classes = classes
    dataset.default_mask_targets = mask_targets
    dataset.info = {
        "dataset_name": dataset_name,
        "dataset_type": dataset_type,
        "version": version,
        "num_samples": [],
        "tail": {},
    }
    dataset.save()

    return dataset
[docs]
def list_datasets():
    """Returns the result of ``fiftyone.list_datasets()`` unchanged."""
    available = fo.list_datasets()
    return available
[docs]
def delete_datasets(names=None, non_persistent=False, force=False):
    """Deletes the named FiftyOne datasets.

    Args:
        names (None): an iterable of dataset names; a ``"*"`` entry selects
            every existing dataset
        non_persistent (False): whether to also delete all non-persistent
            datasets afterwards
        force (False): whether datasets whose name starts with ``keep_`` may
            be deleted; when false they are reported and skipped

    Names that do not correspond to an existing dataset are reported and
    skipped.
    """
    requested = set(names or [])
    existing = set(fo.list_datasets())

    if "*" in requested:
        requested = set(existing)

    protected = set([name for name in requested if name.startswith("keep_")])
    if protected and not force:
        print(f"Ignoring {len(protected)} kept datasets (eg {list(protected)[:3]})")
        requested = requested - protected

    missing = requested - existing
    if missing:
        print(f"Ignoring {len(missing)} nonexistent datasets (eg {list(missing)[:3]})")
        requested = requested - missing

    for name in sorted(requested):
        fo.delete_dataset(name, verbose=True)

    if non_persistent:
        fo.delete_non_persistent_datasets(verbose=True)
[docs]
def load_dataset(name):
    """Loads the FiftyOne dataset with the given name.

    Args:
        name (str): the name of the dataset

    Returns:
        whatever ``fiftyone.load_dataset(name)`` returns
    """
    dataset = fo.load_dataset(name)
    return dataset
[docs]
def export_image_dataset(export_dir, dataset, splits=None):
    """Exports the images of each split to ``<export_dir>/<split>/data``.

    Any existing ``export_dir`` is removed first. Only splits that exist as
    sample tags are exported; if none do, every sample is tagged ``train``
    and that single split is exported. A ``README.md`` stub is written per
    split and at the top level, and the tags are saved to ``tags.json``.

    Args:
        export_dir: the directory to which to export the samples
        dataset: the sample collection to export
        splits (None): a list of split names, ``"auto"`` to use every
            distinct tag, or ``None`` for ``["train", "val", "test"]``

    Returns:
        ``export_dir``
    """
    shutil.rmtree(export_dir, ignore_errors=True)

    available_tags = set(dataset.distinct("tags"))

    if splits is None:
        splits = ["train", "val", "test"]
    elif splits == "auto":
        splits = sorted(available_tags)

    assert isinstance(splits, list)
    splits = [name for name in splits if name in available_tags]

    if not splits:
        # No usable split tag: treat the whole dataset as the train split
        splits = ["train"]
        dataset.tag_samples(splits)

    for split in splits:
        print(f"\n[{split}]\n")
        split_view = dataset.match_tags(split)
        split_dir = Path(export_dir, split)

        split_view.export(
            export_dir=str(split_dir / "data"),
            dataset_type=fo.types.ImageDirectory,
        )

        with open(split_dir / "README.md", "w") as readme:
            readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    hoc.save_tags(dataset, Path(export_dir, "tags.json"))

    with open(Path(export_dir, "README.md"), "w") as readme:
        readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    return export_dir
[docs]
def export_classification_labels(export_dir, dataset, label_field, splits=None):
    """Exports the classification labels of each split to ``<export_dir>/<split>/labels.json``.

    Any existing ``export_dir`` is removed first. Only splits that exist as
    sample tags are exported; if none do, every sample is tagged ``train``
    and that single split is exported. A ``README.md`` stub is written per
    split and at the top level, and the tags are saved to ``tags.json``.

    Args:
        export_dir: the directory to which to export the labels
        dataset: the sample collection to export
        label_field: the label field to export
        splits (None): a list of split names, ``"auto"`` to use every
            distinct tag, or ``None`` for ``["train", "val", "test"]``

    Returns:
        ``export_dir``
    """
    shutil.rmtree(export_dir, ignore_errors=True)

    available_tags = set(dataset.distinct("tags"))

    if splits is None:
        splits = ["train", "val", "test"]
    elif splits == "auto":
        splits = sorted(available_tags)

    assert isinstance(splits, list)
    splits = [name for name in splits if name in available_tags]

    if not splits:
        # No usable split tag: treat the whole dataset as the train split
        splits = ["train"]
        dataset.tag_samples(splits)

    for split in splits:
        print(f"\n[{split}]\n")
        split_view = dataset.match_tags(split)
        split_dir = Path(export_dir, split)

        split_view.export(
            dataset_type=fo.types.FiftyOneImageClassificationDataset,
            labels_path=str(split_dir / "labels.json"),
            label_field=label_field,
            include_confidence=True,
        )

        with open(split_dir / "README.md", "w") as readme:
            readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    hoc.save_tags(dataset, Path(export_dir, "tags.json"))

    with open(Path(export_dir, "README.md"), "w") as readme:
        readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    return export_dir
[docs]
def export_classification_dataset(export_dir, dataset, label_field, splits=None, export_media=True):
    """Exports each split as a ``FiftyOneImageClassificationDataset`` under ``<export_dir>/<split>``.

    Any existing ``export_dir`` is removed first. Only splits that exist as
    sample tags are exported; if none do, every sample is tagged ``train``
    and that single split is exported. A ``README.md`` stub is written per
    split and at the top level, and the tags are saved to ``tags.json``.

    Args:
        export_dir: the directory to which to export the samples
        dataset: the sample collection to export
        label_field: the label field to export
        splits (None): a list of split names, ``"auto"`` to use every
            distinct tag, or ``None`` for ``["train", "val", "test"]``
        export_media (True): forwarded unchanged to the exporter

    Returns:
        ``export_dir``
    """
    shutil.rmtree(export_dir, ignore_errors=True)

    available_tags = set(dataset.distinct("tags"))

    if splits is None:
        splits = ["train", "val", "test"]
    elif splits == "auto":
        splits = sorted(available_tags)

    assert isinstance(splits, list)
    splits = [name for name in splits if name in available_tags]

    if not splits:
        # No usable split tag: treat the whole dataset as the train split
        splits = ["train"]
        dataset.tag_samples(splits)

    for split in splits:
        print(f"\n[{split}]\n")
        split_view = dataset.match_tags(split)
        split_dir = Path(export_dir, split)

        split_view.export(
            export_dir=str(split_dir),
            dataset_type=fo.types.FiftyOneImageClassificationDataset,
            export_media=export_media,
            label_field=label_field,
            include_confidence=True,
        )

        with open(split_dir / "README.md", "w") as readme:
            readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    hoc.save_tags(dataset, Path(export_dir, "tags.json"))

    with open(Path(export_dir, "README.md"), "w") as readme:
        readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    return export_dir
[docs]
def export_detection_dataset(export_dir, dataset, label_field, splits=None):
    """Exports a detection dataset by delegating to :func:`export_dataset`.

    Args:
        export_dir: the directory to which to export the samples
        dataset: the sample collection to export
        label_field: the detections label field to export
        splits (None): forwarded unchanged to :func:`export_dataset`

    Returns:
        the value returned by :func:`export_dataset`
    """
    exported = export_dataset(export_dir, dataset, label_field=label_field, splits=splits)
    return exported
[docs]
def export_segmentation_dataset(export_dir, dataset, label_field, mask_types="stuff", splits=None):
    """Exports a segmentation dataset by delegating to :func:`export_dataset`.

    The given field is passed as ``mask_label_field``.

    Args:
        export_dir: the directory to which to export the samples
        dataset: the sample collection to export
        label_field: the segmentation label field to export
        mask_types ("stuff"): forwarded unchanged to :func:`export_dataset`
        splits (None): forwarded unchanged to :func:`export_dataset`

    Returns:
        the value returned by :func:`export_dataset`
    """
    exported = export_dataset(
        export_dir,
        dataset,
        mask_label_field=label_field,
        mask_types=mask_types,
        splits=splits,
    )
    return exported
[docs]
def export_dataset(export_dir, dataset, label_field=None, mask_label_field=None, mask_types="stuff", splits=None):
    """Exports the samples in the collection to disk.

    Any existing ``export_dir`` is removed first and the export works on a
    clone of the collection. Each split is written under
    ``<export_dir>/<split>`` as a ``COCODetectionDataset``, plus a ``labels``
    mask directory when ``mask_label_field`` is given, plus an ``info.py``
    rendered from ``tmpl_info``. When only ``mask_label_field`` is given, a
    ``detections`` field is first derived from the masks with
    ``segmentations_to_detections``. Only splits that exist as sample tags
    are exported; if none do, every sample of the clone is tagged ``train``.

    Args:
        export_dir: the directory to which to export the samples
        dataset: a :class:`fiftyone.core.collections.SampleCollection`
        label_field: controls the label field(s) to export
        mask_label_field: controls the label field(s) to export
        mask_types ("stuff"): "stuff"(amorphous regions of pixels), "thing"(connected regions, each representing an instance)
        splits (None): a list of strings, respectively, specifying the splits to load. If "auto" will computes the distinct tags

    Returns:
        ``export_dir``

    Raises:
        AssertionError: if both ``label_field`` and ``mask_label_field`` are
            ``None``, or ``splits`` does not resolve to a list
    """
    assert label_field is not None or mask_label_field is not None

    shutil.rmtree(export_dir, ignore_errors=True)

    dataset.save()
    info = dataset.info
    classes = dataset.default_classes
    mask_targets = dataset.default_mask_targets
    info["num_samples"] = hoc.count_values(dataset, "tags")

    if label_field is not None:
        dataset = dataset.clone()
    else:
        # Masks only: derive a detections field on a clone holding that field
        label_field = "detections"
        print("todo: segmentations_to_detections()")
        dataset = dataset.select_fields(mask_label_field).clone()
        segmentations_to_detections(
            dataset,
            mask_label_field,
            label_field,
            mask_targets=dataset.default_mask_targets,
            mask_types=mask_types,
        )

    available_tags = set(dataset.distinct("tags"))

    if splits is None:
        splits = ["train", "val", "test"]
    elif splits == "auto":
        splits = sorted(available_tags)

    assert isinstance(splits, list)
    splits = [name for name in splits if name in available_tags]

    if not splits:
        # No usable split tag: treat the whole clone as the train split
        splits = ["train"]
        dataset.tag_samples(splits)

    for split in splits:
        print(f"\n[{split}]\n")
        split_view = dataset.match_tags(split)
        split_dir = Path(export_dir, split)

        split_view.export(
            export_dir=str(split_dir),
            dataset_type=fo.types.COCODetectionDataset,
            label_field=label_field,
            classes=classes,
        )

        if mask_label_field is not None:
            split_view.export(
                dataset_type=fo.types.ImageSegmentationDirectory,
                labels_path=str(split_dir / "labels"),
                label_field=mask_label_field,
                mask_targets=mask_targets,
            )

        label_counts = hoc.count_values(split_view, f"{label_field}.detections.label")
        info["tail"].update(count_label=label_counts)

        rendered = tmpl_info.safe_substitute(info, classes=classes, mask_targets=mask_targets)
        with open(split_dir / "info.py", "w") as info_file:
            info_file.write(rendered)

    hoc.save_tags(dataset, Path(export_dir, "tags.json"))

    with open(Path(export_dir, "README.md"), "w") as readme:
        readme.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    return export_dir