import re
import shutil
import tarfile
from collections import defaultdict
from pathlib import Path
import cv2 as cv
import numpy as np
from prettytable import PrettyTable
import hello.io.utils as hou
import hello.utils.compare as hoc
[docs]
def list_files(filename, level=2):
    """List the entries of a tar archive down to a maximum path depth.

    Args:
        filename: path of the tar archive to inspect.
        level: maximum number of path components an entry may have;
            entries with more components are skipped.

    Returns:
        list[str]: entry names in archive order. Directory entries carry
        a trailing ``/`` so they can be told apart from files.
    """
    with tarfile.open(filename, "r") as tar:
        # Walk the TarInfo objects directly: calling ``tar.getmember(name)``
        # for every name is a linear search each time, which makes the whole
        # listing quadratic on big archives.
        return [
            f"{member.name}/" if member.isdir() else member.name
            for member in tar.getmembers()
            if len(Path(member.name).parts) <= level
        ]
[docs]
def get_readme(filename):
    """Return the text of the first ``README.md`` found in a tar archive.

    Args:
        filename: path of the tar archive to search.

    Returns:
        str | None: the UTF-8 decoded content of the first entry whose
        basename is ``README.md`` (at any depth), or ``None`` when the
        archive holds no such entry.
    """
    with tarfile.open(filename, "r") as archive:
        for entry in archive.getnames():
            if Path(entry).name != "README.md":
                continue
            # Only the first match is read; later README.md entries are ignored.
            with archive.extractfile(entry) as handle:
                return handle.read().decode("utf8")
    return None
[docs]
def get_image_names(filename, data_path="data"):
    """Collect the unique basenames stored under a data directory of a tar.

    An entry is selected when the name of its immediate parent directory
    equals ``data_path``. A warning is printed if the same basename shows
    up more than once.

    Args:
        filename: path of the tar archive to scan.
        data_path: name of the parent directory that holds the entries.

    Returns:
        list[str]: sorted, de-duplicated basenames.
    """
    with tarfile.open(filename, "r") as archive:
        collected = [
            Path(entry).name
            for entry in archive.getnames()
            if Path(entry).parent.name == data_path
        ]

    unique = set(collected)
    total, distinct = len(collected), len(unique)
    if total != distinct:
        # Same basename appears under more than one matching directory.
        print(f"[W] <{filename}>:\n total {total}, unique {distinct}")
    return sorted(unique)
[docs]
def get_image_paths(filename, data_path="data"):
    """Return the full member names stored under a data directory of a tar.

    Args:
        filename: path of the tar archive to scan.
        data_path: name of the immediate parent directory to select.

    Returns:
        list[str]: member names (as stored in the archive, in archive
        order) whose parent directory is named ``data_path``. Duplicates
        are not removed.
    """
    with tarfile.open(filename, "r") as archive:
        return [
            entry
            for entry in archive.getnames()
            if Path(entry).parent.name == data_path
        ]
[docs]
def check_files(files, data_path="data"):
    """Report basename collisions across several tar archives.

    Archives are processed in the given order; for each one, a warning is
    printed when some of its names were already seen in an earlier
    archive. A summary line is printed at the end.

    Args:
        files: sequence of tar archive paths.
        data_path: parent directory name forwarded to ``get_image_names``.

    Returns:
        set[str]: union of the names found in all archives.
    """
    seen = set()
    for filename in files:
        names = get_image_names(filename, data_path)
        overlap = seen.intersection(names)
        if overlap:
            print(f"[W] duplicate {len(overlap)}/{len(names)}, <{filename}>")
        seen |= set(names)

    print(f"{len(seen)} unique images from {len(files)} tars")
    return seen
[docs]
def compare(file1, file2, data_path="data", verbose=True):
    """Compare the decoded images that two tar archives have in common.

    Entries are matched by basename among the members located under
    ``data_path`` in each archive. Every matched pair is decoded with
    OpenCV and compared pixel by pixel.

    Args:
        file1: path of the first (base) tar archive.
        file2: path of the second (side) tar archive.
        data_path: name of the parent directory holding the images.
        verbose: print progress and summary lines when true. Duplicate
            warnings are printed regardless of this flag.

    Returns:
        tuple[list[str], list[str]]: ``(names, eqs)`` where ``names`` is
        the sorted list of basenames present in both archives and ``eqs``
        the subset whose decoded images are identical.
    """
    if verbose:
        print(f"Compare data:\n a: <{file1}>\n b: <{file2}>")

    base_imgs = get_image_paths(file1, data_path)
    base_dict = {Path(f).name: f for f in base_imgs}
    a, b = len(base_imgs), len(base_dict)
    if a != b:
        print(f"[W] <{file1}>:\n total {a}, unique {b}")

    side_imgs = get_image_paths(file2, data_path)
    side_dict = {Path(f).name: f for f in side_imgs}
    a, b = len(side_imgs), len(side_dict)
    if a != b:
        print(f"[W] <{file2}>:\n total {a}, unique {b}")

    names = sorted(base_dict.keys() & side_dict.keys())
    a, b, c = len(base_dict), len(side_dict), len(names)
    if verbose:
        print(f" ({a=}) & ({b=}) => intersect:{c}")

    eqs = []
    # Context managers guarantee both archives are closed even when a member
    # cannot be extracted or decoded part-way through the loop.
    with tarfile.open(file1, "r") as tar1, tarfile.open(file2, "r") as tar2:
        for name in names:
            with tar1.extractfile(base_dict[name]) as f:
                im1 = cv.imdecode(np.frombuffer(f.read(), np.uint8), 1)
            with tar2.extractfile(side_dict[name]) as f:
                im2 = cv.imdecode(np.frombuffer(f.read(), np.uint8), 1)
            # NOTE(review): if imdecode yields None for both members, the pair
            # would compare as equal here - confirm whether that is intended.
            if np.array_equal(im1, im2):
                eqs.append(name)

    if verbose:
        print(f" {len(names)=}, {len(eqs)=}")
    return names, eqs
[docs]
def prepare_names_file(root, sub_dirs, prefix="check"):
    """Write one JSON name listing per sub-directory and return the counts.

    Each item of ``sub_dirs`` is either a string (the ``*.jpg`` files of
    ``root / item`` are listed) or a ``(sub_dir, names)`` pair that
    supplies the names explicitly. The listing is saved through
    ``hou.save_json`` as ``{prefix}-{tag}.json`` where ``tag`` is the
    sub-directory with trailing slashes removed and ``/`` replaced by ``-``.

    Args:
        root: base directory the string sub-directories are relative to.
        sub_dirs: iterable of sub-directory strings or ``(str, list)`` pairs.
        prefix: prefix of the generated JSON file names.

    Returns:
        dict[str, int]: number of names recorded for each tag.
    """
    base = Path(root)
    summary = {}
    for entry in sub_dirs:
        if not isinstance(entry, str):
            # Explicit form: the caller provides the names itself.
            entry, listed = entry
            assert isinstance(entry, str) and isinstance(listed, list)
        else:
            listed = [item.name for item in (base / entry).glob("*.jpg")]

        label = entry.rstrip("/").replace("/", "-")
        ordered = sorted(listed)
        size = len(ordered)
        hou.save_json({"count": size, "names": ordered}, f"{prefix}-{label}.json")
        summary[label] = size
    return summary
[docs]
def compare_info_py(file1, file2, keys=None, verbose=True):
    """Compare selected fields of the info records of two dataset archives.

    The info record of each archive is obtained by evaluating the text at
    ``[0][1]`` of the result of ``extract_info_py``.

    Args:
        file1: path of the first (base) archive.
        file2: path of the second (side) archive.
        keys: field names to compare; defaults to
            ``["classes", "mask_targets"]``.
        verbose: print a header and one line per compared key when true.

    Returns:
        dict: for every key, the comparison outcome - the result of ``==``
        for two strings, of ``hoc.equal_list`` for two lists, of
        ``hoc.equal_dict`` for two dicts, and the string
        ``"Unkown (BadType)"`` for any other combination of types.
    """
    if verbose:
        print(f"Compare info:\n a: <{file1}>\n b: <{file2}>")

    selected = ["classes", "mask_targets"] if keys is None else keys

    # NOTE(security): eval() executes whatever text the archive provides;
    # only run this on archives from a trusted source.
    base_info = eval(extract_info_py(file1)[0][1])
    side_info = eval(extract_info_py(file2)[0][1])

    results = {}
    for key in selected:
        lhs = base_info[key]
        rhs = side_info[key]

        def both(kind):
            return isinstance(lhs, kind) and isinstance(rhs, kind)

        if both(str):
            outcome = (lhs == rhs)
        elif both(list):
            outcome = hoc.equal_list(lhs, rhs)
        elif both(dict):
            outcome = hoc.equal_dict(lhs, rhs)
        else:
            # Mismatched or unsupported value types.
            outcome = "Unkown (BadType)"

        if verbose:
            print(f" <{key}>: {outcome}")
        results[key] = outcome
    return results
[docs]
def parse_dataset_name(filepath):
    """Split a dataset archive name into camera, stage and name parts.

    Naming format:
    ``{CAMERA_NAME}_{STAGE_NAME}_{YYYYmmdd}_{SUMMARY_KEYWORDS}_ver{000}.tar``,
    with CAMERA_NAME in ``{novabot_360,novabot_front}`` and STAGE_NAME in
    ``{raw,img,det,seg,dep}``. Only the basename of ``filepath`` is
    examined, and the match is anchored at its start only, so anything
    following the three version digits is ignored.

    >>> parse_dataset_name("novabot_360_img_20230629_chengdu_street_view_pm12_ver001a.tar")
    ('novabot_360', 'img', '20230629_chengdu_street_view_pm12')

    Args:
        filepath: path of dataset archive file

    Returns:
        tuple[str, str, str] | None: ``(camera, stage, name)`` where
        ``name`` is the date followed by the summary keywords, or ``None``
        when the basename does not follow the naming format.
    """
    basename = Path(filepath).name
    matched = re.match(
        r"(novabot(?:_[a-z0-9]+){1,2})_(img|raw|det|seg|dep)_(\d{8}(?:_[a-z0-9]+){1,9})_ver\d{3}",
        basename,
    )
    return matched.groups() if matched else None
[docs]
def tree(root, pattern="*/*.tar"):
    """Print, per directory, a table of dataset archives and their sizes.

    Archives matching ``pattern`` under ``root`` are grouped by parent
    directory. Inside a group, archives whose stem is identical up to the
    last ``_``-separated token (presumably the version suffix - verify
    against the naming convention) are collapsed to the one whose stem
    sorts last. For every remaining archive, the number of members under
    its ``data`` directory is counted and shown.

    Args:
        root: directory to search.
        pattern: glob pattern, relative to ``root``, selecting the archives.

    Returns:
        None. The tables are written to standard output.
    """
    by_dir = defaultdict(list)
    for tar_path in Path(root).glob(pattern):
        by_dir[tar_path.parent.as_posix()].append(tar_path)

    for group_name in sorted(by_dir):
        table = PrettyTable(["id", "file", "count"])
        table.align["id"] = "c"
        table.align["file"] = "l"
        table.align["count"] = "r"

        # Visiting in stem order means a later stem overwrites an earlier
        # one that shares the same key.
        kept = {}
        for candidate in sorted(by_dir[group_name], key=lambda item: item.stem):
            key = "_".join(candidate.stem.split("_")[:-1])
            kept[key] = candidate

        total = 0
        for index, filepath in enumerate(sorted(kept.values()), 1):
            num = len(get_image_paths(filepath, data_path="data"))
            table.add_row([f"{index:03d}", filepath.name, num])
            total += num

        print(f"\n## {group_name}")
        print(f"[I] {total=}")
        print(table)