Source code for hello.video.frames

# pip install opencv-python
# pip install pyomniunwarp>=0.2.4
# -i https://pypi.tuna.tsinghua.edu.cn/simple
# imwrite()
# - cv.IMWRITE_PNG_COMPRESSION: default 3
# - cv.IMWRITE_JPEG_QUALITY: default 95
import shutil
import sys
from pathlib import Path

import cv2 as cv
from pyomniunwarp import OmniUnwarp

suffix_set = set(".avi,.mp4,.MOV,.mkv".split(","))


[docs]def find_videos(input_dir): video_paths = [] for f in sorted(Path(input_dir).glob("**/*")): if f.suffix in suffix_set: video_paths.append(f.as_posix()) return video_paths
[docs]def to_frames(video_path, output_dir, fps, cal_file, version, mode, fov, rois, format): cap = cv.VideoCapture(video_path) cap_fps = int(cap.get(cv.CAP_PROP_FPS)) frame_count = int(cap.get(cv.CAP_PROP_FRAME_COUNT)) step_size = max(1, cap_fps // fps) if cal_file is not None: kwargs = { "calib_results_path": cal_file, "version": version, "mode": mode, "FOV": fov, } unwarper = OmniUnwarp(**kwargs) else: unwarper = None prefix = Path(video_path).stem index = 0 while index < frame_count: ret, frame = cap.read() if not ret: print("Can't receive frame (stream end?). Exiting ...") break a, b = divmod(index, cap_fps) index += 1 if (b % step_size) != 0: continue if unwarper is None: filename = f"data/{prefix}_time_{a:06d}_{b:03d}{format}" cv.imwrite(str(output_dir / filename), frame) else: imgs, masks, labels = unwarper.rectify(frame) for img, mask, label in zip(imgs, masks, labels): if label in rois: filename = f"data/{prefix}_time_{a:06d}_{b:03d}_roi_{label}{format}" cv.imwrite(str(output_dir / filename), img) filename = f"mask/{prefix}_time_{a:06d}_{b:03d}_roi_{label}.png" cv.imwrite(str(output_dir / filename), mask) cap.release()
[docs]def func(input_dir, output_dir, fps, cal_file, version, mode, fov, rois, format): input_dir = Path(input_dir) output_dir = Path(output_dir) shutil.rmtree(output_dir, ignore_errors=True) (output_dir / "data").mkdir(parents=True, exist_ok=False) (output_dir / "mask").mkdir(parents=True, exist_ok=False) if cal_file is not None: if not Path(cal_file).is_file(): cal_file = str(input_dir / cal_file) if not Path(cal_file).is_file(): cal_file = None for video_path in find_videos(input_dir): to_frames(video_path, output_dir, fps, cal_file, version, mode, fov, rois, format) return f"\n[OUTDIR]\n{output_dir}"
[docs]def parse_args(args=None): from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument("input_dir", type=str, help="input dir") parser.add_argument("output_dir", type=str, help="output dir") parser.add_argument("--fps", type=int, default=5, help="sample the frames") parser.add_argument("--cal_file", type=str, default=None, help="calibrated model file path") parser.add_argument("--version", type=str, default="0.2.2", help="set the kernel version") parser.add_argument("--mode", type=str, default="cuboid", help="set the unwarp mode") parser.add_argument("--fov", type=int, default=90, help="set the fov") parser.add_argument("--rois", type=str, nargs="+", default=["front", "left", "right", "front-left", "front-right"], choices=["front", "left", "back", "right", "front-left", "front-right"]) parser.add_argument("--format", type=str, default=".jpg", choices=[".png", ".jpg"]) args = parser.parse_args(args=args) return vars(args)
[docs]def main(args=None): kwargs = parse_args(args) print(f"{__file__}: {kwargs}") print(func(**kwargs)) return 0
if __name__ == "__main__": sys.exit(main())