Source code for hello.fiftyone.coco

import os
import shutil
from pathlib import Path

import cv2 as cv
import fiftyone as fo
import numpy as np

from hello.fiftyone.coco_utils import (get_mask_from_patch,
                                       mask_to_coco_segmentation)
from hello.fiftyone.core import count_values, save_tags
from hello.fiftyone.dataset import (add_detection_labels, add_images_dir,
                                    tmpl_info)
from hello.io.utils import save_json


class CocoDataset(object):
    """Builder for a FiftyOne detection dataset populated sample by sample.

    The wrapped ``fo.Dataset`` is created with ``persistent=True`` and
    ``overwrite=True`` and is exposed as ``self.dataset``.

    Args:
        name: the name passed to ``fo.Dataset``.
        classes: optional list of class names; stored as the dataset's
            ``default_classes`` and under ``info["classes"]``.
        mask_targets: optional dict stored as the dataset's
            ``default_mask_targets`` and under ``info["mask_targets"]``.
    """

    def __init__(self, name, classes=None, mask_targets=None):
        # Build fresh containers per instance: the values are stored on the
        # dataset, so a shared default object would leak between instances.
        classes = [] if classes is None else classes
        mask_targets = {} if mask_targets is None else mask_targets

        dataset = fo.Dataset(name, persistent=True, overwrite=True)
        dataset.info = {
            "dataset_name": name,
            "dataset_type": "detection",
            "version": "001",
            "classes": classes,
            "mask_targets": mask_targets,
            "num_samples": [],
            "tail": {},
        }
        dataset.default_classes = classes
        dataset.default_mask_targets = mask_targets
        dataset.save()

        self.dataset = dataset

    def add_sample(self, filepath, annotations, frame_size):
        """Adds a sample to the dataset.

        Args:
            filepath: the path to the image on disk.
            annotations: an iterable of
                ``(label, bbox, mask, confidence, iscrowd)`` tuples, where
                ``bbox`` is ``(x, y, w, h)`` in pixels and ``mask`` is either
                ``None`` or an array indexed ``[row, column]`` in frame pixel
                coordinates (it is cropped to the box before being stored).
            frame_size: the ``(width, height)`` of the image.
        """
        width, height = frame_size

        detections = []
        for label, bbox, mask, confidence, iscrowd in annotations:
            x, y, w, h = bbox
            # Detections store boxes relative to the frame size.
            bounding_box = [x / width, y / height, w / width, h / height]

            patch = None
            if mask is not None:
                # Keep only the part of the mask that lies inside the box.
                top, bottom = int(round(y)), int(round(y + h))
                left, right = int(round(x)), int(round(x + w))
                patch = mask[top:bottom, left:right].astype(bool)

            detections.append(fo.Detection(
                label=label,
                bounding_box=bounding_box,
                mask=patch,
                confidence=confidence,
                iscrowd=iscrowd,
            ))

        sample = fo.Sample(filepath=filepath)
        sample["ground_truth"] = fo.Detections(detections=detections)
        self.dataset.add_sample(sample)
def coco_add_samples(dataset, dataset_dir=None, data_path=None, labels_path=None,
                     label_field=None, splits=None, tags=None):
    """Adds images and COCO-format detection labels to ``dataset``.

    Args:
        dataset: the dataset handed to ``add_images_dir`` and
            ``add_detection_labels``.
        dataset_dir: optional root directory. With ``splits`` it is treated
            as the parent of one sub-directory per split.
        data_path: optional images directory; defaults to ``"data/"`` inside
            each dataset directory.
        labels_path: optional labels file; defaults to ``"labels.json"``
            inside each dataset directory.
        label_field: the field that receives the labels; defaults to
            ``"ground_truth"``.
        splits: ``None``, ``"auto"`` (every sub-directory of
            ``dataset_dir``, in sorted order) or a list of sub-directory
            names. Only consulted when ``dataset_dir`` is given.
        tags: ``None``, a single tag applied to every directory, or a list
            with one tag per directory. When falsy and ``splits`` is used,
            the split names become the tags.

    Raises:
        ValueError: if no location is given, ``splits`` has an unsupported
            value, or ``tags`` does not match the number of directories.
        TypeError: if ``tags`` is neither ``None``, a string, nor a list.
    """
    if dataset_dir is None and data_path is None and labels_path is None:
        raise ValueError(
            "At least one of `dataset_dir`, `data_path`, and "
            "`labels_path` must be provided"
        )

    dataset_dirs = [dataset_dir]

    if dataset_dir is not None and splits is not None:
        dataset_dir = Path(dataset_dir)

        if splits == "auto":
            # Sorted so that a caller-supplied ``tags`` list lines up with
            # the directories deterministically, not in filesystem order.
            dataset_dirs = sorted(f for f in dataset_dir.glob("*") if f.is_dir())
            tags = tags or [f.name for f in dataset_dirs]
        elif isinstance(splits, list):
            dataset_dirs = [dataset_dir / split for split in splits]
            tags = tags or splits
        else:
            raise ValueError(f"Not supported `{splits=}`")

    label_field = label_field or "ground_truth"

    if tags is None or isinstance(tags, str):
        tags = [tags] * len(dataset_dirs)

    # Explicit checks instead of ``assert``: asserts vanish under ``-O`` and
    # ``zip`` would then silently drop the unmatched directories or tags.
    if not isinstance(tags, list):
        raise TypeError(f"`tags` must be None, a str or a list, got {type(tags).__name__}")
    if len(tags) != len(dataset_dirs):
        raise ValueError(
            f"Expected {len(dataset_dirs)} tag(s), one per dataset directory, got {len(tags)}"
        )

    for split_dir, tag in zip(dataset_dirs, tags):
        images_dir = parse_data_path(split_dir, data_path, "data/")
        if images_dir is not None:
            add_images_dir(dataset, images_dir, tag, recursive=False)

        coco_json = parse_labels_path(split_dir, labels_path, "labels.json")
        if coco_json is not None:
            add_detection_labels(dataset, label_field, coco_json, mode="coco")
def coco_export(export_dir, dataset, label_field, splits=None, **kwargs):
    """Exports ``dataset`` to ``export_dir``, one sub-directory per split.

    Any existing ``export_dir`` is removed first. The export works on a
    clone of ``dataset``. Each split directory receives ``info.py``,
    ``data/``, ``labels.json`` and, when requested through ``kwargs``, a
    ``labels/`` directory. ``tags.json`` and ``README.md`` are written at
    the top level.

    Args:
        export_dir: the output directory.
        dataset: the dataset to export.
        label_field: the detections field to export.
        splits: ``None`` (use ``["train", "val", "test"]``), ``"auto"``
            (every tag present in the dataset, sorted) or a list of tags.
            Tags absent from the dataset are skipped. When none remain, the
            samples are tagged ``"train"`` and exported as a single split.
        **kwargs: forwarded to ``coco_export_labels`` and
            ``coco_export_semantic_labels``.

    Returns:
        ``export_dir`` as a string.

    Raises:
        ValueError: if ``splits`` is not ``None``, ``"auto"`` or a list.
    """
    # Validate before the destructive ``rmtree`` below, so a bad argument
    # does not cost the caller a previous export.
    if splits is not None and splits != "auto" and not isinstance(splits, list):
        raise ValueError(f"Not supported `{splits=}`")

    export_dir = Path(export_dir)
    shutil.rmtree(export_dir, ignore_errors=True)

    dataset.save()
    dataset = dataset.clone()

    info = dataset.info
    info["classes"] = dataset.default_classes
    info["mask_targets"] = dataset.default_mask_targets
    info["num_samples"] = count_values(dataset, "tags", "label")
    # Datasets created elsewhere may carry no "tail" entry yet.
    tail = info.setdefault("tail", {})

    known_tags = set(dataset.distinct("tags"))

    if splits is None:
        splits = ["train", "val", "test"]
    elif splits == "auto":
        splits = sorted(known_tags)

    splits = [s for s in splits if s in known_tags]

    if not splits:
        splits = ["train"]
        dataset.tag_samples(splits)

    for split in splits:
        print(f"\n[{split}]\n")
        view = dataset.match_tags(split)
        curr_dir = export_dir / split
        (curr_dir / "data/").mkdir(parents=True, exist_ok=True)

        tail.update(count_label=count_values(view, f"{label_field}.detections.label", "label"))

        coco_export_info(info, curr_dir / "info.py")
        coco_export_images(view, curr_dir / "data/")
        coco_export_labels(view, label_field, curr_dir / "labels.json", **kwargs)
        coco_export_semantic_labels(view, label_field, curr_dir / "labels", **kwargs)

    save_tags(dataset, export_dir / "tags.json")

    with open(export_dir / "README.md", "w", encoding="utf-8") as f:
        f.write("# README\n\n## Data Processing\n\n**from:**\n\n")

    return str(export_dir)
def coco_export_info(info, info_path):
    """Fills the ``tmpl_info`` template from ``info`` and writes it out.

    Args:
        info: the mapping passed to ``tmpl_info.safe_substitute``.
        info_path: the file to write; opened in ``"w"`` mode.
    """
    rendered = tmpl_info.safe_substitute(info)
    with open(info_path, "w") as handle:
        handle.write(rendered)
def coco_export_images(dataset_or_view, data_path):
    """Copies the file behind every sample into ``data_path``.

    Each copy keeps the base name of the sample's ``filepath``; the
    destination directory is not created here.

    Args:
        dataset_or_view: an iterable of samples exposing ``filepath``.
        data_path: the directory that receives the copies.
    """
    target_dir = Path(data_path)
    sources = (Path(sample.filepath) for sample in dataset_or_view)
    for source in sources:
        shutil.copyfile(source, target_dir / source.name)
def coco_export_labels(dataset_or_view, label_field, labels_path, **kwargs):
    """Writes the detections of ``label_field`` as a COCO-style JSON file.

    Category ids follow the order of ``default_classes`` and image ids
    follow sample order, both starting at 1. Boxes are converted from
    relative to pixel coordinates using ``metadata.width`` and
    ``metadata.height``. Samples without detections contribute an image
    entry but no annotations.

    Args:
        dataset_or_view: the samples to export.
        label_field: the detections field to read.
        labels_path: the JSON file handed to ``save_json``.
        **kwargs: ``mask_type`` (``"polygons"`` by default, or ``"rle"``,
            ``"rle-uncompressed"``, ``"rle-compressed"``) and ``tolerance``
            (default ``1``), both forwarded to ``mask_to_coco_segmentation``.

    Returns:
        Whatever ``save_json`` returns.

    Raises:
        ValueError: if ``mask_type`` is not one of the supported values.
    """
    mask_type = kwargs.get("mask_type", "polygons")
    tolerance = kwargs.get("tolerance", 1)
    # Raised explicitly so the check survives ``python -O``.
    if mask_type not in ("polygons", "rle", "rle-uncompressed", "rle-compressed"):
        raise ValueError(f"Not supported `{mask_type=}`")

    cats = [
        {"id": idx, "name": name, "supercategory": "root"}
        for idx, name in enumerate(dataset_or_view.default_classes, start=1)
    ]

    image_rows = zip(*dataset_or_view.values(["filepath", "metadata.width", "metadata.height"]))
    imgs = [
        {"id": idx, "file_name": Path(filepath).name, "width": width, "height": height}
        for idx, (filepath, width, height) in enumerate(image_rows, start=1)
    ]

    cat_info = {cat["name"]: cat for cat in cats}
    img_info = {img["file_name"]: img for img in imgs}

    anns, idx = [], 1
    label_rows = zip(*dataset_or_view.values(["filepath", f"{label_field}.detections"]))
    for filepath, detections in label_rows:
        img = img_info[Path(filepath).name]
        image_id, width, height = img["id"], img["width"], img["height"]

        # An unlabeled sample yields None rather than an empty list.
        for detection in detections or ():
            category_id = cat_info[detection.label]["id"]

            x, y, w, h = detection.bounding_box
            bbox = [x * width, y * height, w * width, h * height]

            segmentation = None
            mask = getattr(detection, "mask", None)
            if mask is not None:
                segmentation = mask_to_coco_segmentation(mask, bbox, (width, height), mask_type, tolerance)

            # The attribute may exist yet be unset; both cases fall back to 1.0.
            confidence = getattr(detection, "confidence", None)
            score = 1.0 if confidence is None else confidence

            area = bbox[2] * bbox[3]
            iscrowd = 1 if getattr(detection, "iscrowd", False) else 0

            anns.append({
                "id": idx,
                "image_id": image_id,
                "category_id": category_id,
                "bbox": bbox,
                "segmentation": segmentation,
                "score": score,
                "area": area,
                "iscrowd": iscrowd,
            })
            idx += 1

    return save_json({"categories": cats, "images": imgs, "annotations": anns}, labels_path)
def coco_export_semantic_labels(dataset_or_view, label_field, labels_path, **kwargs):
    """Writes one ``<stem>.png`` index mask per sample into ``labels_path``.

    Nothing is written unless ``to_segmentations`` is truthy. Each image
    starts filled with ``unlabeled_index``. Detections are then painted from
    the largest box to the smallest, so smaller objects overwrite larger
    ones where they overlap. Samples without detections produce an image
    that contains only ``unlabeled_index``.

    Args:
        dataset_or_view: the samples to export.
        label_field: the detections field to read.
        labels_path: the output directory (``str`` or ``Path``); created if
            missing.
        **kwargs: ``to_segmentations`` (default ``False``) enables the
            export; ``unlabeled_index`` (default ``0``) is the background
            value.
    """
    if not kwargs.get("to_segmentations", False):
        return

    unlabeled_index = kwargs.get("unlabeled_index", 0)

    # Accept plain strings as the sibling exporters do.
    labels_path = Path(labels_path)
    labels_path.mkdir(parents=True, exist_ok=True)

    # ``default_mask_targets`` maps index -> label; invert it for lookups.
    label2index = {label: index for index, label in dataset_or_view.default_mask_targets.items()}

    size_rows = zip(*dataset_or_view.values(["filepath", "metadata.width", "metadata.height"]))
    stem2scale = {Path(filepath).stem: (width, height) for filepath, width, height in size_rows}

    label_rows = zip(*dataset_or_view.values(["filepath", f"{label_field}.detections"]))
    for filepath, detections in label_rows:
        # An unlabeled sample yields None rather than an empty list.
        detections = sorted(
            detections or [],
            key=lambda obj: obj.bounding_box[2] * obj.bounding_box[3],
            reverse=True,
        )

        filestem = Path(filepath).stem
        width, height = stem2scale[filestem]

        _mask = np.full((height, width), unlabeled_index, dtype="uint8")
        for detection in detections:
            index = label2index[detection.label]

            x, y, w, h = detection.bounding_box
            bbox = [x * width, y * height, w * width, h * height]

            patch = getattr(detection, "mask", None)
            if patch is not None:
                _mask[get_mask_from_patch(patch, bbox, (width, height))] = index

        cv.imwrite(str(labels_path / f"{filestem}.png"), _mask)
def parse_data_path(dataset_dir=None, data_path=None, default=None):
    """Resolves the data location to an absolute path.

    Args:
        dataset_dir: optional base directory for relative paths.
        data_path: optional explicit path; ``~`` is expanded.
        default: the path used when ``data_path`` is ``None`` and
            ``dataset_dir`` is given.

    Returns:
        The absolute path, or ``None`` when nothing could be resolved.
        Relative paths are joined onto the absolute ``dataset_dir`` when one
        is given, otherwise made absolute against the working directory.
    """
    has_root = dataset_dir is not None

    candidate = data_path
    if candidate is None and has_root:
        candidate = default
    if candidate is None:
        return None

    candidate = os.path.expanduser(candidate)
    if has_root and not os.path.isabs(candidate):
        return os.path.join(os.path.abspath(dataset_dir), candidate)
    return os.path.abspath(candidate)
def parse_labels_path(dataset_dir=None, labels_path=None, default=None):
    """Resolves the labels location to an absolute path.

    Args:
        dataset_dir: optional base directory for relative paths.
        labels_path: optional explicit path; ``~`` is expanded.
        default: the path used when ``labels_path`` is ``None`` and
            ``dataset_dir`` is given.

    Returns:
        The absolute path, or ``None`` when nothing could be resolved.
        Relative paths are joined onto the absolute ``dataset_dir`` when one
        is given, otherwise made absolute against the working directory.
    """
    resolved = labels_path
    if resolved is None:
        resolved = default if dataset_dir is not None else None
    if resolved is None:
        return resolved

    resolved = os.path.expanduser(resolved)
    anchor_to_dataset = dataset_dir is not None and not os.path.isabs(resolved)
    if anchor_to_dataset:
        resolved = os.path.join(os.path.abspath(dataset_dir), resolved)
    else:
        resolved = os.path.abspath(resolved)
    return resolved