import json
from collections import defaultdict
from pathlib import Path
import cv2 as cv
import fiftyone.core.utils as fou
import fiftyone.utils.iou as foui
from hello.fiftyone.coco_utils import coco_segmentation_to_mask
from hello.utils.compare import equal_dict
def _parse_text_slice(vals):
assert len(vals) == 6
x1, y1, x2, y2 = vals[:4]
bounding_box = [
float(x1),
float(y1),
float(x2) - float(x1),
float(y2) - float(y1),
]
confidence = float(vals[4])
label = vals[5]
return bounding_box, confidence, label
def _parse_text_row(row):
"""\
row format:
``filepath,height,width,x1,y1,x2,y2,s,l,x1,y1,x2,y2,s,l``
"""
row_vals = row.split(",")
assert len(row_vals) >= 3, "filepath,height,width,..."
filepath = Path(row_vals[0])
height = int(row_vals[1])
width = int(row_vals[2])
data = row_vals[3:]
group_size = 6
total_size = len(data)
assert total_size % group_size == 0
detections = []
_scale = [width, height, width, height]
for i in range(0, total_size, group_size):
bounding_box, confidence, label = _parse_text_slice(data[i:(i + group_size)])
bounding_box = [a / b for a, b in zip(bounding_box, _scale)]
detection = dict(
label=label,
bounding_box=bounding_box,
confidence=confidence,
)
detections.append(detection)
return filepath, detections
def _parse_yolo_row(row, classes):
row_vals = row.split()
target, xc, yc, w, h = row_vals[:5]
try:
label = classes[int(target)]
except:
label = str(target)
bounding_box = [
(float(xc) - 0.5 * float(w)),
(float(yc) - 0.5 * float(h)),
float(w),
float(h),
]
if len(row_vals) > 5:
confidence = float(row_vals[5])
else:
confidence = 1.0
detection = dict(
label=label,
bounding_box=bounding_box,
confidence=confidence,
)
return detection
def _parse_yolo_annotations(filepath, classes):
"""\
row format:
``target,xc,yc,w,h,s``
"""
with open(filepath, "r") as f:
lines = [l.strip() for l in f.read().splitlines()]
lines = [l for l in lines if l and not l.startswith("#")]
detections = []
for row in lines:
detections.append(_parse_yolo_row(row, classes))
return detections
[docs]
def load_text_predictions(labels_path):
    """Load detections from a text file holding one image per line.

    Each data line is handed to ``_parse_text_row``. Lines are stripped
    first; blank lines and lines starting with ``#`` are skipped.

    Args:
        labels_path: path of the predictions text file

    Returns:
        a dict mapping the stem of each row's file path to its list of
        detections; when two rows share a stem, the later row wins
    """
    with open(labels_path, "r") as f:
        content = f.read()

    db = {}
    for raw_line in content.splitlines():
        row = raw_line.strip()
        if row and not row.startswith("#"):
            filepath, detections = _parse_text_row(row)
            db[filepath.stem] = detections
    return db
[docs]
def load_yolo_predictions(labels_path, classes):
    """Load detections from a directory of YOLO ``*.txt`` annotation files.

    Only files directly inside ``labels_path`` that match ``*.txt`` are read.

    Args:
        labels_path: directory containing the annotation files
        classes: forwarded unchanged to ``_parse_yolo_annotations``

    Returns:
        a dict mapping each annotation file's stem to its list of detections
    """
    return {
        txt_path.stem: _parse_yolo_annotations(txt_path, classes)
        for txt_path in Path(labels_path).glob("*.txt")
    }
[docs]
def load_coco_predictions(labels_path, remove_prefix=False):
    """Load detections from a COCO-style JSON file, grouped by image stem.

    Args:
        labels_path: path of a JSON file with top-level ``categories``,
            ``images`` and ``annotations`` entries
        remove_prefix (False): whether to reduce each image ``file_name`` to
            its base name with everything up to and including the first
            ``_`` removed. A base name without any ``_`` is kept as is

    Returns:
        a dict mapping the stem of every listed image to a list of detection
        dicts; images without annotations map to an empty list. Each
        detection holds ``label`` (the category name), ``bounding_box``
        (``[x, y, w, h]`` with x/w divided by the image width and y/h by the
        image height), ``mask`` (the result of ``coco_segmentation_to_mask``,
        or ``None`` when the annotation has no non-empty ``segmentation``),
        ``confidence`` (``score`` if present, else ``confidence`` if present,
        else ``1.0``), plus every other annotation key not reserved below

    Raises:
        AssertionError: if one of the three top-level entries is missing
        KeyError: if an annotation references an unknown image or category id
    """
    with open(labels_path, "r") as f:
        coco = json.load(f)

    assert "categories" in coco and "images" in coco and "annotations" in coco

    if remove_prefix:
        for img in coco["images"]:
            base_name = Path(img["file_name"]).name
            # partition() keeps names that have no "_" instead of raising
            # IndexError the way ``split("_", maxsplit=1)[1]`` would.
            _, separator, remainder = base_name.partition("_")
            img["file_name"] = remainder if separator else base_name

    imgs = {img["id"]: img for img in coco["images"]}
    cats = {cat["id"]: cat for cat in coco["categories"]}

    # Annotation keys that are either consumed below or would collide with
    # the detection's own fields; everything else is copied through.
    reserved_keys = {
        "id", "image_id", "category_id", "bbox", "segmentation", "score",
        "label", "bounding_box", "mask", "confidence", "attributes",
    }

    db = defaultdict(list)
    for ann in coco["annotations"]:
        image = imgs[ann["image_id"]]
        width = image["width"]
        height = image["height"]
        label = cats[ann["category_id"]]["name"]

        bbox = ann["bbox"]
        x, y, w, h = bbox
        bounding_box = [x / width, y / height, w / width, h / height]

        mask = None
        segmentation = ann.get("segmentation")
        if segmentation:
            mask = coco_segmentation_to_mask(segmentation, bbox, (width, height))

        # "score" takes precedence over "confidence".
        confidence = ann.get("score", ann.get("confidence", 1.0))

        attributes = {
            key: value for key, value in ann.items() if key not in reserved_keys
        }

        db[Path(image["file_name"]).stem].append(
            dict(
                label=label,
                bounding_box=bounding_box,
                mask=mask,
                confidence=confidence,
                **attributes,
            )
        )

    # Index by the image list so images without annotations still appear
    # (the defaultdict supplies their empty list).
    stems = [Path(image["file_name"]).stem for image in imgs.values()]
    return {stem: db[stem] for stem in stems}
[docs]
def load_predictions(labels_path, classes=None, mode="text", remove_prefix=False):
    """Load predictions with the loader selected by ``mode``.

    Args:
        labels_path: passed to the selected loader
        classes (None): class list, used only when ``mode="yolo"``, where it
            must be a ``list``
        mode ("text"): supported values are ``("text", "yolo", "coco")``
        remove_prefix (False): used only when ``mode="coco"``

    Returns:
        whatever the selected ``load_*_predictions`` function returns

    Raises:
        NotImplementedError: if ``mode`` is not one of the supported values
    """
    if mode == "text":
        return load_text_predictions(labels_path)

    if mode == "yolo":
        assert isinstance(classes, list)
        return load_yolo_predictions(labels_path, classes)

    if mode == "coco":
        return load_coco_predictions(labels_path, remove_prefix)

    raise NotImplementedError
[docs]
def find_duplicate_labels(dataset, label_field, iou_thresh=0.999, method="simple", iscrowd=None, classwise=False):
    """Returns IDs of duplicate labels in the given field of the dataset.

    Duplicates are labels whose IoU with another label in the same field is
    greater than ``iou_thresh``. The search itself is delegated to
    ``fiftyone.utils.iou.find_duplicates``.

    >>> dup_ids = find_duplicate_labels(dataset, label_field)
    >>> dataset.untag_labels("duplicate")
    >>> dataset.select_labels(ids=dup_ids).tag_labels("duplicate")
    >>> print(dataset.count_label_tags())
    >>> dataset.delete_labels(tags="duplicate")
    >>> # dataset.delete_labels(ids=dup_ids) <- best

    Args:
        dataset: a :class:`fiftyone.core.dataset.Dataset`
        label_field: a label field of type
            :class:`fiftyone.core.labels.Detections` or
            :class:`fiftyone.core.labels.Polylines`
        iou_thresh (0.999): the IoU threshold to use to determine whether
            labels are duplicates
        method ("simple"): supported values are ``("simple", "greedy")``
        iscrowd (None): an optional name of a boolean attribute
        classwise (False): whether to treat labels with different label
            values as always non-overlapping

    Returns:
        the result of ``foui.find_duplicates``, unchanged
    """
    options = dict(
        iou_thresh=iou_thresh,
        method=method,
        iscrowd=iscrowd,
        classwise=classwise,
    )
    return foui.find_duplicates(dataset, label_field, **options)
[docs]
def find_duplicate_images(filepaths, leave_one_out=False):
    """Find files that share a file hash with at least one other file.

    Hashes are the values returned by ``fou.compute_filehash``.

    Args:
        filepaths: an iterable of file paths
        leave_one_out (False): whether to omit the first file of every
            duplicate group from the result

    Returns:
        a list of ``[filepath, filehash]`` pairs, grouped by hash in order of
        each hash's first appearance; files with a unique hash are not listed
    """
    by_hash = defaultdict(list)
    for filepath in filepaths:
        filehash = fou.compute_filehash(filepath)
        by_hash[filehash].append([filepath, filehash])

    # Number of leading entries to drop from each duplicate group.
    skip = 1 if leave_one_out else 0

    results = []
    for entries in by_hash.values():
        if len(entries) > 1:
            results.extend(entries[skip:])
    return results
[docs]
def read_mask(mask_path, remap=None):
    """Read a mask image and optionally remap its pixel values.

    Args:
        mask_path: path passed to ``cv.imread``
        remap (None): an optional dict mapping old pixel values to new ones

    Returns:
        the array returned by ``cv.imread`` when ``remap`` is None, otherwise
        a remapped copy of it
    """
    # Flag 0 is OpenCV's grayscale read mode.
    # NOTE(review): cv.imread presumably returns None for an unreadable
    # path, which is only survivable here when ``remap`` is None - confirm.
    mask = cv.imread(mask_path, 0)
    if remap is None:
        return mask

    remapped = mask.copy()
    for src, dst in remap.items():
        if src != dst:
            # Match against the untouched original so that remap entries
            # never chain into each other.
            remapped[mask == src] = dst
    return remapped
[docs]
def gen_mask_remap(dataset_mask_targets, label_mask_targets, ignore_index=255):
    """Build the index remap that converts label masks to dataset indices.

    Args:
        dataset_mask_targets: a dict mapping index -> label for the dataset
        label_mask_targets: a dict mapping index -> label for the masks
        ignore_index (255): the index assigned to labels that are absent
            from ``dataset_mask_targets``

    Returns:
        a dict mapping each index of ``label_mask_targets`` that has to
        change to its new index, or ``None`` when ``equal_dict`` reports the
        two mappings as equal or when no index has to change
    """
    if equal_dict(dataset_mask_targets, label_mask_targets):
        return None

    # Reverse lookup: label -> dataset index.
    index_of = {label: index for index, label in dataset_mask_targets.items()}

    candidates = (
        (index, index_of.get(label, ignore_index))
        for index, label in label_mask_targets.items()
    )
    remap = {old: new for old, new in candidates if old != new}
    return remap or None
[docs]
def load_segmentation_masks(labels_path, remap=None, mode="png"):
    """Load every ``*.png`` mask found directly inside a directory.

    Args:
        labels_path: the directory containing the mask images
        remap (None): an optional remap dict forwarded to ``read_mask``
        mode ("png"): the only supported value is ``"png"``

    Returns:
        a dict mapping each mask file's stem to the ``read_mask`` result,
        filled in sorted path order

    Raises:
        NotImplementedError: if ``mode`` is not ``"png"``
        AssertionError: if ``labels_path`` is not a directory
    """
    labels_path = Path(labels_path)

    if mode != "png":
        raise NotImplementedError
    assert labels_path.is_dir()

    return {
        mask_path.stem: read_mask(str(mask_path), remap)
        for mask_path in sorted(labels_path.glob("*.png"))
    }