# Source code for hello.fiftyone.unique

import shutil
import sys
from pathlib import Path

import fiftyone as fo
import fiftyone.brain as fob


def best_group_size(n_total, group_size):
    """Return a group size, at least ``group_size``, that covers ``n_total``.

    The number of full groups, ``n_total // group_size``, is kept and the
    remainder is spread over those groups (rounding up), so that this many
    groups are enough to hold every sample.  For example, 2500 samples with a
    requested size of 1000 yield 1250 (two groups) instead of leaving a third
    group of 500.

    Args:
        n_total: total number of samples to split.
        group_size: requested number of samples per group; must be >= 1.

    Returns:
        ``n_total`` when everything fits into a single group, otherwise the
        enlarged group size.

    Raises:
        ValueError: if ``group_size`` is smaller than 1.  Zero would divide
            by zero below, and a negative value would produce a negative
            size that makes a ``range(0, n_total, size)`` loop empty.
    """
    if group_size < 1:
        raise ValueError(
            f"group_size must be a positive integer, got {group_size!r}"
        )
    if n_total <= group_size:
        return n_total

    n_groups, remainder = divmod(n_total, group_size)
    # Ceiling division: share the remainder between the full groups.
    extra, leftover = divmod(remainder, n_groups)
    if leftover > 0:
        extra += 1
    return group_size + extra
def find_unique(export_dir, dataset_dir, count=1, model=None):
    """Select unique images from ``dataset_dir`` and export them.

    The directory is loaded as a FiftyOne image-directory dataset, similarity
    is computed over the whole dataset under the brain key ``"img_sim"``, and
    the similarity results are asked for ``count`` unique samples.  The
    selected samples are exported to ``<export_dir>/data`` after
    ``export_dir`` has been removed.

    Args:
        export_dir: output directory; deleted before exporting.
        dataset_dir: directory containing the input images.
        count: number of unique samples requested from the results.
        model: forwarded to ``fob.compute_similarity``.

    Returns:
        A tuple ``(number of selected samples, total number of samples)``.
    """
    # Model names noted by the original author:
    #   'mobilenet-v2-imagenet-torch'
    #   'resnet50-imagenet-torch', 'resnet101-imagenet-torch',
    #   'resnet152-imagenet-torch'
    #   'resnext50-32x4d-imagenet-torch', 'resnext101-32x8d-imagenet-torch'
    samples = fo.Dataset.from_dir(
        dataset_type=fo.types.ImageDirectory,
        dataset_dir=dataset_dir,
    )

    similarity = fob.compute_similarity(samples, model=model, brain_key="img_sim")
    similarity.find_unique(count)
    selected = samples.select(similarity.unique_ids)

    # Start from a clean output directory; removal errors are ignored.
    shutil.rmtree(export_dir, ignore_errors=True)
    target = Path(export_dir) / "data"
    selected.export(
        dataset_type=fo.types.ImageDirectory,
        export_dir=target.as_posix(),
    )

    return len(selected), len(samples)
def find_unique2(export_dir, dataset_dir, count=1, model=None, group_size=1000):
    """Select unique images group by group and export them.

    The images of ``dataset_dir`` are sorted by file path and processed in
    consecutive groups.  Similarity is computed for each group separately
    (brain key ``"img_sim"``) and ``count`` unique samples are requested
    *per group*; the union of all selected samples is exported to
    ``<export_dir>/data`` after ``export_dir`` has been removed.

    Args:
        export_dir: output directory; deleted before exporting.
        dataset_dir: directory containing the input images.
        count: number of unique samples requested from each group.
        model: forwarded to ``fob.compute_similarity``.
        group_size: requested number of samples per group (>= 1); the size
            actually used is adjusted by ``best_group_size``.

    Returns:
        A tuple ``(number of selected samples, total number of samples)``.

    Raises:
        ValueError: if ``group_size`` is smaller than 1, or if the dataset
            contains no samples.  Both are checked before ``export_dir`` is
            touched, so a bad call never wipes an existing export.
    """
    # A non-positive size would either divide by zero or give range() a
    # negative step, silently skipping every group.
    if group_size < 1:
        raise ValueError(
            f"group_size must be a positive integer, got {group_size!r}"
        )

    dataset = fo.Dataset.from_images_dir(dataset_dir)
    sorted_view = dataset.sort_by("filepath")
    n_total = len(sorted_view)
    # With no samples the group size would become 0 and range() would fail
    # with an unhelpful "arg 3 must not be zero" error.
    if n_total == 0:
        raise ValueError(f"no images found in dataset_dir: {dataset_dir!r}")

    group_size = best_group_size(n_total, group_size)
    print(f"[INFO] total: {n_total}, group size: {group_size}")

    unique_ids = set()
    for skip in range(0, n_total, group_size):
        group_view = sorted_view.skip(skip).limit(group_size)
        results = fob.compute_similarity(group_view, brain_key="img_sim", model=model)
        results.find_unique(count)
        unique_ids.update(results.unique_ids)

    unique_view = dataset.select(unique_ids)

    # Start from a clean output directory; removal errors are ignored.
    shutil.rmtree(export_dir, ignore_errors=True)
    unique_view.export(
        export_dir=(Path(export_dir) / "data").as_posix(),
        dataset_type=fo.types.ImageDirectory,
    )

    return len(unique_view), len(dataset)
def func(export_dir, dataset_dir, function, count, model, group_size):
    """Run the requested command and report how many images were kept.

    Only ``"unique"`` is implemented.  Without a ``group_size`` the whole
    dataset is handled at once by ``find_unique``; otherwise the grouped
    variant ``find_unique2`` is used.

    Args:
        export_dir: output directory passed on to the worker function.
        dataset_dir: input image directory passed on to the worker function.
        function: name of the command to run.
        count: number of unique samples to request.
        model: similarity model passed on to the worker function.
        group_size: ``None`` for a single pass, otherwise the group size.

    Returns:
        The closing marker string (a newline followed by ``[END]``).

    Raises:
        NotImplementedError: for ``"duplicate"`` and any other command name.
    """
    if function != "unique":
        # "duplicate" and unknown command names are all unimplemented.
        raise NotImplementedError

    if group_size is None:
        n_unique, n_total = find_unique(export_dir, dataset_dir, count, model)
    else:
        n_unique, n_total = find_unique2(
            export_dir, dataset_dir, count, model, group_size
        )
    print(f"kept: {n_unique}, total: {n_total}")

    return "\n[END]"
def parse_args(args=None):
    """Parse the command-line arguments into a keyword dictionary.

    Args:
        args: list of argument strings; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        A dict with the keys ``export_dir``, ``dataset_dir``, ``function``,
        ``count``, ``model`` and ``group_size``.

    Raises:
        SystemExit: raised by argparse on invalid arguments, including a
            ``-g`` value below 1.
    """
    from argparse import (
        ArgumentDefaultsHelpFormatter,
        ArgumentParser,
        ArgumentTypeError,
    )

    def positive_int(text):
        """Convert ``text`` to an int and reject values below 1."""
        try:
            value = int(text)
        except ValueError:
            raise ArgumentTypeError(f"invalid int value: {text!r}") from None
        if value < 1:
            raise ArgumentTypeError(f"must be a positive integer, got {value}")
        return value

    parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
    parser.add_argument("export_dir", type=str, help="a directory")
    parser.add_argument("dataset_dir", type=str, help="the dataset directory")
    parser.add_argument(
        "-f", dest="function", type=str, default="unique",
        choices=["unique", "duplicate"],
    )
    parser.add_argument(
        "-n", dest="count", type=int, default=1,
        help="the desired number of unique examples",
    )
    parser.add_argument(
        "-m", dest="model", type=str, default="resnet50-imagenet-torch",
        help="a fiftyone.core.models.Model or the name",
    )
    # A group size of 0 or less cannot be used to split the dataset, so it
    # is rejected here instead of failing later in the group arithmetic.
    parser.add_argument(
        "-g", dest="group_size", type=positive_int, default=None,
        help="compute similarity by group",
    )

    args = parser.parse_args(args=args)
    return vars(args)
def main(args=None):
    """Command-line entry point.

    Parses ``args`` with ``parse_args``, prints this file's path together
    with the parsed options, runs ``func`` with them and prints its result.

    Args:
        args: list of argument strings, or ``None`` to use ``sys.argv``.

    Returns:
        Always ``0``, intended as the process exit status.
    """
    kwargs = parse_args(args)
    print(f"{__file__}: {kwargs}")

    outcome = func(**kwargs)
    print(outcome)

    return 0
# Run the command-line interface only when this file is executed as a script,
# and hand main()'s return value to the interpreter as the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)